溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Transactional topology怎么使用

發(fā)布時(shí)間:2021-12-22 17:20:20 來源:億速云 閱讀:143 作者:iii 欄目:云計(jì)算

本篇內(nèi)容介紹了“Transactional topology怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

你可以通過使用TransactionalTopologyBuilder來創(chuàng)建transactional topology. 下面就是一個(gè)transactional topology的定義, 它的作用是計(jì)算輸入流里面的tuple的個(gè)數(shù)。這段代碼來自storm-starter里面的TransactionalGlobalCount。

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);

builder.setBolt("partial-count", new BatchCount(), 5)

        .shuffleGrouping("spout");

builder.setBolt("sum", new UpdateGlobalCount())

        .globalGrouping("partial-count");

TransactionalTopologyBuilder構(gòu)造器中接受如下的參數(shù):

?一個(gè)transaction topology的id

?spout在整個(gè)topology里面的id。

?一個(gè)transactional spout。

?一個(gè)可選的這個(gè)transactional spout的并行度。

topology的id是用來在zookeeper里面保存這個(gè)topology的當(dāng)前進(jìn)度狀態(tài)的,所以如果你重啟這個(gè)topology, 它可以接著前面的進(jìn)度繼續(xù)執(zhí)行。

一個(gè)transaction topology里面有一個(gè)唯一的TransactionalSpout, 這個(gè)spout是通過TransactionalTopologyBuilder的構(gòu)造函數(shù)來指定的。在這個(gè)例子里面,MemoryTransactionalSpout被用來從一個(gè)內(nèi)存變量里面讀取數(shù)據(jù)(DATA)。第二個(gè)參數(shù)指定spout發(fā)送的tuple的字段, 第三個(gè)參數(shù)指定每個(gè)batch的最大tuple數(shù)量。關(guān)于如何自定義TransactionalSpout我們會(huì)在后面介紹。

現(xiàn)在說說 bolts。這個(gè)topology并行地計(jì)算tuple的總數(shù)量。第一個(gè)bolt:BatchBolt,隨機(jī)地把輸入tuple分給各個(gè)task,然后各個(gè)task各自統(tǒng)計(jì)局部數(shù)量。第二個(gè)bolt:UpdateGlobalCount, 用全局grouping來匯總這個(gè)batch中tuple的數(shù)量,然后再更新到數(shù)據(jù)庫里面的全局?jǐn)?shù)量。

下面是BatchCount的定義:

public static class BatchCount extends BaseBatchBolt {

    Object _id;

    BatchOutputCollector _collector;

    int _count = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {

        _collector = collector;

        _id = id;

    }

    @Override

    public void execute(Tuple tuple) {

        _count++;

    }

    @Override

    public void finishBatch() {

        _collector.emit(new Values(_id, _count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "count"));

    }

}

storm會(huì)為每個(gè)正在處理的batch創(chuàng)建一個(gè)BatchCount對(duì)象,這個(gè)BatchCount是運(yùn)行在BatchBoltExecutor里面的。而BatchBoltExecutor負(fù)責(zé)創(chuàng)建以及清理這個(gè)對(duì)象的實(shí)例。

BatchCount對(duì)象的prepare方法接收如下參數(shù):

?Storm config

?Topology context

?Output collector

?這個(gè)batch的id (txid),在Transactional Topology中, 這個(gè)id則是一個(gè)TransactionAttempt對(duì)象。

這個(gè)batch bolt的抽象在DRPC里面也可以用, 只是txid的類型不一樣而已。實(shí)際上,BatchBolt可以接收一個(gè)txid類型的參數(shù),所以如果你只是想在transactioinal topology里面使用這個(gè)BatchBolt,你可以去繼承BaseTransactionalBolt類,如下定義:

public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {

}

在transaction topology里面發(fā)射的所有的tuple都必須以TransactionAttempt作為第一個(gè)field, 然后storm可以根據(jù)這個(gè)field來判斷哪些tuple屬于一個(gè)batch。所以你在發(fā)射tuple的時(shí)候需要滿足這個(gè)條件。

TransactionAttempt包含兩個(gè)值: 一個(gè)transaction id,一個(gè)attempt id。transaction id的作用就是我們上面介紹的對(duì)于每個(gè)batch是唯一的,而且不管這個(gè)batch 被replay多少次都是一樣的。attempt id是對(duì)于每個(gè)batch唯一的一個(gè)id, 但是對(duì)于同一個(gè)batch,它replay之后的attempt id跟replay之前就不一樣了, 我們可以把a(bǔ)ttempt id理解成replay-times, storm利用這個(gè)id來區(qū)別一個(gè)batch發(fā)射的tuple的不同版本。

transaction id對(duì)于每個(gè)batch加一, 所以第一個(gè)batch的transaction id是”1″, 第二個(gè)batch是”2″,依次類推。

每收到一個(gè)batch中的tuple,execute方法便被調(diào)用一次。每次當(dāng)該方法被調(diào)用時(shí),你應(yīng)該把這個(gè)batch里面的狀態(tài)保持在一個(gè)本地變量里面。對(duì)于這個(gè)例子來說, 它在execute方法里面遞增tuple的個(gè)數(shù)。

最后, 當(dāng)這個(gè)bolt接收到某個(gè)batch的所有的tuple之后, finishBatch方法會(huì)被調(diào)用。這個(gè)例子里面的BatchCount類會(huì)在這個(gè)時(shí)候發(fā)射它的局部數(shù)量到它的輸出流里面去。

下面是UpdateGlobalCount類的定義:

public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {

    TransactionAttempt _attempt;

    BatchOutputCollector _collector;

    int _sum = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {

        _collector = collector;

        _attempt = attempt;

    }

    @Override

    public void execute(Tuple tuple) {

        _sum+=tuple.getInteger(1);

    }

    @Override

    public void finishBatch() {

        Value val = DATABASE.get(GLOBAL_COUNT_KEY);

        Value newval;

        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {

            newval = new Value();

            newval.txid = _attempt.getTransactionId();

            if(val==null) {

                newval.count = _sum;

            } else {

                newval.count = _sum + val.count;

            }

            DATABASE.put(GLOBAL_COUNT_KEY, newval);

        } else {

            newval = val;

        }

        _collector.emit(new Values(_attempt, newval.count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "sum"));

    }

}

UpdateGlobalCount是Transactional Topologies相關(guān)的類,所以它繼承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累積這個(gè)batch的計(jì)數(shù), 比較有趣的是finishBatch方法。

首先, 注意這個(gè)bolt實(shí)現(xiàn)了ICommitter接口,這告訴storm要在這個(gè)事務(wù)的commit階段調(diào)用finishBatch方法,所以對(duì)于finishBatch的調(diào)用會(huì)保證強(qiáng)順序性(順序就是transaction id的升序),另一方面execute方法在processing或者commit階段都可以執(zhí)行。另外一種把bolt標(biāo)識(shí)為commiter的方法是調(diào)用TransactionalTopologyBuilder的setCommiterBolt來添加Bolt(而不是setBolt)。

UpdateGlobalCount里面finishBatch方法的邏輯是首先從數(shù)據(jù)庫中獲取當(dāng)前的值,并且把數(shù)據(jù)庫里面的transaction id與當(dāng)前這個(gè)batch的transaction id進(jìn)行比較。如果他們一樣, 那么忽略這個(gè)batch。否則把這個(gè)batch的結(jié)果加到總結(jié)果里面去,并且更新數(shù)據(jù)庫。

“Transactional topology怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI