博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm详解
阅读量:6439 次
发布时间:2019-06-23

本文共 6593 字,大约阅读时间需要 21 分钟。

1.  构建拓扑代码

package demo;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class AreaAmtTopo {    public static void main(String[] args) {    TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic),5);builder.setBolt("filter",new AreaFilterBolt(),5).shuffleGrouping("spout");builder.setBolt("areabolt",new AreaAmtBolt(),2).fieldsGrouping("filter",new Fields("area_id"));builder.setBolt("rsltbolt",new AreaRsltBolt(),1).shuffleGrouping("areabolt");    }}

2.一级过滤bolt

package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//一级的过滤boltpublic class AreaFilterBolt implements IBasicBolt {    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        // TODO Auto-generated method stub        declarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每个value的对应name    }    @Override    public Map
 getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void cleanup() {        // TODO Auto-generated method stub    }    @Override    public void execute(Tuple input, BasicOutputCollector collector) {        //order_id,order_amt,create_time,area_id        String order=input.getString(0);//取出集合values中的第一个value        if(order!=null){                        String orderArr[]=order.split("\\t");            collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time                    }     }    @Override    public void prepare(Map arg0, TopologyContext arg1) {        // TODO Auto-generated method stub    }}

3.局部汇总bolt(按日期和区域和汇总)

package demo;import java.util.HashMap;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;//局部汇总public class AreaAmtBolt implements IBasicBolt {        Map
 countsMap=null;    @Override    public void declareOutputFields(            OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("date_area","amt"));    }    @Override    public Map
 getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {        // TODO Auto-generated method stub         countsMap =new HashMap
();    }    @Override    public void execute(Tuple input,            BasicOutputCollector collector) {                if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发        {        String area_id=input.getString(0);        Double order_amt=input.getDouble(1);        String  order_date=input.getStringByField("order_date");                Double count=countsMap.get(area_id+"_"+order_date);        if (count==null){            count = 0.0;            }                count+=order_amt;        countsMap.put(area_id+"_"+order_date,count);        System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);        collector.emit(new Values(area_id+"_"+order_date,count));        }    }    @Override    public void cleanup() {        countsMap.clear();    }}

4. 最终结果写入Hbase

package demo;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//结果定时写入hbase的boltpublic class AreaRsltBolt implements IBasicBolt {    Map
 countsMap=null;    long beginTime=System.currentTimeMillis();    long endTime=0L;    HBaseDao dao=null;    @Override    public void declareOutputFields(            OutputFieldsDeclarer paramOutputFieldsDeclarer) {        // TODO Auto-generated method stub    }    @Override    public Map
 getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void prepare(Map paramMap, TopologyContext paramTopologyContext) {         countsMap =new HashMap
();         dao=new HBaseDAOImp();    }    @Override    public void execute(Tuple input,            BasicOutputCollector paramBasicOutputCollector) {        String date_areaid=input.getString(0);        double  order_amt=input.getDouble(1);         countsMap.put(date_areaid,order_amt);        endTime=System.currentTimeMillis();        if (endTime-beginTime>=5*1000){                   for(String key:countsMap.keySet()){              //put into hbase            //2014-05-05_1,amt              dao.insert("area_order","cf","order_amt",countsMap.get(key));              System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));            }            beginTime=System.currentTimeMillis();        }            }    @Override    public void cleanup() {        // TODO Auto-generated method stub    }}

5. DateFmt代码

package demo;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;public class DateFmt {    public static final String date_long="yyyy-MM-dd HH:mm:ss";    public static final String date_short="yyyy-MM-dd";        public static SimpleDateFormat sdf=new SimpleDateFormat(date_short);    public static String getCountDate(String date,String patton){        SimpleDateFormat sdf=new SimpleDateFormat(patton);        Calendar cal =Calendar.getInstance();        if (date!=null){                        try {                cal.setTime(sdf.parse(date));            } catch (ParseException e) {                                e.printStackTrace();            }        }                return sdf.format(cal.getTime());            }        public static Date parseDate(String dateStr) throws Exception{                return sdf.parse(dateStr);    }            public static void main(String[] args) {                System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));    }}

本文出自 “” 博客,请务必保留此出处

转载地址:http://gnuwo.baihongyu.com/

你可能感兴趣的文章
linux sed命令
查看>>
浅谈当下网页设计趋势
查看>>
TCP 滑动窗口和 拥塞窗口
查看>>
VS2008调试程序时出现"XXX mutex not created."
查看>>
解决Java连接MySQL存储过程返回参数值为乱码问题
查看>>
c++ 字符检测 TCharacter
查看>>
MalformedObjectNameException: Invalid character '' in value part of property
查看>>
Hadoop格式化HDFS报错java.net.UnknownHostException: localhost.localdomain: localhost.localdomain
查看>>
android 40 Io编程
查看>>
STL之Vector(不定长数组)
查看>>
python oracle使用心得
查看>>
准备着手学习python
查看>>
OOP几大原则【转】
查看>>
ExtJs--09--javascript对象的方法的3种写法 prototype通过原型设置方法效率最好
查看>>
磁盘镜像工具Guymager
查看>>
排序算法(一)——冒泡排序及改进
查看>>
Ext江湖
查看>>
一起谈.NET技术,实战ASP.NET大规模网站架构:Web加速器
查看>>
RHEL 6.6下Python 2.6.6升级到Python 3.6.6
查看>>
linux 内核启动过程以及挂载android 根文件系统的过程 ( 转)
查看>>