溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行Twitter Storm Stream Grouping編寫自定義分組實(shí)現(xiàn)

發(fā)布時(shí)間:2021-11-24 15:30:23 來源:億速云 閱讀:211 作者:柒染 欄目:云計(jì)算

本篇文章為大家展示了如何進(jìn)行Twitter Storm Stream Grouping編寫自定義分組實(shí)現(xiàn),內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

##自定義Grouping測試

Storm是支持自定義分組的,本篇文章就是探究Storm如何編寫一個(gè)自定義分組器,以及對Storm分組器如何分組數(shù)據(jù)的理解。

這是我寫的一個(gè)自定義分組,總是把數(shù)據(jù)分到第一個(gè)Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping {
    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);

    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
		List<Integer> targetTasks) {
	    this.tasks = targetTasks;
	    log.info(tasks.toString());
    }	
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
	    log.info(values.toString());
	    return Arrays.asList(tasks.get(0));
    }
}

從上面的代碼可以看出,該自定義分組會把數(shù)據(jù)歸并到第一個(gè)Task<code>Arrays.asList(tasks.get(0));</code>,也就是數(shù)據(jù)到達(dá)后總是被派發(fā)到第一組。

測試代碼:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
//自定義分組,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
	    .customGrouping("words", new MyFirstStreamGrouping());

和之前的測試用例一樣,Spout總是發(fā)送<code>new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}</code>列表的字符串。我們運(yùn)行驗(yàn)證一下:

11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

從這個(gè)運(yùn)行日志我們可以看出,數(shù)據(jù)總是派發(fā)到一個(gè)Blot:Thread-25-exclaim1。因?yàn)槲視r(shí)本地測試,Thread-25-exclaim1是線程名。而派發(fā)的線程是數(shù)據(jù)多個(gè)線程的。因此該測試符合預(yù)期,即總是發(fā)送到一個(gè)Task,并且這個(gè)Task也是第一個(gè)。

##理解自定義分組實(shí)現(xiàn)

自己實(shí)現(xiàn)一個(gè)自定義分組難嗎?其實(shí)如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一樣的道理。

Hadoop MapReduce的Map完成后會把Map的中間結(jié)果寫入磁盤,在寫磁盤前,線程首先根據(jù)數(shù)據(jù)最終要傳送到的Reducer把數(shù)據(jù)劃分成相應(yīng)的分區(qū),然后不同的分區(qū)進(jìn)入不同的Reduce。我們先來看看Hadoop是怎樣把數(shù)據(jù)怎樣分組的,這是Partitioner唯一一個(gè)方法:

public class Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return 0;
    }
}

上面的代碼中:Map輸出的數(shù)據(jù)都會經(jīng)過getPartition()方法,用來確定下一步的分組。numReduceTasks是一個(gè)Job的Reduce數(shù)量,而返回值就是確定該條數(shù)據(jù)進(jìn)入哪個(gè)Reduce。返回值必須大于等于0,小于numReduceTasks,否則就會報(bào)錯(cuò)。返回0就意味著這條數(shù)據(jù)進(jìn)入第一個(gè)Reduce。對于隨機(jī)分組來說,這個(gè)方法可以這么實(shí)現(xiàn):

public int getPartition(K key, V value, int numReduceTasks) {
    return hash(key) % numReduceTasks;
}

其實(shí)Hadoop 默認(rèn)的Hash分組策略也正是這么實(shí)現(xiàn)的。這樣好處是,數(shù)據(jù)在整個(gè)集群基本上是負(fù)載平衡的。

搞通了Hadoop的Partitioner,我們來看看Storm的CustomStreamGrouping。

這是CustomStreamGrouping類的源碼:

public interface CustomStreamGrouping extends Serializable {

   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

   List<Integer> chooseTasks(int taskId, List<Object> values); 
}

一模一樣的道理,targetTasks就是Storm運(yùn)行時(shí)告訴你,當(dāng)前有幾個(gè)目標(biāo)Task可以選擇,每一個(gè)都給編上了數(shù)字編號。而 <code> chooseTasks(int taskId, List<Object> values); </code> 就是讓你選擇,你的這條數(shù)據(jù)values,是要哪幾個(gè)目標(biāo)Task處理?

如上文文章開頭的自定義分組器實(shí)現(xiàn)的代碼,我選擇的總是讓第一個(gè)Task來處理數(shù)據(jù),<code> return Arrays.asList(tasks.get(0)); </code> 。和Hadoop不同的是,Storm允許一條數(shù)據(jù)被多個(gè)Task處理,因此返回值是List<Integer>.就是讓你來在提供的 'List<Integer> targetTasks' Task中選擇任意的幾個(gè)(必須至少是一個(gè))Task來處理數(shù)據(jù)。

上述內(nèi)容就是如何進(jìn)行Twitter Storm Stream Grouping編寫自定義分組實(shí)現(xiàn),你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI