您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Transactional topology怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
你可以通過使用TransactionalTopologyBuilder來創(chuàng)建transactional topology. 下面就是一個(gè)transactional topology的定義, 它的作用是計(jì)算輸入流里面的tuple的個(gè)數(shù)。這段代碼來自storm-starter里面的TransactionalGlobalCount。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
.shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
.globalGrouping("partial-count");
TransactionalTopologyBuilder構(gòu)造器中接受如下的參數(shù):
?一個(gè)transaction topology的id
?spout在整個(gè)topology里面的id。
?一個(gè)transactional spout。
?一個(gè)可選的這個(gè)transactional spout的并行度。
topology的id是用來在zookeeper里面保存這個(gè)topology的當(dāng)前進(jìn)度狀態(tài)的,所以如果你重啟這個(gè)topology, 它可以接著前面的進(jìn)度繼續(xù)執(zhí)行。
一個(gè)transaction topology里面有一個(gè)唯一的TransactionalSpout, 這個(gè)spout是通過TransactionalTopologyBuilder的構(gòu)造函數(shù)來指定的。在這個(gè)例子里面,MemoryTransactionalSpout被用來從一個(gè)內(nèi)存變量里面讀取數(shù)據(jù)(DATA)。第二個(gè)參數(shù)指定spout發(fā)送的tuple的字段, 第三個(gè)參數(shù)指定每個(gè)batch的最大tuple數(shù)量。關(guān)于如何自定義TransactionalSpout我們會(huì)在后面介紹。
現(xiàn)在說說 bolts。這個(gè)topology并行地計(jì)算tuple的總數(shù)量。第一個(gè)bolt:BatchBolt,隨機(jī)地把輸入tuple分給各個(gè)task,然后各個(gè)task各自統(tǒng)計(jì)局部數(shù)量。第二個(gè)bolt:UpdateGlobalCount, 用全局grouping來匯總這個(gè)batch中tuple的數(shù)量,然后再更新到數(shù)據(jù)庫里面的全局?jǐn)?shù)量。
下面是BatchCount的定義:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "count"));
}
}
storm會(huì)為每個(gè)正在處理的batch創(chuàng)建一個(gè)BatchCount對(duì)象,這個(gè)BatchCount是運(yùn)行在BatchBoltExecutor里面的。而BatchBoltExecutor負(fù)責(zé)創(chuàng)建以及清理這個(gè)對(duì)象的實(shí)例。
BatchCount對(duì)象的prepare方法接收如下參數(shù):
?Storm config
?Topology context
?Output collector
?這個(gè)batch的id (txid),在Transactional Topology中, 這個(gè)id則是一個(gè)TransactionAttempt對(duì)象。
這個(gè)batch bolt的抽象在DRPC里面也可以用, 只是txid的類型不一樣而已。實(shí)際上,BatchBolt可以接收一個(gè)txid類型的參數(shù),所以如果你只是想在transactioinal topology里面使用這個(gè)BatchBolt,你可以去繼承BaseTransactionalBolt類,如下定義:
public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {
}
在transaction topology里面發(fā)射的所有的tuple都必須以TransactionAttempt作為第一個(gè)field, 然后storm可以根據(jù)這個(gè)field來判斷哪些tuple屬于一個(gè)batch。所以你在發(fā)射tuple的時(shí)候需要滿足這個(gè)條件。
TransactionAttempt包含兩個(gè)值: 一個(gè)transaction id,一個(gè)attempt id。transaction id的作用就是我們上面介紹的對(duì)于每個(gè)batch是唯一的,而且不管這個(gè)batch 被replay多少次都是一樣的。attempt id是對(duì)于每個(gè)batch唯一的一個(gè)id, 但是對(duì)于同一個(gè)batch,它replay之后的attempt id跟replay之前就不一樣了, 我們可以把a(bǔ)ttempt id理解成replay-times, storm利用這個(gè)id來區(qū)別一個(gè)batch發(fā)射的tuple的不同版本。
transaction id對(duì)于每個(gè)batch加一, 所以第一個(gè)batch的transaction id是”1″, 第二個(gè)batch是”2″,依次類推。
每收到一個(gè)batch中的tuple,execute方法便被調(diào)用一次。每次當(dāng)該方法被調(diào)用時(shí),你應(yīng)該把這個(gè)batch里面的狀態(tài)保持在一個(gè)本地變量里面。對(duì)于這個(gè)例子來說, 它在execute方法里面遞增tuple的個(gè)數(shù)。
最后, 當(dāng)這個(gè)bolt接收到某個(gè)batch的所有的tuple之后, finishBatch方法會(huì)被調(diào)用。這個(gè)例子里面的BatchCount類會(huì)在這個(gè)時(shí)候發(fā)射它的局部數(shù)量到它的輸出流里面去。
下面是UpdateGlobalCount類的定義:
public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "sum"));
}
}
UpdateGlobalCount是Transactional Topologies相關(guān)的類,所以它繼承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累積這個(gè)batch的計(jì)數(shù), 比較有趣的是finishBatch方法。
首先, 注意這個(gè)bolt實(shí)現(xiàn)了ICommitter接口,這告訴storm要在這個(gè)事務(wù)的commit階段調(diào)用finishBatch方法,所以對(duì)于finishBatch的調(diào)用會(huì)保證強(qiáng)順序性(順序就是transaction id的升序),另一方面execute方法在processing或者commit階段都可以執(zhí)行。另外一種把bolt標(biāo)識(shí)為commiter的方法是調(diào)用TransactionalTopologyBuilder的setCommiterBolt來添加Bolt(而不是setBolt)。
UpdateGlobalCount里面finishBatch方法的邏輯是首先從數(shù)據(jù)庫中獲取當(dāng)前的值,并且把數(shù)據(jù)庫里面的transaction id與當(dāng)前這個(gè)batch的transaction id進(jìn)行比較。如果他們一樣, 那么忽略這個(gè)batch。否則把這個(gè)batch的結(jié)果加到總結(jié)果里面去,并且更新數(shù)據(jù)庫。
“Transactional topology怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。