溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Storm中怎么使用Direct Grouping分組策略

發(fā)布時間:2021-12-23 13:48:38 來源:億速云 閱讀:146 作者:iii 欄目:云計算

這篇文章主要介紹“Storm中怎么使用Direct Grouping分組策略”,在日常操作中,相信很多人在Storm中怎么使用Direct Grouping分組策略問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm中怎么使用Direct Grouping分組策略”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

使用 Direct Grouping 分組策略,將首字母相同的單詞發(fā)送給同一個task計數(shù)
數(shù)據(jù)源spout
 

package com.zhch.v3;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;

    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();

        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }

        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}

實現(xiàn)語句分割bolt

package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.List;
import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> numCounterTasks;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        //獲取下游bolt的taskId列表
        this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
            collector.emitDirect(taskId, tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if (word.isEmpty())
            return 0;
        else {
            //單詞首字母對下游 bolt taskId 列表長度取余
            return word.charAt(0) % numCounterTasks.size();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

實現(xiàn)單詞計數(shù)bolt 

package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            Iterator<String> keys = this.counts.keySet().iterator();
            while (keys.hasNext()) {
                String w = keys.next();
                Long c = this.counts.get(w);
                writer.write(w + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }

        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

實現(xiàn)單詞計數(shù)topology 

package com.zhch.v3;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v3";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .directGrouping(SPLIT_BOLT_ID); //使用 Direct Grouping 分組策略

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            //集群模式啟動
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}

提交到Storm集群 

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3

運(yùn)行結(jié)果:

[grid@hadoop5 stormData]$ cat result.txt 
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1
[grid@hadoop6 stormData]$ cat result.txt 
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1

到此,關(guān)于“Storm中怎么使用Direct Grouping分組策略”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI