您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)Flume架構(gòu)中如何進(jìn)行MemoryChannel事務(wù)實現(xiàn),小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
Flume提供了可靠地日志采集功能,其高可靠是通過事務(wù)機(jī)制實現(xiàn)的。而對于Channel的事務(wù)我們本部分會介紹MemoryChannel和FileChannel的實現(xiàn)。
首先我們看下BasicChannelSemantics實現(xiàn):
public abstract class BasicChannelSemantics extends AbstractChannel { //1、事務(wù)使用ThreadLocal存儲,保證事務(wù)線程安全 private ThreadLocal<BasicTransactionSemantics> currentTransaction = new ThreadLocal<BasicTransactionSemantics>(); private boolean initialized = false; //2、進(jìn)行一些初始化工作 protected void initialize() {} //3、提供給實現(xiàn)類的創(chuàng)建事務(wù)的回調(diào) protected abstract BasicTransactionSemantics createTransaction(); //4、往Channel放Event,其直接委托給事務(wù)的put方法實現(xiàn) @Override public void put(Event event) throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); transaction.put(event); } //5、從Channel獲取Event,也是直接委托給事務(wù)的take方法實現(xiàn) @Override public Event take() throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); return transaction.take(); } //6、獲取事務(wù),如果本實例沒有初始化則先初始化;否則先從ThreadLocal獲取事務(wù),如果沒有或者關(guān)閉了則創(chuàng)建一個并綁定到ThreadLocal。 @Override public Transaction getTransaction() { if (!initialized) { synchronized (this) { if (!initialized) { initialize(); initialized = true; } } } BasicTransactionSemantics transaction = currentTransaction.get(); if (transaction == null || transaction.getState().equals( BasicTransactionSemantics.State.CLOSED)) { transaction = createTransaction(); currentTransaction.set(transaction); } return transaction; } }
首先我們來看下MemoryChannel的實現(xiàn),其是一個純內(nèi)存的Channel實現(xiàn),整個事務(wù)操作都是在內(nèi)存中完成。首先看下其內(nèi)存結(jié)構(gòu):
1、首先由一個Channel Queue用于存儲整個Channel的Event數(shù)據(jù);
2、每個事務(wù)都有一個Take Queue和Put Queue分別用于存儲事務(wù)相關(guān)的取數(shù)據(jù)和放數(shù)據(jù),等事務(wù)提交時才完全同步到Channel Queue,或者失敗把取數(shù)據(jù)回滾到Channel Queue。
MemoryChannel時設(shè)計時考慮了兩個容量:Channel Queue容量和事務(wù)容量,而這兩個容量涉及到了數(shù)量容量和字節(jié)數(shù)容量。
另外因為多個事務(wù)要操作Channel Queue,還要考慮Channel Queue的動態(tài)擴(kuò)容問題,因此MemoryChannel使用了鎖來實現(xiàn);而容量問題則使用了信號量來實現(xiàn)。
在configure方法中進(jìn)行了一些參數(shù)的初始化,如容量、Channel Queue等。首先看下Channel Queue的容量是如何計算的:
try { capacity = context.getInteger("capacity", defaultCapacity); } catch(NumberFormatException e) { capacity = defaultCapacity; } if (capacity <= 0) { capacity = defaultCapacity; }
即首先從配置文件讀取數(shù)量容量,如果沒有配置則是默認(rèn)容量(默認(rèn)100),而配置的容量小于等于0,則也是默認(rèn)容量。
接下來是初始化事務(wù)數(shù)量容量:
try { transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); } catch(NumberFormatException e) { transCapacity = defaultTransCapacity; } if (transCapacity <= 0) { transCapacity = defaultTransCapacity; } Preconditions.checkState(transCapacity <= capacity, "Transaction Capacity of Memory Channel cannot be higher than " + "the capacity.");
整個過程和Channel Queue數(shù)量容量初始化類似,但是最后做了前置條件判斷,事務(wù)容量必須小于等于Channel Queue容量。
接下來是字節(jié)容量限制:
try { byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); } catch(NumberFormatException e) { byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage; } try { byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize); if (byteCapacity < 1) { byteCapacity = Integer.MAX_VALUE; } } catch(NumberFormatException e) { byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize); }
byteCapacityBufferPercentage:用來確定byteCapacity的一個百分比參數(shù),即我們定義的字節(jié)容量和實際事件容量的百分比,因為我們定義的字節(jié)容量主要考慮Event body,而忽略Event header,因此需要減去Event header部分的內(nèi)存占用,可以認(rèn)為該參數(shù)定義了Event header占了實際字節(jié)容量的百分比,默認(rèn)20%;
byteCapacity:首先讀取配置文件定義的byteCapacity,如果沒有定義,則使用默認(rèn)defaultByteCapacity,而defaultByteCapacity默認(rèn)是JVM物理內(nèi)存的80%(Runtime.getRuntime().maxMemory() * .80);那么實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默認(rèn)100,即計算百分比的一個系數(shù)。
接下來定義keepAlive參數(shù):
try { keepAlive = context.getInteger("keep-alive", defaultKeepAlive); } catch(NumberFormatException e) { keepAlive = defaultKeepAlive; }
keepAlive定義了操作Channel Queue的等待超時事件,默認(rèn)3s。
接著初始化Channel Queue:
if(queue != null) { try { resizeQueue(capacity); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { synchronized(queueLock) { queue = new LinkedBlockingDeque<Event>(capacity); queueRemaining = new Semaphore(capacity); queueStored = new Semaphore(0); } }
首先如果Channel Queue不為null,表示動態(tài)擴(kuò)容;否則進(jìn)行Channel Queue的創(chuàng)建。
首先看下首次創(chuàng)建Channel Queue,首先使用queueLock鎖定,即在操作Channel Queue時都需要鎖定,因為之前說過Channel Queue可能動態(tài)擴(kuò)容,然后初始化信號量:Channel Queue剩余容量和向Channel Queue申請存儲的容量,用于事務(wù)操作中預(yù)占Channel Queue容量。
接著是調(diào)用resizeQueue動態(tài)擴(kuò)容:
private void resizeQueue(int capacity) throws InterruptedException { int oldCapacity; synchronized(queueLock) { //首先計算擴(kuò)容前的Channel Queue的容量 oldCapacity = queue.size() + queue.remainingCapacity(); } if(oldCapacity == capacity) {//如果新容量和老容量相等,不需要擴(kuò)容 return; } else if (oldCapacity > capacity) {//如果老容量大于新容量,縮容 //首先要預(yù)占老容量-新容量的大小,以便縮容容量 if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) { //如果獲取失敗,默認(rèn)是記錄日志然后忽略 } else { //否則,直接縮容,然后復(fù)制老Queue的數(shù)據(jù),縮容時需要鎖定queueLock,因為這一系列操作要線程安全 synchronized(queueLock) { LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity); newQueue.addAll(queue); queue = newQueue; } } } else { //如果不是縮容,則直接擴(kuò)容即可 synchronized(queueLock) { LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity); newQueue.addAll(queue); queue = newQueue; } //增加/減少Channel Queue的新的容量 queueRemaining.release(capacity - oldCapacity); } } 到此,整個Channel Queue相關(guān)的數(shù)據(jù)初始化完畢,接著會調(diào)用start方法進(jìn)行初始化: public synchronized void start() { channelCounter.start(); channelCounter.setChannelSize(queue.size()); channelCounter.setChannelCapacity(Long.valueOf( queue.size() + queue.remainingCapacity())); super.start(); }
此處初始化了一個ChannelCounter,是一個計數(shù)器,記錄如當(dāng)前隊列放入Event數(shù)、取出Event數(shù)、成功數(shù)等。
之前已經(jīng)分析了大部分Channel會把put和take直接委托給事務(wù)去完成,因此接下來看下MemoryTransaction的實現(xiàn)。
首先看下MemoryTransaction的初始化:
private class MemoryTransaction extends BasicTransactionSemantics { private LinkedBlockingDeque<Event> takeList; private LinkedBlockingDeque<Event> putList; private final ChannelCounter channelCounter; private int putByteCounter = 0; private int takeByteCounter = 0; public MemoryTransaction(int transCapacity, ChannelCounter counter) { putList = new LinkedBlockingDeque<Event>(transCapacity); takeList = new LinkedBlockingDeque<Event>(transCapacity); channelCounter = counter; }
可以看出MemoryTransaction涉及到兩個事務(wù)容量大小定義的隊列(鏈表阻塞隊列)、隊列字節(jié)計數(shù)器、另外一個是Channel操作的計數(shù)器。
事務(wù)中的放入操作如下:
protected void doPut(Event event) throws InterruptedException { //1、增加放入事件計數(shù)器 channelCounter.incrementEventPutAttemptCount(); //2、estimateEventSize計算當(dāng)前Event body大小 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); //3、往事務(wù)隊列的putList中放入Event,如果滿了,則拋異常回滾事務(wù) if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } //4、增加放入隊列字節(jié)數(shù)計數(shù)器 putByteCounter += eventByteSize; }
整個doPut操作相對來說比較簡單,就是往事務(wù)putList隊列放入Event,如果滿了則直接拋異?;貪L事務(wù);否則放入putList暫存,等事務(wù)提交時轉(zhuǎn)移到Channel Queue。另外需要增加放入隊列的字節(jié)數(shù)計數(shù)器,以便之后做字節(jié)容量限制。
接下來是事務(wù)中的取出操作:
protected Event doTake() throws InterruptedException { //1、增加取出事件計數(shù)器 channelCounter.incrementEventTakeAttemptCount(); //2、如果takeList隊列沒有剩余容量,即當(dāng)前事務(wù)已經(jīng)消費了最大容量的Event if(takeList.remainingCapacity() == 0) { throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); } //3、queueStored試圖獲取一個信號量,超時直接返回null if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { return null; } //4、從Channel Queue獲取一個Event Event event; synchronized(queueLock) {//對Channel Queue的操作必須加queueLock,因為之前說的動態(tài)擴(kuò)容問題 event = queue.poll(); } //5、因為信號量的保證,Channel Queue不應(yīng)該返回null,出現(xiàn)了就不正常了 Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " + "signalling existence of entry"); //6、暫存到事務(wù)的takeList隊列 takeList.put(event); //7、計算當(dāng)前Event body大小并增加取出隊列字節(jié)數(shù)計數(shù)器 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); takeByteCounter += eventByteSize; return event; }
接下來是提交事務(wù):
protected void doCommit() throws InterruptedException { //1、計算改變的Event數(shù)量,即取出數(shù)量-放入數(shù)量;如果放入的多,那么改變的Event數(shù)量將是負(fù)數(shù) int remainingChange = takeList.size() - putList.size(); //2、 如果remainingChange小于0,則需要獲取Channel Queue剩余容量的信號量 if(remainingChange < 0) { //2.1、首先獲取putByteCounter個字節(jié)容量信號量,如果失敗說明超過字節(jié)容量限制了,回滾事務(wù) if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) { throw new ChannelException("Cannot commit transaction. Byte capacity " + "allocated to store event body " + byteCapacity * byteCapacitySlotSize + "reached. Please increase heap space/byte capacity allocated to " + "the channel as the sinks may not be keeping up with the sources"); } //2.2、獲取Channel Queue的-remainingChange個信號量用于放入-remainingChange個Event,如果獲取不到,則釋放putByteCounter個字節(jié)容量信號量,并拋出異?;貪L事務(wù) if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { bytesRemaining.release(putByteCounter); throw new ChannelFullException("Space for commit to queue couldn't be acquired." + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); } } int puts = putList.size(); int takes = takeList.size(); synchronized(queueLock) {//操作Channel Queue時一定要鎖定queueLock if(puts > 0 ) { while(!putList.isEmpty()) { //3.1、如果有Event,則循環(huán)放入Channel Queue if(!queue.offer(putList.removeFirst())) { //3.2、如果放入Channel Queue失敗了,說明信號量控制出問題了,這種情況不應(yīng)該發(fā)生 throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //4、操作成功后,清空putList和takeList隊列 putList.clear(); takeList.clear(); } //5.1、釋放takeByteCounter個字節(jié)容量信號量 bytesRemaining.release(takeByteCounter); //5.2、重置字節(jié)計數(shù)器 takeByteCounter = 0; putByteCounter = 0; //5.3、釋放puts個queueStored信號量,這樣doTake方法就可以獲取數(shù)據(jù)了 queueStored.release(puts); //5.4、釋放remainingChange個queueRemaining信號量 if(remainingChange > 0) { queueRemaining.release(remainingChange); } //6、ChannelCounter一些數(shù)據(jù)計數(shù) if (puts > 0) { channelCounter.addToEventPutSuccessCount(puts); } if (takes > 0) { channelCounter.addToEventTakeSuccessCount(takes); } channelCounter.setChannelSize(queue.size()); }
此處涉及到兩個信號量:
queueStored表示Channel Queue已存儲事件容量(已存儲的事件數(shù)量),隊列取出事件時-1,放入事件成功時+N,取出失敗時-N,即Channel Queue存儲了多少事件。queueStored信號量默認(rèn)為0。當(dāng)doTake取出Event時減少一個queueStored信號量,當(dāng)doCommit提交事務(wù)時需要增加putList 隊列大小的queueStored信號量,當(dāng)doRollback回滾事務(wù)時需要減少takeList隊列大小的queueStored信號量。
queueRemaining表示Channel Queue可存儲事件容量(可存儲的事件數(shù)量),取出事件成功時+N,放入事件成功時-N。queueRemaining信號量默認(rèn)為Channel Queue容量。其在提交事務(wù)時首先通過remainingChange = takeList.size() - putList.size()計算獲得需要增加多少變更事件;如果小于0表示放入的事件比取出的多,表示有- remainingChange個事件放入,此時應(yīng)該減少-queueRemaining信號量;而如果大于0,則表示取出的事件比放入的多,表示有queueRemaining個事件取出,此時應(yīng)該增加queueRemaining信號量;即消費事件時減少信號量,生產(chǎn)事件時增加信號量。
而bytesRemaining是字節(jié)容量信號量,超出容量則回滾事務(wù)。
最后看下回滾事務(wù):
protected void doRollback() { int takes = takeList.size(); synchronized(queueLock) { //操作Channel Queue時一定鎖住queueLock //1、前置條件判斷,檢查是否有足夠容量回滾事務(wù) Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); //2、回滾事務(wù)的takeList隊列到Channel Queue while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } putList.clear(); } //3、釋放putByteCounter個bytesRemaining信號量 bytesRemaining.release(putByteCounter); //4、計數(shù)器重置 putByteCounter = 0; takeByteCounter = 0; //5、釋放takeList隊列大小個已存儲事件容量 queueStored.release(takes); channelCounter.setChannelSize(queue.size()); } }
也就是說在回滾時,需要把takeList中暫存的事件回滾到Channel Queue,并回滾queueStored信號量。
以上就是Flume架構(gòu)中如何進(jìn)行MemoryChannel事務(wù)實現(xiàn),小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。