溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Storm并發(fā)度怎么設(shè)置

發(fā)布時間:2021-12-22 15:49:45 來源:億速云 閱讀:161 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Storm并發(fā)度怎么設(shè)置”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

  Storm架構(gòu):master/slave

  主節(jié)點(diǎn):Nimbus

  負(fù)責(zé)在集群上進(jìn)行任務(wù)(Topology)的分發(fā)與資源的調(diào)度以及監(jiān)控

  工作節(jié)點(diǎn):Supervisor

  接收到任務(wù)請求后,啟動一個或多個Worker進(jìn)程來處理任務(wù);默認(rèn)情況下,一個Supervisor最多啟動4個Worker

  工作進(jìn)程:Worker

  在Supervisor中的子進(jìn)程,存在著若干個Spout和Bolt線程,來負(fù)責(zé)Spout和Bolt組件處理任務(wù)(實(shí)際是開啟的executor線程)

  作業(yè):Topologies(死循環(huán),不會結(jié)束)

  Spout:獲取數(shù)據(jù)的組件

  Bolt:處理數(shù)據(jù)的組件

  Stream:Spout和Bolt之間數(shù)據(jù)流動的通道

  Tuple:

  1)Stream的最小組成單位,Spout向Bolt發(fā)送一次數(shù)據(jù)叫一個Tuple

  2)同一個Stream中Tuple的類型相同,不同的Stream中可能相同/不同

  3)一個key-value形式的Map

  數(shù)據(jù)流分發(fā)策略(Stream groupings):

  解決Spout和Bolt之間數(shù)據(jù)傳輸(發(fā)送Tuple元組)的問題

  1)shuffleGrouping:

  隨機(jī)派發(fā)Stream中的Tuple到Bolt中

  2)fieldsGrouping:

  根據(jù)字段的哈希值與Bolt個數(shù)進(jìn)行取模操作然后進(jìn)行分組發(fā)送,一個節(jié)點(diǎn)是一個Worker, 一個Bolt是一個task, 全部節(jié)點(diǎn)的Spout或Bolt的個數(shù)叫并發(fā)度。

  Storm并發(fā)度設(shè)置:

  1.Worker并發(fā)度:

  首先按照集群規(guī)模和集群的物理位置來設(shè)定

  一般會把Worker均分到每一個節(jié)點(diǎn)里, 一個supervisor默認(rèn)設(shè)置一個Worker

  2.Spout數(shù)量設(shè)定:

  Spout總數(shù)默認(rèn)等于Kafka(消息中間件)對應(yīng)Topic的分區(qū)數(shù),提高吞吐速度

  一般一個Worker設(shè)置一個Spout

  3.Bolt1數(shù)量設(shè)定:

  首先根據(jù)數(shù)據(jù)量和處理數(shù)據(jù)的時間來設(shè)定

  一般情況下, Bolt1的數(shù)量是Spout數(shù)量的2倍(根據(jù)項(xiàng)目進(jìn)行修改)

  4.Bolt2數(shù)量設(shè)定:

  首先根據(jù)數(shù)據(jù)量和處理數(shù)據(jù)的時間來設(shè)定,因?yàn)锽olt1傳過來的中間結(jié)果數(shù)據(jù)已經(jīng)減少很多,Bolt2的數(shù)量可以酌情減少。

  容錯機(jī)制:異或方式<相同為,不同為1>

  tupleId - 產(chǎn)生新數(shù)據(jù),會產(chǎn)生一個tupleId;

  整個過程中的tupleId按順序兩兩異或到最后

  若結(jié)果為,則數(shù)據(jù)正確,否則錯誤

  messageId - 代表整條信息,API中指定提供給程序員,long型

  rootId - 代表某條信息,提供給storm框架

  出現(xiàn)數(shù)據(jù)運(yùn)算失敗的兩種情況:

  execute(){

  1.異常(數(shù)據(jù)異常)

  2.任務(wù)運(yùn)行超時 -- 認(rèn)為處理失敗

  }

  因?yàn)閿?shù)據(jù)發(fā)送時導(dǎo)致的數(shù)據(jù)重復(fù)發(fā)送問題, 如何解決?

 ?、?

  1.比如對訂單信息做處理, 處理成功后, 把訂單信息ID存儲到Redis(set)

  2.信息發(fā)送時, 判斷是否處理過此信息

  execute(){

  if()

  else()

  }

 ?、?

  不作處理: 點(diǎn)擊流日日志分析: pv, uv

  指標(biāo)分析: 訂單人數(shù), 訂單金額

  消息的可靠性保障和acker機(jī)制: open / nextTuple / ack / fail/ close

 ?、?Spout類:

  在發(fā)送tuple時,Spout會提供一個msgId,用于在后續(xù)識別tuple;Storm會根據(jù)msgId跟蹤創(chuàng)建的tuple樹,直到某個tuple被完整處理,根據(jù)msgId調(diào)用最初發(fā)送tuple的Spout中ack()方法,檢測到超時就調(diào)用fail()方法 -- 這兩個方法的調(diào)用必須由最初創(chuàng)建這個tuple的Spout執(zhí)行;當(dāng)Spout從消息隊(duì)列(Kafka/RocketMQ)中取出一條數(shù)據(jù)時,實(shí)際上沒有被取出,而是保持一個掛起狀態(tài),等待消息完成的信號,掛起狀態(tài)的信息不會被發(fā)送到其它的消費(fèi)者;當(dāng)該消息被"取出"時,隊(duì)列會將消息體數(shù)據(jù)和一個唯一的msgId提供給客戶端,當(dāng)Spout的ack()/fail()方法被調(diào)用時,Spout根據(jù)發(fā)送的id向隊(duì)列請求將消息從隊(duì)列中移除/重新放入隊(duì)列。

 ?、?acker任務(wù):

  高效的實(shí)現(xiàn)可靠性 -- 必須顯式的在Bolt中調(diào)用定義在Spout中的ack()和fail()方法,Storm拓?fù)溆幸恍┨厥獾姆Q為"acker"的任務(wù),負(fù)責(zé)跟蹤Spout發(fā)送的tuple的DAG,當(dāng)一個acker發(fā)現(xiàn)DAG結(jié)束后,它就會給創(chuàng)建Spout tuple的Spout任務(wù)發(fā)送一條消息,讓這個任務(wù)來應(yīng)答這個消息。acker并不會直接的跟蹤tuple樹,在acker樹中存儲了一個表,用于將Spout tuple的id與一對值相映射,id為創(chuàng)建這個tuple的任務(wù)id,第二個值為一個64bit的數(shù)字(ack val),這個值是這棵樹中所有被創(chuàng)建的或者被應(yīng)答的tuple的tuple id進(jìn)行異或運(yùn)算的結(jié)果值。

 ?、?移除可靠性:

  1.將 Config.TOPOLOGY_ACKERS 設(shè)置為

  2.在SpoutOutputCollector.emit 方法中省略消息 id 來關(guān)閉 spout tuple 的跟蹤功能

  3.在發(fā)送 tuple 的時候選擇發(fā)送“非錨定”的(unanchored)tuple

“Storm并發(fā)度怎么設(shè)置”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI