溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm Trident的詳細介紹

發(fā)布時間:2021-08-19 20:33:26 來源:億速云 閱讀:129 作者:chen 欄目:云計算

本篇內(nèi)容主要講解“Storm Trident的詳細介紹”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Storm Trident的詳細介紹”吧!

一、概要
1.1 Storm(簡介)
     Storm是一個實時的可靠地分布式流計算框架。
     具體就不多說了,舉個例子,它的一個典型的大數(shù)據(jù)實時計算應用場景:從Kafka消息隊列讀取消息(可以是logs,clicks,sensor data);通過Storm對消息進行計算聚合等預處理;把處理結(jié)果持久化到NoSQL數(shù)據(jù)庫或者HDFS做進一步深入分析。
1.2 Trident(簡介)
     Trident是對Storm的更高一層的抽象,除了提供一套簡單易用的流數(shù)據(jù)處理API之外,它以batch(一組tuples)為單位進行處理,這樣一來,可以使得一些處理更簡單和高效。
     我們知道把Bolt的運行狀態(tài)僅僅保存在內(nèi)存中是不可靠的,如果一個node掛掉,那么這個node上的任務就會被重新分配,但是之前的狀態(tài)是無法恢復的。因此,比較聰明的方式就是把storm的計算狀態(tài)信息持久化到database中,基于這一點,trident就變得尤為重要。因為在處理大數(shù)據(jù)時,我們在與database打交道時通常會采用批處理的方式來避免給它帶來壓力,而trident恰恰是以 batch groups 的形式處理數(shù)據(jù),并提供了一些聚合功能的API。

二、Trident API 實踐
     Trident其實就是一套API,但現(xiàn)階段網(wǎng)上關(guān)于Trident API中各個函數(shù)的用法含義資料不多,下面我就根據(jù)一些英文資料和自己的理解,詳細介紹一下Trident API各個函數(shù)的用法和含義。
2.1 each() 方法
     作用:操作batch中的每一個tuple內(nèi)容,一般與Filter或者Function函數(shù)配合使用。
     下面通過一個例子來介紹each()方法,假設(shè)我們有一個FakeTweetsBatchSpout,它會模擬一個Stream,隨機產(chǎn)生一個個消息。我們可以通過設(shè)置這個Spout類的構(gòu)造參數(shù)來改變這個Spout的batch Size的大小。
2.1.1 Filter類:過濾tuple
     一個通過actor字段過濾消息的Filter:

public static class PerActorTweetsFilter extends BaseFilter {
    String actor;
    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }
    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getString(0).equals(actor);
    }
}

   Topology:   

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

     從上面例子看到,each()方法有一些構(gòu)造參數(shù): 
第一個構(gòu)造參數(shù):作為Field Selector,一個tuple可能有很多字段,通過設(shè)置Field,我們可以隱藏其它字段,僅僅接收指定的字段(其它字段實際還在)。 

第二個是一個Filter:用來過濾掉除actor名叫"dave"外的其它消息。

2.1.2 Function類:加工處理tuple內(nèi)容
     一個能把tuple中text內(nèi)容變成大寫的Function:

public static class UppercaseFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0).toUpperCase()));
    }
}

     Topology: 

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	.each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
	.each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

    首先,UppercaseFunction函數(shù)的輸入是Fields("text", "actor"),其作用是把其中的"text"字段內(nèi)容都變成大寫。 

  其次,它比Filter多出一個輸出字段,作用是每個tuple在經(jīng)過這個Function函數(shù)處理后,輸出字段都會被追加到tuple后面,在本例中,執(zhí)行完Function之后的tuple內(nèi)容多了一個"uppercased_text",并且這個字段排在最后面。

2.1.3 Field Selector與project
   我們需要注意的是,上面每個each()方法的第一個Field字段僅僅是隱藏掉沒有指定的字段內(nèi)容,實際上被隱藏的字段依然還在tuple中,如果想要徹底丟掉它們,我們就需要用到project()方法。
   投影操作作用是僅保留Stream指定字段的數(shù)據(jù),比如有一個Stream包含如下字段: [“a”, “b”, “c”, “d”],運行如下代碼:
mystream.project(new Fields("b", "d"))  則輸出的流僅包含 [“b”, “d”]字段。

2.2 parallelismHint()方法和partitionBy()
2.2.1 parallelismHint()
     指定Topology的并行度,即用多少線程執(zhí)行這個任務。我們可以稍微改一下我們的Filter,通過打印當前任務的partitionIndex來區(qū)分當前是哪個線程。
Filter:

public static class PerActorTweetsFilter extends BaseFilter {
    private int partitionIndex;
    private String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.partitionIndex = context.getPartitionIndex();
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        boolean filter = tuple.getString(0).equals(actor);

        if (filter) {
            System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);
        }

        return filter;
    }
}

Topology: 

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

     如果我們指定執(zhí)行Filter任務的線程數(shù)量為5,那么最終的執(zhí)行結(jié)果會如何呢?看一下我們的測試結(jié)果: 

I am partition [4] and I have kept a tweet by: dave
I am partition [3] and I have kept a tweet by: dave
I am partition [0] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [1] and I have kept a tweet by: dave
     我們可以很清楚的發(fā)現(xiàn),一共有5個線程在執(zhí)行Filter。
     如果我們想要2個Spout和5個Filter怎么辦呢?如下面代碼所示,實現(xiàn)很簡單。

topology.newStream("spout", spout).parallelismHint(2).shuffle()
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

2.2.2 partitionBy()和重定向操作(repartitioning operation) 

     我們注意到上面的例子中用到了shuffle(),shuffle()是一個重定向操作。那什么是重定向操作呢?重定向定義了我們的tuple如何被route到下一處理層,當然不同的層之間可能會有不同的并行度,shuffle()的作用是把tuple隨機的route下一層的線程中,而partitionBy()則根據(jù)我們的指定字段按照一致性哈希算法route到下一層的線程中,也就是說,如果我們用partitionBy()的話,同一個字段名的tuple會被route到同一個線程中。
     比如,如果我們把上面代碼中的shuffle()改成partitionBy(new Fields("actor")),猜一下結(jié)果會怎樣?
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
     測試結(jié)果正如我們上面描述的那樣,相同字段的tuple被route到了同一個partition中。
重定向操作有如下幾種:
shuffle:通過隨機分配算法來均衡tuple到各個分區(qū)
broadcast:每個tuple都被廣播到所有的分區(qū),這種方式在drcp時非常有用,比如在每個分區(qū)上做stateQuery
partitionBy:根據(jù)指定的字段列表進行劃分,具體做法是用指定字段列表的hash值對分區(qū)個數(shù)做取模運算,確保相同字段列表的數(shù)據(jù)被劃分到同一個分區(qū)
global:所有的tuple都被發(fā)送到一個分區(qū),這個分區(qū)用來處理整個Stream
batchGlobal:一個Batch中的所有tuple都被發(fā)送到同一個分區(qū),不同的Batch會去往不同的分區(qū)
Partition:通過一個自定義的分區(qū)函數(shù)來進行分區(qū),這個自定義函數(shù)實現(xiàn)了 backtype.storm.grouping.CustomStreamGrouping

2.3 聚合(Aggregation)     
     我們前面講過,Trident的一個很重要的特點就是它是以batch的形式處理tuple的。我們可以很容易想到的針對一個batch的最基本操作應該就是聚合。Trident提供了聚合API來處理batches,來看一個例子:

2.3.1 Aggregator:

public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {
    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        return new HashMap<String, Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple,
        TridentCollector collector) {
        String location = tuple.getString(0);
        val.put(location, MapUtils.getInteger(val, location, 0) + 1);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}

Topology: 

topology.newStream("spout", spout)
	.aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
	.each(new Fields("location_counts"), new Utils.PrintFilter());

     這個aggregator很簡單:計算每一個batch的location的數(shù)量。通過這個例子我們可以看到Aggregator接口: 

init(): 當剛開始接收到一個batch時執(zhí)行
aggregate(): 在接收到batch中的每一個tuple時執(zhí)行
complete(): 在一個batch的結(jié)束時執(zhí)行     

     我們前面講過aggregate()方法是一個重定向方法,因為它會隨機啟動一個單獨的線程來進行這個聚合操作。
     下面我們來看一下測試結(jié)果:
[{USA=3, Spain=1, UK=1}]
[{USA=3, Spain=2}]
[{France=1, USA=4}]
[{USA=4, Spain=1}]
[{USA=5}]
     我們可以看到打印的結(jié)果,其中每一條的和都是5,這是因為我們的Spout的每個batch中tuple數(shù)量設(shè)置的是5,所以每個線程的計算結(jié)果也會是5。 除此之外,Trident還提供了其它兩個Aggregator接口: CombinerAggregator, ReducerAggregator,具體使用方法請參考Trident API。

2.3.2 partitionAggregate():
     如果我們將上面的Topology稍微改造一下,猜一下結(jié)果會是如何?

topology.newStream("spout", spout)
	.partitionBy(new Fields("location"))
	.partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).parallelismHint(3)
	.each(new Fields("location_counts"), new Utils.PrintFilter());

     我們一起來分析一下,首先partitionBy()方法將tuples按其location字段重定向到下一處理邏輯,而且相同location字段的tuple一定會被分配到同一個線程中處理。其次,partitionAggregate()方法,注意它與Aggregate不同,它不是一個重定向方法,它僅僅是對當前partition上的各個batch執(zhí)行聚合操作。因為我們根據(jù)location進行了重定向操作,測試數(shù)據(jù)一共有4個location,而當前一共有3個partition,因此可以猜測我們的最終測試結(jié)果中,有一個partition會處理兩個location的batch,最終測試結(jié)果如下: 

[{France=10, Spain=5}]
[{USA=63}]
[{UK=22}]
     需要注意的是,partitionAggregate雖然也是聚合操作,但與上面的Aggregate完全不同,它不是一個重定向操作。

2.4 groupBy
     我們可以看到上面幾個例子的測試結(jié)果,其實我們通常想要的是每個location的數(shù)量是多少,那該怎么處理呢?看下面這個Topology:

topology.newStream("spout", spout)
	.groupBy(new Fields("location"))
	.aggregate(new Fields("location"), new Count(), new Fields("count"))
	.each(new Fields("location", "count"), new Utils.PrintFilter());

     我們先看一下執(zhí)行的結(jié)果: 

[France, 25]
[UK, 2]
[USA, 25]
[Spain, 44]
[France, 26]
[UK, 3]
     上面這段代碼計算出了每個location的數(shù)量,即使我們的Count函數(shù)沒有指定并行度。這就是groupBy()起的作用,它會根據(jù)指定的字段創(chuàng)建一個GroupedStream,相同字段的tuple都會被重定向到一起,匯聚成一個group。groupBy()之后是aggregate,與之前的聚合整個batch不同,此時的aggregate會單獨聚合每個group。我們也可以這么認為,groupBy會把Stream按照指定字段分成一個個stream group,每個group就像一個batch一樣被處理。
     不過需要注意的是,groupBy()本身并不是一個重定向操作,但如果它后面跟的是aggregator的話就是,跟的是partitionAggregate的話就不是。
Storm Trident的詳細介紹

三、總結(jié) 
     Storm是一個實時流計算框架,Trident是對storm的一個更高層次的抽象,Trident最大的特點以batch的形式處理stream。
     一些最基本的操作函數(shù)有Filter、Function,F(xiàn)ilter可以過濾掉tuple,F(xiàn)unction可以修改tuple內(nèi)容,輸出0或多個tuple,并能把新增的字段追加到tuple后面。
     聚合有partitionAggregate和Aggregator接口。partitionAggregate對當前partition中的tuple進行聚合,它不是重定向操作。Aggregator有三個接口:CombinerAggregator, ReducerAggregator,Aggregator,它們屬于重定向操作,它們會把stream重定向到一個partition中進行聚合操作。
     重定向操作會改變數(shù)據(jù)流向,但不會改變數(shù)據(jù)內(nèi)容,重定向操會產(chǎn)生網(wǎng)絡(luò)傳輸,可能影響一部分效率。而Filter、Function、partitionAggregate則屬于本地操作,不會產(chǎn)生網(wǎng)絡(luò)傳輸。
     GroupBy會根據(jù)指定字段,把整個stream切分成一個個grouped stream,如果在grouped stream上做聚合操作,那么聚合就會發(fā)生在這些grouped stream上而不是整個batch。如果groupBy后面跟的是aggregator,則是聚合操作,如果跟的是partitionAggregate,則不是聚合操作。

到此,相信大家對“Storm Trident的詳細介紹”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI