溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Storm怎么改變并行度

發(fā)布時間:2021-12-23 11:44:35 來源:億速云 閱讀:120 作者:iii 欄目:云計算

這篇文章主要介紹“Storm怎么改變并行度”,在日常操作中,相信很多人在Storm怎么改變并行度問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm怎么改變并行度”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

package bolts;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class WordNormalizer implements IRichBolt{
    private OutputCollector collector;
 
    public void cleanup(){}
 
    /**
    * The bolt will receive the line from the
    * words file and process it to Normalize this line
    *
    * The normalize will be put the words in lower case
    * and split the line to get all words in this
    */
 
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[]words= sentence.split(" ");
        for(String word:words){
            word =word.trim();
            if(!word.isEmpty()){
                word =word.toLowerCase();
                //Emit the word
                List a =new ArrayList();
                a.add(input);
                collector.emit(a,new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }
 
    public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
        this.collector=collector;
    }
 
    /**
    * The bolt will only emit the field "word"
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
 
}

提示:在這個類中,每調(diào)用一次execute()方法,會發(fā)送多個元組。例如,當execute()方法收到“This is the Storm book”這個句子時,該方法會發(fā)送5個新元組。

第二個bolt,WordCounter,負責統(tǒng)計每個單詞個數(shù)。當topology結(jié)束時(cleanup()方法被調(diào)用時),顯示每個單詞的個數(shù)。

提示:第二個bolt中什么也不發(fā)送,本例中,將數(shù)據(jù)添加到一個map對象中,但是現(xiàn)實生活中,bolt可以將數(shù)據(jù)存儲到一個數(shù)據(jù)庫中。

package bolts;
 
import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
 
public class WordCounter implements IRichBolt{
    Integer id;
    String name;
    Map<String,Integer>counters;
 
    private OutputCollector collector;
 
    /**
    * At the end of the spout (when the cluster is shutdown
    * We will show the word counters
    */
 
    @Override
 
    public void cleanup(){
        System.out.println("-- Word Counter ["+name+"-"+id+"]--");
        for(Map.Entry<String,Integer>entry: counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
 
    /**
    * On each word We will count
    */
    @Override
 
    public void execute(Tuple input) {
        String str =input.getString(0);
        /**
        * If the word dosn't exist in the map we will create
        * this, if not We will add 1
        */
        if(!counters.containsKey(str)){
            counters.put(str,1);
        }else{
            Integer c =counters.get(str) +1;
            counters.put(str,c);
        }
        //Set the tuple as Acknowledge
        collector.ack(input);
    }
 
    /**
    * On create
    */
 
    @Override
 
    public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
        this.counters=newHashMap<String,Integer>();
        this.collector=collector;
        this.name=context.getThisComponentId();
        this.id=context.getThisTaskId();
    }
 
    @Override
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
 
}

execute()方法使用一個映射(Map類型)采集單詞并統(tǒng)計這些單詞個數(shù)。當topology結(jié)束的時候,cleanup()方法被調(diào)用并且打印出counter映射。(這僅僅是個例子,通常情況下,當topology關(guān)閉時,你應(yīng)該使用cleanup()方法關(guān)閉活動鏈接和其他資源。)

主類

在主類中,你將創(chuàng)建topology和一個LocalCluster對象,LocalCluster對象使你可以在本地測試和調(diào)試topology。LocalCluster結(jié)合Config對象允許你嘗試不同的集群配置。例如,如果不慎使用一個全局變量或者類變量,當配置不同數(shù)量的worker測試topology的時候,你將會發(fā)現(xiàn)這個錯誤。(關(guān)于config對象在第三章會有更多介紹)

提示:所有的topology結(jié)點應(yīng)該可以在進程間沒有數(shù)據(jù)共享的情形下獨立運行(也就是說沒有全局或者類變量),因為當topology運行在一個真實的集群上時,這些進程可能運行在不同的機器上。

你將使用TopologyBuilder創(chuàng)建topology,TopologyBuilder會告訴Storm怎么安排節(jié)點順序、它們怎么交換數(shù)據(jù)。

TopologyBuilder builder =new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));

本例中spout和bolt之間使用隨機分組(shuffleGrouping)連接,這種分組類型告訴Storm以隨機分布的方式從源節(jié)點往目標節(jié)點發(fā)送消息。

接著,創(chuàng)建一個包含topology配置信息的Config對象,該配置信息在運行時會與集群配置信息合并,并且通過prepare()方法發(fā)送到所有節(jié)點。

Config conf =new Config();
conf.put("wordsFile",args[0]);
conf.setDebug(false);

將wordFile屬性設(shè)置為將要被spout讀取的文件名稱(文件名在args參數(shù)中傳入),并將debug屬性設(shè)置為true,因為你在開發(fā)過程中,當debug為true時,Storm會打印節(jié)點間交換的所有消息和其他調(diào)試數(shù)據(jù),這些信息有助于理解topology是如何運行的。

前面提到,你將使用LocalCluster來運行topology。在一個產(chǎn)品環(huán)境中,topology會持續(xù)運行,但是在本例中,你僅需運行topology幾秒鐘就能看到結(jié)果。

LocalCluster cluster =new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();

使用createTopology和submitTopology創(chuàng)建、運行topology,睡眠兩秒(topology運行在不同的線程中),然后通過關(guān)閉集群來停止topology。

例2-3將上面代碼拼湊到一起。

例2-3.src/main/java/TopologyMain.java

import spouts.WordReader;
import bolts.WordCounter;
import bolts.WordNormalizer;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
public class TopologyMain{
    public static void main(String[]args)throws InterruptedException{
    //Topology definition
        TopologyBuilder builder =new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));
 
    //Configuration
        Config conf =new Config();
        conf.put("wordsFile",args[0]);
        conf.setDebug(false);
 
    //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
        LocalCluster cluster =new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());
        Thread.sleep(1000);
        cluster.shutdown();
    }
 
}

運行本項目

現(xiàn)在開始準備運行第一個topology!如果你新建一個文本文件(src/main/resources/words.txt)并且每行一個單詞,則可以通過如下命令運行這個topology:

mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt”

例如,如果你使用如下words.txt文件:

Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great

在日志中,你將會看到類似如下信息:

is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1

在本例中,你只使用了每個結(jié)點的一個單一實例,假如此時有一個非常大的日志文件怎么去統(tǒng)計每個單詞的個數(shù)?此時可以很方便地改系統(tǒng)中節(jié)點數(shù)量來并行工作,如創(chuàng)建WordCounter的兩個實例:

1
builder.setBolt(         "word-counter"         ,         new         WordCounter(),         2         ).shuffleGrouping(         "word-normalizer"         );

重新運行這個程序,你將看到:

– Word Counter [word-counter-2] –
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
– Word Counter [word-counter-3] –
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1

太棒了!改變并行度,so easy(當然,在實際生活中,每個實例運行在不同的機器中)。但仔細一看似乎還有點問題:“is”和“great”這兩個單詞在每個WordCounter實例中都被計算了一次。Why?當使用隨機分組(shuffleGrouping)時,Storm以隨機分布的方式向每個bolt實例發(fā)送每條消息。在這個例子中,將相同的單詞發(fā)送到同一個WordCounter實例是更理想的。為了實現(xiàn)這個,你可以將shuffleGrounping(“word-normalizer”)改成fieldsGrouping(“word-normalizer”,new Fields(“word”))。嘗試一下并重新運行本程序來確認結(jié)果。

到此,關(guān)于“Storm怎么改變并行度”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI