溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何理解Storm的并行度、Grouping策略以及消息可靠處理機制

發(fā)布時間:2021-11-23 10:16:22 來源:億速云 閱讀:145 作者:柒染 欄目:云計算

如何理解Storm的并行度、Grouping策略以及消息可靠處理機制,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

概念:

Workers (JVMs): 在一個節(jié)點上可以運行一個或多個獨立的JVM 進程。一個Topology可以包含一個或多個worker(并行的跑在不同的machine上), 所以worker process就是執(zhí)行一個topology的子集, 并且worker只能對應(yīng)于一個topology

Executors (threads): 在一個worker JVM進程中運行著多個Java線程。一個executor線程可以執(zhí)行一個或多個tasks。但一般默認(rèn)每個executor只執(zhí)行一個task。一個worker可以包含一個或多個executor, 每個component (spout或bolt)至少對應(yīng)于一個executor, 所以可以說executor執(zhí)行一個compenent的子集, 同時一個executor只能對應(yīng)于一個component。

Tasks(bolt/spout instances):Task就是具體的處理邏輯對象,每一個Spout和Bolt會被當(dāng)作很多task在整個集群里面執(zhí)行。每一個task對應(yīng)到一個線程,而stream grouping則是定義怎么從一堆task發(fā)射tuple到另外一堆task。你可以調(diào)用TopologyBuilder.setSpout和TopBuilder.setBolt來設(shè)置并行度 — 也就是有多少個task。

 

配置并行度

對于并發(fā)度的配置, 在storm里面可以在多個地方進行配置, 優(yōu)先級為:defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration

worker processes的數(shù)目, 可以通過配置文件和代碼中配置, worker就是執(zhí)行進程, 所以考慮并發(fā)的效果, 數(shù)目至少應(yīng)該大亍machines的數(shù)目

executor的數(shù)目, component的并發(fā)線程數(shù),只能在代碼中配置(通過setBolt和setSpout的參數(shù)), 例如, setBolt("green-bolt", new GreenBolt(), 2)

tasks的數(shù)目, 可以不配置, 默認(rèn)和executor1:1, 也可以通過setNumTasks()配置

Topology的worker數(shù)通過config設(shè)置,即執(zhí)行該topology的worker(java)進程數(shù)。它可以通過 storm rebalance 命令任意調(diào)整。

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout"); //set tasks number to 4
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6).shuffleGrouping("green-bolt");
StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology());

如何理解Storm的并行度、Grouping策略以及消息可靠處理機制  

動態(tài)的改變并行度

Storm支持在不 restart topology 的情況下, 動態(tài)的改變(增減) worker processes 的數(shù)目和 executors 的數(shù)目, 稱為rebalancing. 通過Storm web UI,或者通過storm rebalance命令實現(xiàn):

storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
流分組策略----Stream Grouping

Stream Grouping,告訴topology如何在兩個組件之間發(fā)送tuple
定義一個topology的其中一步是定義每個bolt接收什么樣的流作為輸入。stream grouping就是用來定義一個stream應(yīng)該如果分配數(shù)據(jù)給bolts上面的多個tasks

Storm里面有7種類型的stream grouping,你也可以通過實現(xiàn)CustomStreamGrouping接口來實現(xiàn)自定義流分組
1. Shuffle Grouping
隨機分組,隨機派發(fā)stream里面的tuple,保證每個bolt task接收到的tuple數(shù)目大致相同。

2. Fields Grouping
按字段分組,比如,按"user-id"這個字段來分組,那么具有同樣"user-id"的 tuple 會被分到相同的Bolt里的一個task, 而不同的"user-id"則可能會被分配到不同的task。

3. All Grouping
廣播發(fā)送,對亍每一個tuple,所有的bolts都會收到

4. Global Grouping
全局分組,整個stream被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。

5. None Grouping
不分組,這個分組的意思是說stream不關(guān)心到底怎樣分組。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程里面去執(zhí)行(如果可能的話)。

6. Direct Grouping
指向型分組, 這是一種比較特別的分組方法,用這種分組意味著消息(tuple)的發(fā)送者指定由消息接收者的哪個task處理這個消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息tuple必須使用 emitDirect 方法來發(fā)射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id) 

7. Local or shuffle grouping
本地或隨機分組。如果目標(biāo)bolt有一個或者多個task與源bolt的task在同一個工作進程中,tuple將會被隨機發(fā)送給這些同進程中的tasks。否則,和普通的Shuffle Grouping行為一致。

 

消息的可靠處理機制

    在storm中,可靠的信息處理機制是從spout開始的。一個提供了可靠的處理機制的spout需要記錄他發(fā)射出去的tuple,當(dāng)下游bolt處理tuple或者子tuple失敗時spout能夠重新發(fā)射。

    Storm通過調(diào)用Spout的nextTuple()發(fā)送一個tuple。為實現(xiàn)可靠的消息處理,首先要給每個發(fā)出的tuple帶上唯一的ID,并且將ID作為參數(shù)傳遞給SoputOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), msgId); 給tuple指定ID告訴Storm系統(tǒng),無論處理成功還是失敗,spout都要接收tuple樹上所有節(jié)點返回的通知。如果處理成功,spout的ack()方法將會對編號是msgId的消息應(yīng)答確認(rèn);如果處理失敗或者超時,會調(diào)用fail()方法。

    bolt要實現(xiàn)可靠的信息處理機制包含兩個步驟:1.當(dāng)發(fā)射衍生的tuple時,需要錨定讀入的tuple;2.當(dāng)處理消息成功或失敗時分別確認(rèn)應(yīng)答或者報錯。

    錨定一個tuple的意思是,建立讀入tuple和衍生出的tuple之間的對應(yīng)關(guān)系,這樣下游的bolt就可以通過應(yīng)答確認(rèn)、報錯或超時來加入到tuple樹結(jié)構(gòu)中??梢酝ㄟ^調(diào)用OutputCollector的emit()的一個重載函數(shù)錨定一個或一組tuple:collector.emit(tuple, new Values(word))

    非錨定(collector.emit(new Values(word));)的tuple不會對數(shù)據(jù)流的可靠性起作用。如果一個非錨定的tuple在下游處理失敗,原始的根tuple不會重新發(fā)送。
 
    超時時間可以通過任務(wù)級參數(shù)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS進行配置,默認(rèn)超時值為30秒。

     Storm 系統(tǒng)中有一組叫做"acker"的特殊的任務(wù),它們負(fù)責(zé)跟蹤DAG(有向無環(huán)圖)中的每個消息。acker任務(wù)保存了spout消息id到一對值的映射。第一個值就是spout的任務(wù)id,通過這個id,acker就知道消息處理完成時該通知哪個spout任務(wù)。第二個值是一個64bit的數(shù)字,我們稱之為"ack val", 它是樹中所有消息的隨機id的異或計算結(jié)果。ack val表示了整棵樹的的狀態(tài),無論這棵樹多大,只需要這個固定大小的數(shù)字就可以跟蹤整棵樹。當(dāng)消息被創(chuàng)建和被應(yīng)答的時候都會有相同的消息id發(fā)送過來做異或。

    每當(dāng)acker發(fā)現(xiàn)一棵樹的ack val值為0的時候,它就知道這棵樹已經(jīng)被完全處理了。因為消息的隨機ID是一個64bit的值,因此ack val在樹處理完之前被置為0的概率非常小。假設(shè)你每秒鐘發(fā)送一萬個消息,從概率上說,至少需要50,000,000年才會有機會發(fā)生一次錯誤。即使如此,也只有在這個消息確實處理失敗的情況下才會有數(shù)據(jù)的丟失!

 有三種方法可以去掉消息的可靠性:
1、將參數(shù)Config.TOPOLOGY_ACKERS設(shè)置為0,通過此方法,當(dāng)Spout發(fā)送一個消息的時候,它的ack方法將立刻被調(diào)用;
2、Spout發(fā)送一個消息時,不指定此消息的messageID。當(dāng)需要關(guān)閉特定消息可靠性的時候,可以使用此方法;
3、最后,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發(fā)送時不要做錨定,即在emit方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何spout重新發(fā)送消息。 

看完上述內(nèi)容,你們掌握如何理解Storm的并行度、Grouping策略以及消息可靠處理機制的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI