您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關storm中可靠性和非可靠性的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
在Storm中,消息處理可靠性從Spout開始的。storm為了保證數(shù)據(jù)能正確的被處理, 對于spout產(chǎn)生的每一個tuple,storm都能夠進行跟蹤,這里面涉及到了ack/fail的處理, 如果一個tuple被處理成功,那么spout便會調(diào)用其ack方法,如果失敗,則會調(diào)用fail方法。而topology中處理tuple的每一個bolt都會通過OutputCollector來告知storm,當前bolt處理是否成功。
我們知道spout必須能夠追蹤它發(fā)射的所有tuples或其子tuples,并且在這些tuples處理失敗時能夠重發(fā)。那么spout如何追蹤tuple呢?storm是通過一個簡單的anchor機制來實現(xiàn)的(在下面的bolt可靠性中會講到)。
spout發(fā)射根tuple,根tuple產(chǎn)生子tuples。這就形成一個TupleTree。在這個tree中,所有的bolt都會ack或fail一個tuple,如果tree中所有的bolt都ack了經(jīng)過它的tuple,那么Spout的ack方法就會被調(diào)用,表示整個消息被處理完成。如果tree中的任何一個bolt fail一個tuple,或者整個處理過程超時,則Spout的fail方法便會被調(diào)用。
另外一點, storm只是通過ack/fail機制來告訴應用方bolt中間的處理情況, 對于成功/失敗該如何處理, 必須由應用自己來決定, 因為storm內(nèi)部也沒有保存失敗的具體數(shù)據(jù), 但是也有辦法知道失敗記錄,因為spout的ack/fail方法會附帶一個msgId對象, 我們可以在最初發(fā)射tuple的時候?qū)sgId設置為tuple, 然后在ack/fail中對該tuple進行處理。這里其實有個問題, 就是每個bolt執(zhí)行完之后要顯式的調(diào)用ack/fail,否則會出現(xiàn)tuple不釋放導致oom. 不知道storm在最初設計的時候,為什么不將bolt的ack設置為默認調(diào)用。
Storm的ISpout接口定義了三個與可靠性有關的方法:nextTuple,ack和fail。
public interface ISpout extends Serializable { void open( Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
我們知道,當Storm的Spout發(fā)射一個Tuple后,他便會調(diào)用nextTuple()方法,在這個過程中,保證可靠性處理的第一步就是為發(fā)射出的Tuple分配一個唯一的ID,并把這個ID傳給emit()方法:
collector.emit( new Values("value1" , "value2") , msgId );
為Tuple分配一個唯一ID的目的就是為了告訴Storm,Spout希望這個Tuple產(chǎn)生的Tuple tree在處理完成或失敗后告知它,如果Tuple被處理成功,Spout的ack()方法就會被調(diào)用,相反如果處理失敗,Spout的fail()方法就會被調(diào)用,Tuple的ID也都會傳入這兩個方法中。
需要注意的是,雖然spout有可靠性機制,但這個機制是否啟用由我們控制的。IBasicBolt在emit一個tuple后自動調(diào)用ack()方法,用來實現(xiàn)比較簡單的計算,這個是不可靠的。如果是IRichBolt的話,如果想要實現(xiàn)anchor,必須自己調(diào)用ack方法,這個保證可靠性。
Bolt中的可靠性主要靠兩步來實現(xiàn):
發(fā)射衍生Tuple的同時anchor原Tuple
對各個Tuples做ack或fail處理
anchor一個Tuple就意味著在輸入Tuple和其衍生Tuple之間建立了關聯(lián),關聯(lián)之后的Tuple便加入了Tuple tree。我們可以通過如下方式anchor一個Tuple:
collector.emit( tuple, new Values( word));
如果我們發(fā)射新tuple的時候不同時發(fā)射元tuple,那么新發(fā)射的Tuple不會參與到整個可靠性機制中,它們的fail不會引起root tuple的重發(fā),我們成為unanchor:
collector.emit( new Values( word));
ack和fail一個tuple的操作方法:
this .collector.ack(tuple); this .collector.fail(tuple);
上面講過了,IBasicBolt 實現(xiàn)類不關心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail來決定. 其execute方法的BasicOutputCollector參數(shù)也沒有提供ack/fail方法給你調(diào)用. 相當于忽略了該bolt的ack/fail行為。
在 IRichBolt實現(xiàn)類中, 如果OutputCollector.emit(oldTuple,newTuple)這樣調(diào)用來發(fā)射tuple(anchoring), 那么后面的bolt的ack/fail會影響spout ack/fail, 如果collector.emit(newTuple)這樣來發(fā)射tuple(在storm稱之為anchoring), 則相當于斷開了后面bolt的ack/fail對spout的影響.spout將立即根據(jù)當前bolt前面的ack/fail的情況來決定調(diào)用spout的ack/fail. 所以某個bolt后面的bolt的成功失敗對你來說不關心, 你可以直接通過這種方式來忽略.中間的某個bolt fail了, 不會影響后面的bolt執(zhí)行, 但是會立即觸發(fā)spout的fail. 相當于短路了, 后面bolt雖然也執(zhí)行了, 但是ack/fail對spout已經(jīng)無意義了. 也就是說, 只要bolt集合中的任何一個fail了, 會立即觸發(fā)spout的fail方法. 而ack方法需要所有的bolt調(diào)用為ack才能觸發(fā). 所以IBasicBolt用來做filter或者簡單的計算比較合適。
storm的可靠性是由spout和bolt共同決定的,storm利用了anchor機制來保證處理的可靠性。如果spout發(fā)射的一個tuple被完全處理,那么spout的ack方法即會被調(diào)用,如果失敗,則其fail方法便會被調(diào)用。在bolt中,通過在emit(oldTuple,newTuple)的方式來anchor一個tuple,如果處理成功,則需要調(diào)用bolt的ack方法,如果失敗,則調(diào)用其fail方法。一個tuple及其子tuple共同構成了一個tupletree,當這個tree中所有tuple在指定時間內(nèi)都完成時spout的ack才會被調(diào)用,但是當tree中任何一個tuple失敗時,spout的fail方法則會被調(diào)用。
IBasicBolt類會自動調(diào)用ack/fail方法,而IRichBolt則需要我們手動調(diào)用ack/fail方法。我們可以通過TOPOLOGY_MESSAGE_TIMEOUT_SECS參數(shù)來指定一個tuple的處理完成時間,若這個時間未被處理完成,則spout也會調(diào)用fail方法。
一個實現(xiàn)可靠性的spout:
public class ReliableSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector collector; private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" }; private int index = 0; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "sentence")); } public void open( Map config, TopologyContext context, SpoutOutputCollector collector) { this. collector = collector; this. pending = new ConcurrentHashMap<UUID, Values>(); } public void nextTuple() { Values values = new Values( sentences[ index]); UUID msgId = UUID. randomUUID(); this. pending.put(msgId, values); this. collector.emit(values, msgId); index++; if ( index >= sentences. length) { index = 0; } //Utils.waitForMillis(1); } public void ack(Object msgId) { this. pending.remove(msgId); } public void fail(Object msgId) { this. collector.emit( this. pending.get(msgId), msgId); } }
一個實現(xiàn)可靠性的bolt:
public class ReliableSplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; public void prepare( Map config, TopologyContext context, OutputCollector collector) { this. collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence" ); String[] words = sentence.split( " "); for (String word : words) { this. collector.emit(tuple, new Values(word)); } this. collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "word")); } }
這個例子中我們實現(xiàn)了storm的可靠性,tuple失敗了將會重新發(fā)送,直到處理成功。這里pending是一個map,為了實現(xiàn)tuple的失敗重發(fā)。storm里面topology.max.spout.pending屬性解釋:
1.同時活躍的batch數(shù)量,你必須設置同時處理的batch數(shù)量。你可以通過”topology.max.spout.pending” 來指定, 如果你不指定,默認是1。
2.topology.max.spout.pending 的意義在于 ,緩存spout發(fā)送出去的tuple,當下流的bolt還有topology.max.spout.pending 個 tuple 沒有消費完時,spout會停下來,等待下游bolt去消費,當tuple 的個數(shù)少于topology.max.spout.pending個數(shù)時,spout 會繼續(xù)從消息源讀取消息。(這個屬性僅對可靠消息處理)。
如果使用事務,則表示同時處理的batch數(shù)量,如果非事務,則理解成第二種。
總而言之,如果不需要保證可靠性,spout繼承BaseRichSpout,bolt繼承BaseBasicBolt,它們內(nèi)部實現(xiàn)了一些方法,自動ack,我們不需要關心ack和fail;如果要保證可靠性,spout實現(xiàn)IRichSpout接口,發(fā)tuple的時候,帶上msgId,自定義fail和ack方法,bolt繼承BaseRichBolt,發(fā)送tuple的時候要帶上原tuple,要手動ack。
關于“storm中可靠性和非可靠性的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。