您好,登錄后才能下訂單哦!
這篇文章主要介紹“IBatchSpout API怎么使用”,在日常操作中,相信很多人在IBatchSpout API怎么使用問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”IBatchSpout API怎么使用”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
IBatchSpout是storm trident推出的一種可以批量發(fā)射的Spout。非事務(wù)性,基本的spout
1:Map getComponentConfiguration();定義配置,可以用backtype.storm.Config。
2:void open(Map conf, TopologyContext context); Spout的初始化方法 ,參數(shù)conf即是getComponentConfiguration定義的配置
3:Fields getOutputFields(); 聲明輸出的fields
4:void emitBatch(long batchId, TridentCollector collector); 批量發(fā)射tuple,本次的批次號(hào)為batchId
5:void ack(long batchId);批次號(hào)為batchId的數(shù)據(jù)處理成功
6: void close();
一個(gè)例子
package storm.projectA; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import storm.trident.operation.TridentCollector; import storm.trident.spout.IBatchSpout; import backtype.storm.Config; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class MySpout implements IBatchSpout{ /** * */ private static final long serialVersionUID = 1L; private long maxBatchSize;//每批次最大的數(shù)量 private BufferedReader br;//源文件流 HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();//保存發(fā)送過(guò)的所有數(shù)據(jù),以便于重復(fù)發(fā)送 /** * @param conf 配置 * @param context */ @Override public void open(Map conf, TopologyContext context) { String filePath = (String)conf.get("filePath"); maxBatchSize = (Long)conf.get("maxBatchSize"); try { br = new BufferedReader(new FileReader(filePath)); } catch (FileNotFoundException e) { e.printStackTrace(); } } /*** spout的發(fā)送方法 * @param batchId 批次id * @param collector 批量發(fā)射器 */ @Override public void emitBatch(long batchId, TridentCollector collector) { List<List<Object>> batch = batches.get(batchId); if (batch == null) { batch = new ArrayList<List<Object>>(); for (int i = 0; i < maxBatchSize; i++) { try { String line = br.readLine(); if(line == null){ break; } batch.add(new Values(line)); } catch (IOException e) { e.printStackTrace(); } } } for(List<Object> list : batch){ collector.emit(list); } } @Override public void ack(long batchId) { batches.remove(batchId); } /** * close 方法 */ @Override public void close() { if(br!=null){ try { br.close(); } catch (IOException e) { e.printStackTrace(); } } } @Override public Map getComponentConfiguration() { Config conf = new Config(); //最大并行度 本地模式設(shè)置為1 conf.setMaxTaskParallelism(1); conf.put("filePath", "D:\\aaa.txt"); conf.put("maxBatchSize", 2); return conf; } /** * 輸出的fileds */ @Override public Fields getOutputFields() { return new Fields("sentence"); } }
到此,關(guān)于“IBatchSpout API怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。