溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

storm中可靠性和非可靠性的示例分析

發(fā)布時間:2021-12-10 13:49:36 來源:億速云 閱讀:110 作者:小新 欄目:云計算

這篇文章將為大家詳細講解有關storm中可靠性和非可靠性的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

1.Spout的可靠性保證

     在Storm中,消息處理可靠性從Spout開始的。storm為了保證數(shù)據(jù)能正確的被處理, 對于spout產(chǎn)生的每一個tuple,storm都能夠進行跟蹤,這里面涉及到了ack/fail的處理, 如果一個tuple被處理成功,那么spout便會調(diào)用其ack方法,如果失敗,則會調(diào)用fail方法。而topology中處理tuple的每一個bolt都會通過OutputCollector來告知storm,當前bolt處理是否成功。

     我們知道spout必須能夠追蹤它發(fā)射的所有tuples或其子tuples,并且在這些tuples處理失敗時能夠重發(fā)。那么spout如何追蹤tuple呢?storm是通過一個簡單的anchor機制來實現(xiàn)的(在下面的bolt可靠性中會講到)。

       spout發(fā)射根tuple,根tuple產(chǎn)生子tuples。這就形成一個TupleTree。在這個tree中,所有的bolt都會ack或fail一個tuple,如果tree中所有的bolt都ack了經(jīng)過它的tuple,那么Spout的ack方法就會被調(diào)用,表示整個消息被處理完成。如果tree中的任何一個bolt fail一個tuple,或者整個處理過程超時,則Spout的fail方法便會被調(diào)用。

     另外一點, storm只是通過ack/fail機制來告訴應用方bolt中間的處理情況, 對于成功/失敗該如何處理, 必須由應用自己來決定, 因為storm內(nèi)部也沒有保存失敗的具體數(shù)據(jù), 但是也有辦法知道失敗記錄,因為spout的ack/fail方法會附帶一個msgId對象, 我們可以在最初發(fā)射tuple的時候?qū)sgId設置為tuple, 然后在ack/fail中對該tuple進行處理。這里其實有個問題, 就是每個bolt執(zhí)行完之后要顯式的調(diào)用ack/fail,否則會出現(xiàn)tuple不釋放導致oom. 不知道storm在最初設計的時候,為什么不將bolt的ack設置為默認調(diào)用。

     Storm的ISpout接口定義了三個與可靠性有關的方法:nextTuple,ack和fail。

public interface ISpout extends Serializable {
           void open( Map conf, TopologyContext context, SpoutOutputCollector collector);
           void close();
           void nextTuple();
           void ack(Object msgId);
           void fail(Object msgId);
    }

    我們知道,當Storm的Spout發(fā)射一個Tuple后,他便會調(diào)用nextTuple()方法,在這個過程中,保證可靠性處理的第一步就是為發(fā)射出的Tuple分配一個唯一的ID,并把這個ID傳給emit()方法:

collector.emit( new Values("value1" , "value2") , msgId );

    為Tuple分配一個唯一ID的目的就是為了告訴Storm,Spout希望這個Tuple產(chǎn)生的Tuple tree在處理完成或失敗后告知它,如果Tuple被處理成功,Spout的ack()方法就會被調(diào)用,相反如果處理失敗,Spout的fail()方法就會被調(diào)用,Tuple的ID也都會傳入這兩個方法中。

     需要注意的是,雖然spout有可靠性機制,但這個機制是否啟用由我們控制的。IBasicBolt在emit一個tuple后自動調(diào)用ack()方法,用來實現(xiàn)比較簡單的計算,這個是不可靠的。如果是IRichBolt的話,如果想要實現(xiàn)anchor,必須自己調(diào)用ack方法,這個保證可靠性。

2.Bolt中的可靠性

     Bolt中的可靠性主要靠兩步來實現(xiàn):


    1. 發(fā)射衍生Tuple的同時anchor原Tuple

    2. 對各個Tuples做ack或fail處理     

     anchor一個Tuple就意味著在輸入Tuple和其衍生Tuple之間建立了關聯(lián),關聯(lián)之后的Tuple便加入了Tuple tree。我們可以通過如下方式anchor一個Tuple:

collector.emit( tuple, new Values( word));

    如果我們發(fā)射新tuple的時候不同時發(fā)射元tuple,那么新發(fā)射的Tuple不會參與到整個可靠性機制中,它們的fail不會引起root tuple的重發(fā),我們成為unanchor:

collector.emit( new Values( word));

ack和fail一個tuple的操作方法:

this .collector.ack(tuple);
this .collector.fail(tuple);

    上面講過了,IBasicBolt 實現(xiàn)類不關心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail來決定. 其execute方法的BasicOutputCollector參數(shù)也沒有提供ack/fail方法給你調(diào)用. 相當于忽略了該bolt的ack/fail行為。

     在 IRichBolt實現(xiàn)類中, 如果OutputCollector.emit(oldTuple,newTuple)這樣調(diào)用來發(fā)射tuple(anchoring), 那么后面的bolt的ack/fail會影響spout ack/fail, 如果collector.emit(newTuple)這樣來發(fā)射tuple(在storm稱之為anchoring), 則相當于斷開了后面bolt的ack/fail對spout的影響.spout將立即根據(jù)當前bolt前面的ack/fail的情況來決定調(diào)用spout的ack/fail. 所以某個bolt后面的bolt的成功失敗對你來說不關心, 你可以直接通過這種方式來忽略.中間的某個bolt fail了, 不會影響后面的bolt執(zhí)行, 但是會立即觸發(fā)spout的fail. 相當于短路了, 后面bolt雖然也執(zhí)行了, 但是ack/fail對spout已經(jīng)無意義了. 也就是說, 只要bolt集合中的任何一個fail了, 會立即觸發(fā)spout的fail方法. 而ack方法需要所有的bolt調(diào)用為ack才能觸發(fā). 所以IBasicBolt用來做filter或者簡單的計算比較合適。

3.總結

    storm的可靠性是由spout和bolt共同決定的,storm利用了anchor機制來保證處理的可靠性。如果spout發(fā)射的一個tuple被完全處理,那么spout的ack方法即會被調(diào)用,如果失敗,則其fail方法便會被調(diào)用。在bolt中,通過在emit(oldTuple,newTuple)的方式來anchor一個tuple,如果處理成功,則需要調(diào)用bolt的ack方法,如果失敗,則調(diào)用其fail方法。一個tuple及其子tuple共同構成了一個tupletree,當這個tree中所有tuple在指定時間內(nèi)都完成時spout的ack才會被調(diào)用,但是當tree中任何一個tuple失敗時,spout的fail方法則會被調(diào)用。

    IBasicBolt類會自動調(diào)用ack/fail方法,而IRichBolt則需要我們手動調(diào)用ack/fail方法。我們可以通過TOPOLOGY_MESSAGE_TIMEOUT_SECS參數(shù)來指定一個tuple的處理完成時間,若這個時間未被處理完成,則spout也會調(diào)用fail方法。

4.一個可靠的WordCount例子

一個實現(xiàn)可靠性的spout: 

 public class ReliableSentenceSpout extends BaseRichSpout {
     private static final long serialVersionUID = 1L;
     private ConcurrentHashMap<UUID, Values> pending;
     private SpoutOutputCollector collector;
     private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" };
     private int index = 0;
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "sentence"));
      }
     public void open( Map config, TopologyContext context, SpoutOutputCollector collector) {
           this. collector = collector;
           this. pending = new ConcurrentHashMap<UUID, Values>();
      }
     public void nextTuple() {
          Values values = new Values( sentences[ index]);
          UUID msgId = UUID. randomUUID();
           this. pending.put(msgId, values);
           this. collector.emit(values, msgId);
           index++;
           if ( index >= sentences. length) {
               index = 0;
          }
           //Utils.waitForMillis(1);
      }
     public void ack(Object msgId) {
           this. pending.remove(msgId);
      }
     public void fail(Object msgId) {
           this. collector.emit( this. pending.get(msgId), msgId);
      }
 }

一個實現(xiàn)可靠性的bolt:

public class ReliableSplitSentenceBolt extends BaseRichBolt {
     private OutputCollector collector;
     public void prepare( Map config, TopologyContext context, OutputCollector collector) {
           this. collector = collector;
      }
     public void execute(Tuple tuple) {
          String sentence = tuple.getStringByField("sentence" );
          String[] words = sentence.split( " ");
           for (String word : words) {
               this. collector.emit(tuple, new Values(word));
          }
           this. collector.ack(tuple);
      }
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "word"));
      }
 }

這個例子中我們實現(xiàn)了storm的可靠性,tuple失敗了將會重新發(fā)送,直到處理成功。這里pending是一個map,為了實現(xiàn)tuple的失敗重發(fā)。storm里面topology.max.spout.pending屬性解釋:
1.同時活躍的batch數(shù)量,你必須設置同時處理的batch數(shù)量。你可以通過”topology.max.spout.pending” 來指定, 如果你不指定,默認是1。
2.topology.max.spout.pending 的意義在于 ,緩存spout發(fā)送出去的tuple,當下流的bolt還有topology.max.spout.pending 個 tuple 沒有消費完時,spout會停下來,等待下游bolt去消費,當tuple 的個數(shù)少于topology.max.spout.pending個數(shù)時,spout 會繼續(xù)從消息源讀取消息。(這個屬性僅對可靠消息處理)。

如果使用事務,則表示同時處理的batch數(shù)量,如果非事務,則理解成第二種。

總而言之,如果不需要保證可靠性,spout繼承BaseRichSpout,bolt繼承BaseBasicBolt,它們內(nèi)部實現(xiàn)了一些方法,自動ack,我們不需要關心ack和fail;如果要保證可靠性,spout實現(xiàn)IRichSpout接口,發(fā)tuple的時候,帶上msgId,自定義fail和ack方法,bolt繼承BaseRichBolt,發(fā)送tuple的時候要帶上原tuple,要手動ack。

關于“storm中可靠性和非可靠性的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI