您好,登錄后才能下訂單哦!
這篇文章主要講解了“storm怎么構(gòu)建拓?fù)浯a”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“storm怎么構(gòu)建拓?fù)浯a”吧!
1. 構(gòu)建拓?fù)浯a
package demo; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class AreaAmtTopo { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic),5); builder.setBolt("filter",new AreaFilterBolt(),5).shuffleGrouping("spout"); builder.setBolt("areabolt",new AreaAmtBolt(),2).fieldsGrouping("filter",new Fields("area_id")); builder.setBolt("rsltbolt",new AreaRsltBolt(),1).shuffleGrouping("areabolt"); } }
2.一級(jí)過濾bolt
package demo; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; //一級(jí)的過濾bolt public class AreaFilterBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每個(gè)value的對(duì)應(yīng)name } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector collector) { //order_id,order_amt,create_time,area_id String order=input.getString(0);//取出集合values中的第一個(gè)value if(order!=null){ String orderArr[]=order.split("\\t"); collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time } } @Override public void prepare(Map arg0, TopologyContext arg1) { // TODO Auto-generated method stub } }
3.局部匯總bolt(按日期和區(qū)域和匯總)
package demo; import java.util.HashMap; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; //局部匯總 public class AreaAmtBolt implements IBasicBolt { Map<String,Double> countsMap=null; @Override public void declareOutputFields( OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_area","amt")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map paramMap, TopologyContext paramTopologyContext) { // TODO Auto-generated method stub countsMap =new HashMap<String, Double>(); } @Override public void execute(Tuple input, BasicOutputCollector collector) { if(input!=null)//如果spout端沒數(shù)據(jù)就會(huì)發(fā)空值,所以要做判斷再往下發(fā) { String area_id=input.getString(0); Double order_amt=input.getDouble(1); String order_date=input.getStringByField("order_date"); Double count=countsMap.get(area_id+"_"+order_date); if (count==null){ count = 0.0; } count+=order_amt; countsMap.put(area_id+"_"+order_date,count); System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count); collector.emit(new Values(area_id+"_"+order_date,count)); } } @Override public void cleanup() { countsMap.clear(); } }
4. 最終結(jié)果寫入Hbase
package demo; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //結(jié)果定時(shí)寫入hbase的bolt public class AreaRsltBolt implements IBasicBolt { Map<String,Double> countsMap=null; long beginTime=System.currentTimeMillis(); long endTime=0L; HBaseDao dao=null; @Override public void declareOutputFields( OutputFieldsDeclarer paramOutputFieldsDeclarer) { // TODO Auto-generated method stub } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map paramMap, TopologyContext paramTopologyContext) { countsMap =new HashMap<String, Double>(); dao=new HBaseDAOImp(); } @Override public void execute(Tuple input, BasicOutputCollector paramBasicOutputCollector) { String date_areaid=input.getString(0); double order_amt=input.getDouble(1); countsMap.put(date_areaid,order_amt); endTime=System.currentTimeMillis(); if (endTime-beginTime>=5*1000){ for(String key:countsMap.keySet()){ //put into hbase //2014-05-05_1,amt dao.insert("area_order","cf","order_amt",countsMap.get(key)); System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key)); } beginTime=System.currentTimeMillis(); } } @Override public void cleanup() { // TODO Auto-generated method stub } }
5. DateFmt代碼
package demo; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; public class DateFmt { public static final String date_long="yyyy-MM-dd HH:mm:ss"; public static final String date_short="yyyy-MM-dd"; public static SimpleDateFormat sdf=new SimpleDateFormat(date_short); public static String getCountDate(String date,String patton){ SimpleDateFormat sdf=new SimpleDateFormat(patton); Calendar cal =Calendar.getInstance(); if (date!=null){ try { cal.setTime(sdf.parse(date)); } catch (ParseException e) { e.printStackTrace(); } } return sdf.format(cal.getTime()); } public static Date parseDate(String dateStr) throws Exception{ return sdf.parse(dateStr); } public static void main(String[] args) { System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long)); } }
感謝各位的閱讀,以上就是“storm怎么構(gòu)建拓?fù)浯a”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)storm怎么構(gòu)建拓?fù)浯a這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。