溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Storm概念和工作原理的分析是怎樣的

發(fā)布時(shí)間:2021-12-03 10:35:29 來(lái)源:億速云 閱讀:103 作者:柒染 欄目:云計(jì)算

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)Storm概念和工作原理的分析是怎樣的,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

Strom的結(jié)構(gòu)

Storm概念和工作原理的分析是怎樣的

Storm與傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù)
    傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù)是先存后計(jì)算,而storm則是先算后存,甚至不存
    傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù)很難部署實(shí)時(shí)計(jì)算,只能部署定時(shí)任務(wù)統(tǒng)計(jì)分析窗口數(shù)據(jù)
    關(guān)系型數(shù)據(jù)庫(kù)重視事務(wù),并發(fā)控制,相對(duì)來(lái)說(shuō)Storm比較簡(jiǎn)陋
    Storm與Hadoop,Spark等是流行的大數(shù)據(jù)方案

與Storm關(guān)系密切的語(yǔ)言:核心代碼用clojure書寫,實(shí)用程序用python開發(fā),使用java開發(fā)拓?fù)?

topology

    Storm集群中有兩種節(jié)點(diǎn),一種是控制節(jié)點(diǎn)(Nimbus節(jié)點(diǎn)),另一種是工作節(jié)點(diǎn)(Supervisor節(jié)點(diǎn))。所有Topology任務(wù)的 提交必須在Storm客戶端節(jié)點(diǎn)上進(jìn)行(需要配置 storm.yaml文件),由Nimbus節(jié)點(diǎn)分配給其他Supervisor節(jié)點(diǎn)進(jìn)行處理。 Nimbus節(jié)點(diǎn)首先將提交的Topology進(jìn)行分片,分成一個(gè)個(gè)的Task,并將Task和Supervisor相關(guān)的信息提交到 zookeeper集群上,Supervisor會(huì)去zookeeper集群上認(rèn)領(lǐng)自己的Task,通知自己的Worker進(jìn)程進(jìn)行Task的處理。

    和同樣是計(jì)算框架的MapReduce相比,MapReduce集群上運(yùn)行的是Job,而Storm集群上運(yùn)行的是Topology。但是Job在運(yùn)行結(jié)束之后會(huì)自行結(jié)束,Topology卻只能被手動(dòng)的kill掉,否則會(huì)一直運(yùn)行下去

    Storm不處理計(jì)算結(jié)果的保存,這是應(yīng)用代碼需要負(fù)責(zé)的事情,如果數(shù)據(jù)不大,你可以簡(jiǎn)單地保存在內(nèi)存里,也可以每次都更新數(shù)據(jù)庫(kù),也可以采用NoSQL存儲(chǔ)。這部分事情完全交給用戶。

    數(shù)據(jù)存儲(chǔ)之后的展現(xiàn),也是你需要自己處理的,storm UI 只提供對(duì)topology的監(jiān)控和統(tǒng)計(jì)。

    總體的Topology處理流程圖為:
Storm概念和工作原理的分析是怎樣的

 

zookeeper集群

    storm使用zookeeper來(lái)協(xié)調(diào)整個(gè)集群, 但是要注意的是storm并不用zookeeper來(lái)傳遞消息。所以zookeeper上的負(fù)載是非常低的,單個(gè)節(jié)點(diǎn)的zookeeper在大多數(shù)情況下 都已經(jīng)足夠了, 但是如果你要部署大一點(diǎn)的storm集群, 那么你需要的zookeeper也要大一點(diǎn)。關(guān)于如何部署zookeeper,可以看http://zookeeper.apache.org/doc /r3.3.3/zookeeperAdmin.html

    部署zookeeper有些需要注意的地方:
    1、對(duì)zookeeper做好監(jiān)控非常重要, zookeeper是fail-fast的系統(tǒng),只要出現(xiàn)什么錯(cuò)誤就會(huì)退出, 所以實(shí)際場(chǎng)景中要監(jiān)控,更多細(xì)節(jié)看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_supervision
    2、實(shí)際場(chǎng)景中要配置一個(gè)cron job來(lái)壓縮zookeeper的數(shù)據(jù)和業(yè)務(wù)日志。zookeeper自己是不會(huì)去壓縮這些的,所以你如果不設(shè)置一個(gè)cron job, 那么你很快就會(huì)發(fā)現(xiàn)磁盤不夠用了,更多細(xì)節(jié)可以查看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_maintenance

 

Component

    Storm中,Spout和Bolt都是Component。所以,Storm定義了一個(gè)名叫IComponent的總接口
    全家普如下:綠色部分是我們最常用、比較簡(jiǎn)單的部分。紅色部分是與事務(wù)相關(guān)的
Storm概念和工作原理的分析是怎樣的

 

Spout

    Spout是Stream的消息產(chǎn)生源, Spout組件的實(shí)現(xiàn)可以通過繼承BaseRichSpout類或者其他Spout類來(lái)完成,也可以通過實(shí)現(xiàn)IRichSpout接口來(lái)實(shí)現(xiàn)
public interface ISpout extends Serializable {
  void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
  void close();
  void nextTuple();
  void ack(Object msgId);
  void fail(Object msgId);
}
    open()方法 -- 初始化方法
    close() -- 在該spout將要關(guān)閉時(shí)調(diào)用。但是不保證其一定被調(diào)用,因?yàn)樵诩褐衧upervisor節(jié)點(diǎn),可以使用kill -9來(lái)殺死worker進(jìn)程。只有當(dāng)Storm是在本地模式下運(yùn)行,如果是發(fā)送停止命令,可以保證close的執(zhí)行
    ack(Object msgId) -- 成功處理tuple時(shí)回調(diào)的方法,通常情況下,此方法的實(shí)現(xiàn)是將消息隊(duì)列中的消息移除,防止消息重放
    fail(Object msgId) -- 處理tuple失敗時(shí)回調(diào)的方法,通常情況下,此方法的實(shí)現(xiàn)是將消息放回消息隊(duì)列中然后在稍后時(shí)間里重放
    nextTuple() -- 這是Spout類中最重要的一個(gè)方法。發(fā)射一個(gè)Tuple到Topology都是通過這個(gè)方法來(lái)實(shí)現(xiàn)的。調(diào)用此方法時(shí),storm向spout發(fā)出請(qǐng)求,讓spout發(fā)出元組(tuple)到輸出器(ouput collector)。這種方法應(yīng)該是非阻塞的,所以spout如果沒有元組發(fā)出,這個(gè)方法應(yīng)該返回。nextTuple、ack 和fail 都在spout任務(wù)的同一個(gè)線程中被循環(huán)調(diào)用。 當(dāng)沒有元組的發(fā)射時(shí),應(yīng)該讓nextTuple睡眠一個(gè)很短的時(shí)間(如一毫秒),以免浪費(fèi)太多的CPU。
繼承了BaseRichSpout后,不用實(shí)現(xiàn)close、 activate、 deactivate、 ack、 fail 和 getComponentConfiguration 方法,只關(guān)心最基本核心的部分。
通常情況下(Shell和事務(wù)型的除外),實(shí)現(xiàn)一個(gè)Spout,可以直接實(shí)現(xiàn)接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout

 

Bolt

    Bolt類接收由Spout或者其他上游Bolt類發(fā)來(lái)的Tuple,對(duì)其進(jìn)行處理。Bolt組件的實(shí)現(xiàn)可以通過繼承BasicRichBolt類或者IRichBolt接口等來(lái)完成
    prepare方法 -- 此方法和Spout中的open方法類似,在集群中一個(gè)worker中的task初始化時(shí)調(diào)用。 它提供了bolt執(zhí)行的環(huán)境
    declareOutputFields方法 -- 用于聲明當(dāng)前Bolt發(fā)送的Tuple中包含的字段(field),和Spout中類似
    cleanup方法 -- 同ISpout的close方法,在關(guān)閉前調(diào)用。同樣不保證其一定執(zhí)行。
    execute方法 -- 這是Bolt中最關(guān)鍵的一個(gè)方法,對(duì)于Tuple的處理都可以放到此方法中進(jìn)行。具體的發(fā)送是通過emit方法來(lái)完成的。execute接受一個(gè)tuple進(jìn)行處理,并用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來(lái)反饋處理結(jié)果。
    Storm提供了IBasicBolt接口,其目的就是實(shí)現(xiàn)該接口的Bolt不用在代碼中提供反饋結(jié)果了,Storm內(nèi)部會(huì)自動(dòng)反饋成功。如果你確實(shí)要反饋失敗,可以拋出FailedException
    通常情況下,實(shí)現(xiàn)一個(gè)Bolt,可以實(shí)現(xiàn)IRichBolt接口或繼承BaseRichBolt,如果不想自己處理結(jié)果反饋,可以實(shí)現(xiàn) IBasicBolt接口或繼承BaseBasicBolt,它實(shí)際上相當(dāng)于自動(dòng)實(shí)現(xiàn)了collector.emit.ack(inputTuple)

 

Topology運(yùn)行流程

    (1)Storm提交后,會(huì)把代碼首先存放到Nimbus節(jié)點(diǎn)的inbox目錄下,之后,會(huì)把當(dāng)前Storm運(yùn)行的配置生成一個(gè)stormconf.ser文件放到Nimbus節(jié)點(diǎn)的stormdist目錄中,在此目錄中同時(shí)還有序列化之后的Topology代碼文件
    (2) 在設(shè)定Topology所關(guān)聯(lián)的Spouts和Bolts時(shí),可以同時(shí)設(shè)置當(dāng)前Spout和Bolt的executor數(shù)目和task數(shù)目,默認(rèn)情況下,一個(gè)Topology的task的總和是和executor的總和一致的。之后,系統(tǒng)根據(jù)worker的數(shù)目,盡量平均的分配這些task的執(zhí)行。worker在哪個(gè)supervisor節(jié)點(diǎn)上運(yùn)行是由storm本身決定的
    (3)任務(wù)分配好之后,Nimbus節(jié)點(diǎn)會(huì)將任務(wù)的信息提交到zookeeper集群,同時(shí)在zookeeper集群中會(huì)有workerbeats節(jié)點(diǎn),這里存儲(chǔ)了當(dāng)前Topology的所有worker進(jìn)程的心跳信息
    (4)Supervisor 節(jié)點(diǎn)會(huì)不斷的輪詢zookeeper集群,在zookeeper的assignments節(jié)點(diǎn)中保存了所有Topology的任務(wù)分配信息、代碼存儲(chǔ)目錄、任務(wù)之間的關(guān)聯(lián)關(guān)系等,Supervisor通過輪詢此節(jié)點(diǎn)的內(nèi)容,來(lái)領(lǐng)取自己的任務(wù),啟動(dòng)worker進(jìn)程運(yùn)行
    (5)一個(gè)Topology運(yùn)行之后,就會(huì)不斷的通過Spouts來(lái)發(fā)送Stream流,通過Bolts來(lái)不斷的處理接收到的Stream流,Stream流是無(wú)界的。
    最后一步會(huì)不間斷的執(zhí)行,除非手動(dòng)結(jié)束Topology。

 

Topology運(yùn)行方式

    在開始創(chuàng)建項(xiàng)目之前,了解Storm的操作模式(operation modes)是很重要的。 Storm有兩種運(yùn)行方式
    本地運(yùn)行的提交方式,例:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
    分布式提交方式,例:
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
 
    需要注意的是,在Storm代碼編寫完成之后,需要打包成jar包放到Nimbus中運(yùn)行,打包的時(shí)候,不需要把依賴的jar都打迚去,否則如果把依賴的storm.jar包打進(jìn)去的話,運(yùn)行時(shí)會(huì)出現(xiàn)重復(fù)的配置文件錯(cuò)誤導(dǎo)致Topology無(wú)法運(yùn)行。因?yàn)門opology運(yùn)行之前,會(huì)加載本地的 storm.yaml 配置文件。

    運(yùn)行的命令如下: storm jar StormTopology.jar mainclass [args]

 

storm守護(hù)進(jìn)程的命令

    Nimbus: storm nimbus 啟動(dòng)nimbus守護(hù)進(jìn)程
    Supervisor: storm supervisor 啟動(dòng)supervisor守護(hù)迚程
    UI:storm ui 這將啟動(dòng)stormUI的守護(hù)進(jìn)程,為監(jiān)測(cè)storm集群提供一個(gè)基于web的用戶界面。
    DRPC: storm drpc 啟動(dòng)DRPC的守護(hù)進(jìn)程

 

storm管理命令

    JAR:storm jar topology_jar topology_class [arguments...]
    jar命令是用于提交一個(gè)集群拓?fù)?它運(yùn)行指定參數(shù)的topology_class中的main()方法,上傳topology_jar到nimbus,由nimbus發(fā)布到集群中。一旦提交,storm將激活拓?fù)洳㈤_始處理topology_class 中的main()方法,main()方法負(fù)責(zé)調(diào)用StormSubmitter.submitTopology()方法,并提供一個(gè)唯一的拓?fù)?集群)的名。如果一個(gè)擁有該名稱的拓?fù)湟呀?jīng)存在于集群中,jar命令將會(huì)失敗。常見的做法是在使用命令行參數(shù)來(lái)指定拓?fù)涿Q,以便拓?fù)湓谔峤坏臅r(shí)候被命名。

    KILL:storm kill topology_name [-w wait_time]
    殺死一個(gè)拓?fù)?,可以使用kill命令。它會(huì)以一種安全的方式銷毀一個(gè)拓?fù)洌紫韧S猛負(fù)?,在等待拓?fù)湎⒌臅r(shí)間段內(nèi)允許拓?fù)渫瓿僧?dāng)前的數(shù)據(jù)流。執(zhí)行kill命令時(shí)可以通過-w [等待秒數(shù)]指定拓?fù)渫S靡院蟮牡却龝r(shí)間。也可以在Storm UI 界面上實(shí)現(xiàn)同樣的功能

    Deactivate:storm deactivate topology_name
    停用拓?fù)鋾r(shí),所有已分發(fā)的元組都會(huì)得到處理,spouts的nextTuple方法將不會(huì)被調(diào)用。也可以在Storm UI 界面上實(shí)現(xiàn)同樣的功能

    Activate:storm activate topology_name
    啟動(dòng)一個(gè)停用的拓?fù)?。也可以在Storm UI 界面上實(shí)現(xiàn)同樣的功能

    Rebalance:storm rebalance topology_name [-w wait_time] [-n worker_count] [-e component_name=executer_count]...
    rebalance使你重新分配集群任務(wù)。這是個(gè)很強(qiáng)大的命令。比如,你向一個(gè)運(yùn)行中的集群增加了節(jié)點(diǎn)。rebalance命令將會(huì)停用拓?fù)?,然后在相?yīng)超時(shí)時(shí)間之后重分配worker,并重啟拓?fù)?
例:storm rebalance wordcount-topology -w 15 -n 5 -e sentence-spout=4 -e split-bolt=8

    還有其他管理命令,如:Remoteconfvalue、REPL、Classpath等

新建storm項(xiàng)目注意事項(xiàng)

    為了開發(fā)storm項(xiàng)目,你的classpath里面需要有storm的jar包。最推薦的方式是使用Maven,不使用maven的話你可以手動(dòng)把storm發(fā)行版里面的所有的jar包添加到classpath

    storm-starter項(xiàng)目使用Leiningen作為build和依賴管理工具,你可以下載這個(gè)腳本(https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein)來(lái)安裝Leiningen, 把它加入到你的PATH, 使它可執(zhí)行。要拉取storm的所有依賴包,簡(jiǎn)單地在項(xiàng)目的根目錄執(zhí)行 lein deps 就可以了

上述就是小編為大家分享的Storm概念和工作原理的分析是怎樣的了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI