溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

storm消息的可靠處理方法是什么

發(fā)布時間:2021-12-23 09:06:06 來源:億速云 閱讀:130 作者:iii 欄目:云計算

本篇內(nèi)容介紹了“storm消息的可靠處理方法是什么”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

4.1 簡介

storm可以確保spout發(fā)送出來的每個消息都會被完整的處理。本章將會描述storm體系是如何達(dá)到這個目標(biāo)的,并將會詳述開發(fā)者應(yīng)該如何使用storm的這些機(jī)制來實(shí)現(xiàn)數(shù)據(jù)的可靠處理。

4.2 理解消息被完整處理

一個消息(tuple)從spout發(fā)送出來,可能會導(dǎo)致成百上千的消息基于此消息被創(chuàng)建。

我們來思考一下流式的“單詞統(tǒng)計”的例子:

storm任務(wù)從數(shù)據(jù)源(Kestrel queue)每次讀取一個完整的英文句子;將這個句子分解為獨(dú)立的單詞,最后,實(shí)時的輸出每個單詞以及它出現(xiàn)過的次數(shù)。

本例中,每個從spout發(fā)送出來的消息(每個英文句子)都會觸發(fā)很多的消息被創(chuàng)建,那些從句子中分隔出來的單詞就是被創(chuàng)建出來的新消息。

這些消息構(gòu)成一個樹狀結(jié)構(gòu),我們稱之為“tuple tree”,看起來如圖1所示:

storm消息的可靠處理方法是什么

圖1 示例tuple tree

在什么條件下,Storm才會認(rèn)為一個從spout發(fā)送出來的消息被完整處理呢?答案就是下面的條件同時被滿足:

  • tuple tree不再生長

  • 樹中的任何消息被標(biāo)識為“已處理”

如果在指定的時間內(nèi),一個消息衍生出來的tuple tree未被完全處理成功,則認(rèn)為此消息未被完整處理。這個超時值可以通過任務(wù)級參數(shù)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 進(jìn)行配置,默認(rèn)超時值為30秒。

4.3 消息的生命周期

如果消息被完整處理或者未被完整處理,Storm會如何進(jìn)行接下來的操作呢?為了弄清這個問題,我們來研究一下從spout發(fā)出來的消息的生命周期。這里列出了spout應(yīng)該實(shí)現(xiàn)的接口:

storm消息的可靠處理方法是什么

首先, Storm使用spout實(shí)例的nextTuple()方法從spout請求一個消息(tuple)。 收到請求以后,spout使用open方法中提供的SpoutOutputCollector向它的輸出流發(fā)送一個或多個消息。每發(fā)送一個消息,Spout會給這個消息提供一個message ID,它將會被用來標(biāo)識這個消息。

假設(shè)我們從kestrel隊列中讀取消息,Spout會將kestrel 隊列為這個消息設(shè)置的ID作為此消息的message ID。 向SpoutOutputCollector中發(fā)送消息格式如下:

storm消息的可靠處理方法是什么

接來下,這些消息會被發(fā)送到后續(xù)業(yè)務(wù)處理的bolts, 并且Storm會跟蹤由此消息產(chǎn)生出來的新消息。當(dāng)檢測到一個消息衍生出來的tuple tree被完整處理后,Storm會調(diào)用Spout中的ack方法,并將此消息的messageID作為參數(shù)傳入。同理,如果某消息處理超時,則此消息對應(yīng)的Spout的fail方法會被調(diào)用,調(diào)用時此消息的messageID會被作為參數(shù)傳入。

注意:一個消息只會由發(fā)送它的那個spout任務(wù)來調(diào)用ack或fail。如果系統(tǒng)中某個spout由多個任務(wù)運(yùn)行,消息也只會由創(chuàng)建它的spout任務(wù)來應(yīng)答(ack或fail),決不會由其他的spout任務(wù)來應(yīng)答。

我們繼續(xù)使用從kestrel隊列中讀取消息的例子來闡述高可靠性下spout需要做些什么(假設(shè)這個spout的名字是KestrelSpout)。

我們先簡述一下kestrel消息隊列:

當(dāng)KestrelSpout從kestrel隊列中讀取一個消息,表示它“打開”了隊列中某個消息。這意味著,此消息并未從隊列中真正的刪除,而是將此消息設(shè)置為“pending”狀態(tài),它等待來自客戶端的應(yīng)答,被應(yīng)答以后,此消息才會被真正的從隊列中刪除。處于“pending”狀態(tài)的消息不會被其他的客戶端看到。另外,如果一個客戶端意外的斷開連接,則由此客戶端“打開”的所有消息都會被重新加入到隊列中。當(dāng)消息被“打開”的時候,kestrel隊列同時會為這個消息提供一個唯一的標(biāo)識。

KestrelSpout就是使用這個唯一的標(biāo)識作為這個tuple的messageID的。稍后當(dāng)ack或fail被調(diào)用的時候,KestrelSpout會把a(bǔ)ck或者fail連同messageID一起發(fā)送給kestrel隊列,kestrel會將消息從隊列中真正刪除或者將它重新放回隊列中。

4.4 可靠相關(guān)的API

為了使用Storm提供的可靠處理特性,我們需要做兩件事情:

  1. 無論何時在tuple tree中創(chuàng)建了一個新的節(jié)點(diǎn),我們需要明確的通知Storm;

  2. 當(dāng)處理完一個單獨(dú)的消息時,我們需要告訴Storm 這棵tuple tree的變化狀態(tài)。

通過上面的兩步,storm就可以檢測到一個tuple tree何時被完全處理了,并且會調(diào)用相關(guān)的ack或fail方法。Storm提供了簡單明了的方法來完成上述兩步。

為tuple tree中指定的節(jié)點(diǎn)增加一個新的節(jié)點(diǎn),我們稱之為錨定(anchoring)。錨定是在我們發(fā)送消息的同時進(jìn)行的。為了更容易說明問題,我們使用下面代碼作為例子。本示例的bolt將包含整句話的消息分解為一系列的子消息,每個子消息包含一個單詞。

storm消息的可靠處理方法是什么

每個消息都通過這種方式被錨定:把輸入消息作為emit方法的第一個參數(shù)。因為word消息被錨定在了輸入消息上,這個輸入消息是spout發(fā)送過來的tuple tree的根節(jié)點(diǎn),如果任意一個word消息處理失敗,派生這個tuple tree那個spout 消息將會被重新發(fā)送。

與此相反,我們來看看使用下面的方式emit消息時,Storm會如何處理:

storm消息的可靠處理方法是什么

如果以這種方式發(fā)送消息,將會導(dǎo)致這個消息不會被錨定。如果此tuple tree中的消息處理失敗,派生此tuple tree的根消息不會被重新發(fā)送。根據(jù)任務(wù)的容錯級別,有時候很適合發(fā)送一個非錨定的消息。

一個輸出消息可以被錨定在一個或者多個輸入消息上,這在做join或聚合的時候是很有用的。一個被多重錨定的消息處理失敗,會導(dǎo)致與之關(guān)聯(lián)的多個spout消息被重新發(fā)送。多重錨定通過在emit方法中指定多個輸入消息來實(shí)現(xiàn):

storm消息的可靠處理方法是什么

多重錨定會將被錨定的消息加到多棵tuple tree上。

注意:多重綁定可能會破壞傳統(tǒng)的樹形結(jié)構(gòu),從而構(gòu)成一個DAGs(有向無環(huán)圖),如圖2所示:

storm消息的可靠處理方法是什么

圖2 多重錨定構(gòu)成的鉆石型結(jié)構(gòu)

Storm的實(shí)現(xiàn)可以像處理樹那樣來處理DAGs。

錨定表明了如何將一個消息加入到指定的tuple tree中,高可靠處理API的接下來部分將向您描述當(dāng)處理完tuple tree中一個單獨(dú)的消息時我們該做些什么。這些是通過OutputCollector 的ack和fail方法來實(shí)現(xiàn)的?;仡^看一下例子SplitSentence,可以發(fā)現(xiàn)當(dāng)所有的word消息被發(fā)送完成后,輸入的表示句子的消息會被應(yīng)答(acked)。

每個被處理的消息必須表明成功或失?。╝cked 或者failed)。Storm是使用內(nèi)存來跟蹤每個消息的處理情況的,如果被處理的消息沒有應(yīng)答的話,遲早內(nèi)存會被耗盡!>

很多bolt遵循特定的處理流程: 讀取一個消息、發(fā)送它派生出來的子消息、在execute結(jié)尾處應(yīng)答此消息。一般的過濾器(filter)或者是簡單的處理功能都是這類的應(yīng)用。Storm有一個BasicBolt接口封裝了上述的流程。示例SplitSentence可以使用BasicBolt來重寫:

storm消息的可靠處理方法是什么

使用這種方式,代碼比之前稍微簡單了一些,但是實(shí)現(xiàn)的功能是一樣的。發(fā)送到BasicOutputCollector的消息會被自動的錨定到輸入消息,并且,當(dāng)execute執(zhí)行完畢的時候,會自動的應(yīng)答輸入消息。

很多情況下,一個消息需要延遲應(yīng)答,例如聚合或者是join。只有根據(jù)一組輸入消息得到一個結(jié)果之后,才會應(yīng)答之前所有的輸入消息。并且聚合和join大部分時候?qū)敵鱿⒍际嵌嘀劐^定。然而,這些特性不是IBasicBolt所能處理的。

4.5 高效的實(shí)現(xiàn)tuple tree

Storm 系統(tǒng)中有一組叫做“acker”的特殊的任務(wù),它們負(fù)責(zé)跟蹤DAG(有向無環(huán)圖)中的每個消息。每當(dāng)發(fā)現(xiàn)一個DAG被完全處理,它就向創(chuàng)建這個根消息的spout任務(wù)發(fā)送一個信號。拓?fù)渲衋cker任務(wù)的并行度可以通過配置參數(shù)Config.TOPOLOGY_ACKERS來設(shè)置。默認(rèn)的acker任務(wù)并行度為1,當(dāng)系統(tǒng)中有大量的消息時,應(yīng)該適當(dāng)提高acker任務(wù)的并發(fā)度。

為了理解Storm可靠性處理機(jī)制,我們從研究一個消息的生命周期和tuple tree的管理入手。當(dāng)一個消息被創(chuàng)建的時候(無論是在spout還是bolt中),系統(tǒng)都為該消息分配一個64bit的隨機(jī)值作為id。這些隨機(jī)的id是acker用來跟蹤由spout消息派生出來的tuple tree的。

每個消息都知道它所在的tuple tree對應(yīng)的根消息的id。每當(dāng)bolt新生成一個消息,對應(yīng)tuple tree中的根消息的messageId就拷貝到這個消息中。當(dāng)這個消息被應(yīng)答的時候,它就把關(guān)于tuple tree變化的信息發(fā)送給跟蹤這棵樹的acker。例如,他會告訴acker:本消息已經(jīng)處理完畢,但是我派生出了一些新的消息,幫忙跟蹤一下吧。

舉個例子,假設(shè)消息D和E是由消息C派生出來的,這里演示了消息C被應(yīng)答時,tuple tree是如何變化的。

storm消息的可靠處理方法是什么

因為在C被從樹中移除的同時D和E會被加入到tuple tree中,因此tuple tree不會被過早的認(rèn)為已完全處理。

關(guān)于Storm如何跟蹤tuple tree,我們再深入的探討一下。前面說過系統(tǒng)中可以有任意個數(shù)的acker,那么,每當(dāng)一個消息被創(chuàng)建或應(yīng)答的時候,它怎么知道應(yīng)該通知哪個acker呢?

系統(tǒng)使用一種哈希算法來根據(jù)spout消息的messageId確定由哪個acker跟蹤此消息派生出來的tuple tree。因為每個消息都知道與之對應(yīng)的根消息的messageId,因此它知道應(yīng)該與哪個acker通信。

當(dāng)spout發(fā)送一個消息的時候,它就通知對應(yīng)的acker一個新的根消息產(chǎn)生了,這時acker就會創(chuàng)建一個新的tuple tree。當(dāng)acker發(fā)現(xiàn)這棵樹被完全處理之后,他就會通知對應(yīng)的spout任務(wù)。

tuple是如何被跟蹤的呢?系統(tǒng)中有成千上萬的消息,如果為每個spout發(fā)送的消息都構(gòu)建一棵樹的話,很快內(nèi)存就會耗盡。所以,必須采用不同的策略來跟蹤每個消息。由于使用了新的跟蹤算法,Storm只需要固定的內(nèi)存(大約20字節(jié))就可以跟蹤一棵樹。這個算法是storm正確運(yùn)行的核心,也是storm最大的突破。

acker任務(wù)保存了spout消息id到一對值的映射。第一個值就是spout的任務(wù)id,通過這個id,acker就知道消息處理完成時該通知哪個spout任務(wù)。第二個值是一個64bit的數(shù)字,我們稱之為“ack val”, 它是樹中所有消息的隨機(jī)id的異或結(jié)果。ack val表示了整棵樹的的狀態(tài),無論這棵樹多大,只需要這個固定大小的數(shù)字就可以跟蹤整棵樹。當(dāng)消息被創(chuàng)建和被應(yīng)答的時候都會有相同的消息id發(fā)送過來做異或。

每當(dāng)acker發(fā)現(xiàn)一棵樹的ack val值為0的時候,它就知道這棵樹已經(jīng)被完全處理了。因為消息的隨機(jī)ID是一個64bit的值,因此ack val在樹處理完之前被置為0的概率非常小。假設(shè)你每秒鐘發(fā)送一萬個消息,從概率上說,至少需要50,000,000年才會有機(jī)會發(fā)生一次錯誤。即使如此,也只有在這個消息確實(shí)處理失敗的情況下才會有數(shù)據(jù)的丟失!

4.6 選擇合適的可靠性級別

Acker任務(wù)是輕量級的,所以在拓?fù)渲胁⒉恍枰嗟腶cker存在。可以通過Storm UI來觀察acker任務(wù)的吞吐量,如果看上去吞吐量不夠的話,說明需要添加額外的acker。

如果你并不要求每個消息必須被處理(你允許在處理過程中丟失一些信息),那么可以關(guān)閉消息的可靠處理機(jī)制,從而可以獲取較好的性能。關(guān)閉消息的可靠處理機(jī)制意味著系統(tǒng)中的消息數(shù)會減半(每個消息不需要應(yīng)答了)。另外,關(guān)閉消息的可靠處理可以減少消息的大?。ú恍枰總€tuple記錄它的根id了),從而節(jié)省帶寬。

有三種方法可以關(guān)系消息的可靠處理機(jī)制:

  • 將參數(shù)Config.TOPOLOGY_ACKERS設(shè)置為0,通過此方法,當(dāng)Spout發(fā)送一個消息的時候,它的ack方法將立刻被調(diào)用;

  • 第二個方法是Spout發(fā)送一個消息時,不指定此消息的messageID。當(dāng)需要關(guān)閉特定消息可靠性的時候,可以使用此方法;

  • 最后,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發(fā)送時不要做錨定,即在emit方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何spout重新發(fā)送消息。

4.7 集群的各級容錯

到現(xiàn)在為止,大家已經(jīng)理解了Storm的可靠性機(jī)制,并且知道了如何選擇不同的可靠性級別來滿足需求。接下來我們研究一下Storm如何保證在各種情況下確保數(shù)據(jù)不丟失。

3.7.1 任務(wù)級失敗

  • 因為bolt任務(wù)crash引起的消息未被應(yīng)答。此時,acker中所有與此bolt任務(wù)關(guān)聯(lián)的消息都會因為超時而失敗,對應(yīng)spout的fail方法將被調(diào)用。

  • acker任務(wù)失敗。如果acker任務(wù)本身失敗了,它在失敗之前持有的所有消息都將會因為超時而失敗。Spout的fail方法將被調(diào)用。

  • Spout任務(wù)失敗。這種情況下,Spout任務(wù)對接的外部設(shè)備(如MQ)負(fù)責(zé)消息的完整性。例如當(dāng)客戶端異常的情況下,kestrel隊列會將處于pending狀態(tài)的所有的消息重新放回到隊列中。

4.7.2  任務(wù)槽(slot) 故障

  • worker失敗。每個worker中包含數(shù)個bolt(或spout)任務(wù)。supervisor負(fù)責(zé)監(jiān)控這些任務(wù),當(dāng)worker失敗后,supervisor會嘗試在本機(jī)重啟它。

  • supervisor失敗。supervisor是無狀態(tài)的,因此supervisor的失敗不會影響當(dāng)前正在運(yùn)行的任務(wù),只要及時的將它重新啟動即可。supervisor不是自舉的,需要外部監(jiān)控來及時重啟。

  • nimbus失敗。nimbus是無狀態(tài)的,因此nimbus的失敗不會影響當(dāng)前正在運(yùn)行的任務(wù)(nimbus失敗時,無法提交新的任務(wù)),只要及時的將它重新啟動即可。nimbus不是自舉的,需要外部監(jiān)控來及時重啟。

4.7.3.  集群節(jié)點(diǎn)(機(jī)器)故障

  • storm集群中的節(jié)點(diǎn)故障。此時nimbus會將此機(jī)器上所有正在運(yùn)行的任務(wù)轉(zhuǎn)移到其他可用的機(jī)器上運(yùn)行。

  • zookeeper集群中的節(jié)點(diǎn)故障。zookeeper保證少于半數(shù)的機(jī)器宕機(jī)仍可正常運(yùn)行,及時修復(fù)故障機(jī)器即可。

“storm消息的可靠處理方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI