溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Storm中怎么定義Blots程序

發(fā)布時(shí)間:2021-12-18 17:33:18 來源:億速云 閱讀:130 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“Storm中怎么定義Blots程序”,在日常操作中,相信很多人在Storm中怎么定義Blots程序問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”Storm中怎么定義Blots程序”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

具體內(nèi)容

在Storm程序開發(fā)過程之中有兩個(gè)重要的核心概念:Spout、Blot。Spout會(huì)作為數(shù)據(jù)的發(fā)起點(diǎn),這個(gè)數(shù)據(jù)可能來源于各種地方,例如:Kafka傳遞過來的消息內(nèi)容,而每一個(gè)Spout接收到數(shù)據(jù)之后如果有需要?jiǎng)t將數(shù)據(jù)傳遞給Blot,由多個(gè)Blot進(jìn)行數(shù)據(jù)的操作處理。把每一個(gè)Blot想象為一個(gè)數(shù)據(jù)的過濾器,而最后一個(gè)Blot將作為數(shù)據(jù)的存儲(chǔ)使用,而一般的存儲(chǔ)設(shè)備往往是文件、Redis數(shù)據(jù)庫。

Storm中怎么定義Blots程序

在本次程序的處理里面,Spouts將使用隨機(jī)數(shù)產(chǎn)生相應(yīng)的年齡數(shù)據(jù),而有會(huì)有三個(gè)Blot進(jìn)行數(shù)據(jù)的處理,這些數(shù)據(jù)處理是有自己嚴(yán)格的語法要求的。如果要想實(shí)現(xiàn)Storm開發(fā),則需要將Storm所有的相關(guān)庫文件配置到ClassPath之中。如果要想開發(fā)Spout往往需要實(shí)現(xiàn)一個(gè)IRickSpout接口,但是這個(gè)接口里面的方法比較多,所以建議繼承這個(gè)接口的子類:BaseRichSpout。范例:定義InfoCreateSpouts

package cn.mldn.info.spouts;import java.util.Map;import java.util.Random;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;importorg.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;@SuppressWarnings("serial")publi cclass InfoCreateSpoutextendsBaseRichSpout{private SpoutOutputCollector collector=null;private String nameStr="aBcDefghIjklmnopQrStuvwxyz";//假設(shè)為用戶名private Random rand=new Random();
@SuppressWarnings("rawtypes")@Overridepublic void open(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){//為Spout初始化方法,這個(gè)初始化方法只執(zhí)行一次;this.collector=collector;//接收初始化方法中的SpoutOutputCollector對(duì)象 }@OverridepublicvoidnextTuple(){ //執(zhí)行Spout程序時(shí)會(huì)自動(dòng)找到此方法,此方法為發(fā)送消息//從正常的開發(fā)角度而言,此處的數(shù)據(jù)應(yīng)該由消息系統(tǒng)傳遞過來 String nameInfo=String.valueOf(this.nameStr.charAt(rand.nextInt(nameStr.length())));intageInfo=this.rand.nextInt(150);//隨機(jī)生成一個(gè)年齡 //最終如果要進(jìn)行數(shù)據(jù)的發(fā)送,結(jié)構(gòu):name、age,所有的Spouts的數(shù)據(jù)要交給Blot完成。this.collector.emit(newValues(nameInfo,ageInfo));}@OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarerdeclare){//需要定義每一個(gè)傳輸中的數(shù)據(jù)保存的名稱declare.declare(newFields("name","age"));//向后發(fā)送Tuple的時(shí)候此為信息的文字標(biāo)注}}

在整個(gè)的代碼里面,nextTuple()為系統(tǒng)自動(dòng)調(diào)用,Spout產(chǎn)生Blot所需要的數(shù)據(jù)。

定義Blots程序

在本處理流程之中需要有三個(gè)Blot,作用分別如下:

·AgeJudementBlot:判斷傳入數(shù)據(jù)的年齡是成年人(ADULT)還是年輕人(YOUNG);·NameUpperBlot:因?yàn)樾彰写髮懞托?,為了統(tǒng)一管理,信息都變?yōu)榇髮?;·FinalBlot:進(jìn)行數(shù)據(jù)的保存處理。但是需要注意的是,此時(shí)定義的只是一個(gè)個(gè)獨(dú)立的Blot,彼此之間的聯(lián)系需要通過程序來完成。范例:進(jìn)行年齡判斷的Blot實(shí)現(xiàn).

package cn.mldn.info.blots;import org.apache.storm.topology.BasicOutputCollector;importorg.apache.storm.topology.OutputFieldsDeclarer;importorg.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;@SuppressWarnings("serial")public class AgeJudementBlotextendsBaseBasicBolt{  @Override public void execute(Tupletuple,BasicOutputCollectorcollector){

Storm中怎么定義Blots程序

范例:處理姓名大小寫的操作

Storm中怎么定義Blots程序

范例:增加一個(gè)輸出到文件的Blot

Storm中怎么定義Blots程序

Storm中怎么定義Blots程序

這些定義的Blot一定要接收Spout傳來的數(shù)據(jù),但是這些Blot之間沒有直接的聯(lián)系,所有的關(guān)系都必須通過程序動(dòng)態(tài)配置。

編寫測試程序

Storm最大的好處是直接提供了本地的windows模擬測試,但是在配置過程里面需要配置這些bolts關(guān)系。范例:編寫測試程序

Storm中怎么定義Blots程序

到此,關(guān)于“Storm中怎么定義Blots程序”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI