溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Storm分布式RPC怎么配置

發(fā)布時間:2021-12-16 16:43:26 來源:億速云 閱讀:175 作者:iii 欄目:云計算

這篇文章主要介紹“Storm分布式RPC怎么配置”,在日常操作中,相信很多人在Storm分布式RPC怎么配置問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm分布式RPC怎么配置”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

首先需要在storm集群上把DRPC的環(huán)境準(zhǔn)備好,在storm.yaml當(dāng)中增加如下內(nèi)容

 drpc.servers:
  - "192.168.1.118"

之后通過storm drpc啟動分布式RPC服務(wù)。

之后,跟其他的topology并沒有什么不同,我們需要寫點代碼,我這邊直接從storm的例子當(dāng)中找了個:

public class BasicDRPCTopology {
    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String input = tuple.getString(1);
            collector.emit(new Values(tuple.getValue(0), input + "!"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }

    }

    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);

        Config conf = new Config();
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
    }
}

從main函數(shù)開始,簡單解釋一下:

首先new一個LinearDRPCTopologyBuilder對象,其中的參數(shù)【exclamation】就是我們在執(zhí)行rpc調(diào)用時候的方法名。

之后我們加入一個自己的bolt,并行數(shù)量為3

之后用StormSubmitter把這個topology提交上去就行了。

代碼完成之后,打一個jar包,用storm jar把topology提交到集群上。

客戶端調(diào)用,非常簡單

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("exclamation", "china");
        System.out.println(result);

到此為止,一個最簡單的DRPC調(diào)用的工作已經(jīng)完成了。

等等,還有點問題,LinearDRPCTopologyBuilder 這個東西是不建議使用的(我這里的版本是0.9.3)。

源碼上有這么一行:

Trident subsumes the functionality provided by this class, so it's deprecated

大概意思就是trident這個東西已經(jīng)包含了LinearDRPCTopologyBuilder 當(dāng)中的功能。

trident是什么意思?翻譯了一下,【三叉戟】,靠,看起來很牛逼的樣子。必須試試。

那么上第二份代碼:

public class TridentDRPCTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
    }

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).
                groupBy(new Fields("word")).
                aggregate(new One(), new Fields("one")).
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
        return topology.build();
    }

    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static class One implements CombinerAggregator<Integer> {
        @Override
        public Integer init(TridentTuple tuple) {
            return 1;
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return 1;
        }

        @Override
        public Integer zero() {
            return 1;
        }
    }
}

這個topology的功能要稍稍復(fù)雜一些,給出一句話,查一下一共有多少個詞,當(dāng)然了,不能重復(fù)計數(shù)。main函數(shù)當(dāng)中非常簡單,提交一個topology。而這個topology的構(gòu)建過程是在buildTopology當(dāng)中完成的。

        topology.newDRPCStream("word-count").
                each(new Fields("args"), new Split(), new Fields("word")).    //用空格分詞
                groupBy(new Fields("word")).    //分組
                aggregate(new One(), new Fields("one")).    //給每組的數(shù)量設(shè)定為1
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));    //sum計算總和

這樣的方式看起來跟spark當(dāng)中對RDD的操作是有些像的。

好了,還是打包,提交。

然后是客戶端測試:

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);
        String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
        System.out.println(result);

到此,關(guān)于“Storm分布式RPC怎么配置”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI