您好,登錄后才能下訂單哦!
這篇文章主要介紹“Storm分布式RPC怎么配置”,在日常操作中,相信很多人在Storm分布式RPC怎么配置問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm分布式RPC怎么配置”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
首先需要在storm集群上把DRPC的環(huán)境準(zhǔn)備好,在storm.yaml當(dāng)中增加如下內(nèi)容
drpc.servers:
- "192.168.1.118"
之后通過storm drpc啟動分布式RPC服務(wù)。
之后,跟其他的topology并沒有什么不同,我們需要寫點代碼,我這邊直接從storm的例子當(dāng)中找了個:
public class BasicDRPCTopology {
public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
}
}
從main函數(shù)開始,簡單解釋一下:
首先new一個LinearDRPCTopologyBuilder對象,其中的參數(shù)【exclamation】就是我們在執(zhí)行rpc調(diào)用時候的方法名。
之后我們加入一個自己的bolt,并行數(shù)量為3
之后用StormSubmitter把這個topology提交上去就行了。
代碼完成之后,打一個jar包,用storm jar把topology提交到集群上。
客戶端調(diào)用,非常簡單
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("exclamation", "china");
System.out.println(result);
到此為止,一個最簡單的DRPC調(diào)用的工作已經(jīng)完成了。
等等,還有點問題,LinearDRPCTopologyBuilder 這個東西是不建議使用的(我這里的版本是0.9.3)。
源碼上有這么一行:
Trident subsumes the functionality provided by this class, so it's deprecated
大概意思就是trident這個東西已經(jīng)包含了LinearDRPCTopologyBuilder 當(dāng)中的功能。
trident是什么意思?翻譯了一下,【三叉戟】,靠,看起來很牛逼的樣子。必須試試。
那么上第二份代碼:
public class TridentDRPCTopology {
public static void main(String[] args) throws Exception {
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
}
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")).
groupBy(new Fields("word")).
aggregate(new One(), new Fields("one")).
aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
return topology.build();
}
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
public static class One implements CombinerAggregator<Integer> {
@Override
public Integer init(TridentTuple tuple) {
return 1;
}
@Override
public Integer combine(Integer val1, Integer val2) {
return 1;
}
@Override
public Integer zero() {
return 1;
}
}
}
這個topology的功能要稍稍復(fù)雜一些,給出一句話,查一下一共有多少個詞,當(dāng)然了,不能重復(fù)計數(shù)。main函數(shù)當(dāng)中非常簡單,提交一個topology。而這個topology的構(gòu)建過程是在buildTopology當(dāng)中完成的。
topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")). //用空格分詞
groupBy(new Fields("word")). //分組
aggregate(new One(), new Fields("one")). //給每組的數(shù)量設(shè)定為1
aggregate(new Fields("one"), new Sum(), new Fields("word-count")); //sum計算總和
這樣的方式看起來跟spark當(dāng)中對RDD的操作是有些像的。
好了,還是打包,提交。
然后是客戶端測試:
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
System.out.println(result);
到此,關(guān)于“Storm分布式RPC怎么配置”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。