您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么使用IBasic Bolt實現(xiàn)自動確認”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么使用IBasic Bolt實現(xiàn)自動確認”吧!
Bolt是這樣一種組件,它把元組作為輸入,然后產(chǎn)生新的元組作為輸出。實現(xiàn)一個bolt時,通常需要實現(xiàn)IRichBolt接口。Bolts對象由客戶端機器創(chuàng)建,序列化為拓撲,并提交給集群中的主機。然后集群啟動工人進程反序列化bolt,調(diào)用prepare,最后開始處理元組。
NOTE:要創(chuàng)建一個bolt對象,它通過構(gòu)造器參數(shù)初始化成員屬性,bolt被提交到集群時,這些屬性值會隨著一起序列化。
Bolts擁有如下方法:
declareOutputFields(OutputFieldsDeclarer declarer) { 為bolt聲明輸出模式 } prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector) { 僅在bolt開始處理元組之前調(diào)用 } execute(Tuple input){ 處理輸入的單個元組 } cleanup(){ 在bolt即將關(guān)閉時調(diào)用 }
下面看一個例子,在這個例子中bolt把一句話分割成單詞列表:
class SplitSentence implements IRichBolt { private OutputCollector collector; publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); } } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }}
正如你所看到的,這是一個很簡單的bolt。值得一提的是在這個例子里,沒有消息擔保。這就意味著,如果bolt因為某些原因丟棄了一些消息——不論是因為bolt掛了,還是因為程序故意丟棄的——生成這條消息的spout不會收到任何通知,任何其它的spouts和bolts也不會收到。
然而在許多情況下,你想確保消息在整個拓撲范圍內(nèi)都被處理過了。
正如前面所說的,Storm保證通過spout發(fā)送的每條消息會得到所有bolt的全面處理。基于設(shè)計上的考慮,這意味著你要自己決定你的bolts是否保證這一點。
拓撲是一個樹型結(jié)構(gòu),消息(元組)穿過其中一條或多條分支。樹上的每個節(jié)點都會調(diào)用ack(tuple)或fail(tuple),Storm因此知道一條消息是否失敗了,并通知那個/那些制造了這些消息的spout(s)。既然一個Storm拓撲運行在高度并行化的環(huán)境里,跟蹤始發(fā)spout實例的最好方法就是在消息元組內(nèi)包含一個始發(fā)spout引用。這一技巧稱做錨定(譯者注:原文為Anchoring)。修改一下剛剛講過的SplitSentence,使它能夠確保消息都被處理了。
class SplitSentence implenents IRichBolt { private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(tuple, new Values(word)); } collector.ack(tuple); } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer){ declar.declare(new Fields("word")); }}
錨定發(fā)生在調(diào)用collector.emit()時。正如前面提到的,Storm可以沿著元組追蹤到始發(fā)spout。collector.ack(tuple)和collector.fail(tuple)會告知spout每條消息都發(fā)生了什么。當樹上的每條消息都已被處理了,Storm就認為來自spout的元組被全面的處理了。如果一個元組沒有在設(shè)置的超時時間內(nèi)完成對消息樹的處理,就認為這個元組處理失敗。默認超時時間為30秒。
NOTE:你可以通過修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓撲的超時時間。
當然了spout需要考慮消息的失敗情況,并相應(yīng)的重試或丟棄消息。
NOTE:你處理的每條消息要么是確認的(譯者注:collector.ack())要么是失敗的(譯者注:collector.fail())。Storm使用內(nèi)存跟蹤每個元組,所以如果你不調(diào)用這兩個方法,該任務(wù)最終將耗盡內(nèi)存。
一個bolt可以使用emit(streamId, tuple)把元組分發(fā)到多個流,其中參數(shù)streamId是一個用來標識流的字符串。然后,你可以在TopologyBuilder決定由哪個流訂閱它。
為了用bolt連接或聚合數(shù)據(jù)流,你需要借助內(nèi)存緩沖元組。為了在這一場景下確保消息完成,你不得不把流錨定到多個元組上??梢韵?strong>emit方法傳入一個元組列表來達成目的。
... List anchors = new ArrayList(); anchors.add(tuple1); anchors.add(tuple2); collector.emit(anchors, values); ...
通過這種方式,bolt在任意時刻調(diào)用ack或fail方法,都會通知消息樹,而且由于流錨定了多個元組,所有相關(guān)的spout都會收到通知。
你可能已經(jīng)注意到了,在許多情況下都需要消息確認。簡單起見,Storm提供了另一個用來實現(xiàn)bolt的接口,IBasicBolt。對于該接口的實現(xiàn)類的對象,會在執(zhí)行execute方法之后自動調(diào)用ack方法。
class SplitSentence extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); }} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
NOTE:分發(fā)消息的BasicOutputCollector自動錨定到作為參數(shù)傳入的元組。
感謝各位的閱讀,以上就是“怎么使用IBasic Bolt實現(xiàn)自動確認”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對怎么使用IBasic Bolt實現(xiàn)自動確認這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。