How does Disruptor implement a high-performance queue? Many newcomers are unclear on this, so this article walks through the implementation in detail, starting from a small demo.
import java.util.concurrent.ThreadFactory

import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy, EventFactory, EventHandler, EventTranslatorOneArg, WaitStrategy}

object DisruptorTest {
  val disruptor = {
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }
    val threadFactory = new ThreadFactory() {
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy())
    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
    disruptor
  }

  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}
First, look at the Disruptor constructor.
public Disruptor(final EventFactory<T> eventFactory,
                 final int ringBufferSize,
                 final ThreadFactory threadFactory,
                 final ProducerType producerType,
                 final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
         new BasicExecutor(threadFactory));
}
Next, look at RingBuffer.create. It ends up in the fill method, which stores the result of eventFactory.newInstance() in every slot of the ring buffer as its initial value.
public static <E> RingBuffer<E> create(ProducerType producerType,
                                       EventFactory<E> factory,
                                       int bufferSize,
                                       WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                     int bufferSize,
                                                     WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();
    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }
    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}
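The constructor above requires a power-of-two size and pre-fills every slot. A minimal sketch of why, using only the JDK: with a power-of-two size, an ever-increasing sequence can be mapped to a slot with a bit mask, and because the slots are allocated once they can be reused on every lap. MiniRing and MiniRingDemo are hypothetical names for illustration, not Disruptor classes, and the padding (BUFFER_PAD) is left out.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for RingBuffer; not the Disruptor class.
final class MiniRing<E> {
    private final Object[] entries;
    private final int indexMask;

    MiniRing(Supplier<E> factory, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        // Allocate every slot once up front, in the spirit of fill(eventFactory).
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    // For a power-of-two size, (sequence & indexMask) equals sequence % bufferSize.
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & indexMask)];
    }
}

final class MiniRingDemo {
    public static void main(String[] args) {
        MiniRing<int[]> ring = new MiniRing<>(() -> new int[] {-1}, 4);
        ring.get(1)[0] = 42;
        // Sequence 5 wraps around to the same slot as sequence 1.
        System.out.println(ring.get(5)[0]); // prints 42
    }
}
```

Because slots are reused rather than reallocated, publishing an event means mutating an existing object, which is what the translator in the demo does.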
In main, the first call is disruptor.start(), the entry point for event consumption.
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }
    return ringBuffer;
}
consumerRepository is populated by disruptor.handleEventsWith(TestHandler), which also builds the event-processing chain.
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                           final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
Back to consumerInfo.start(executor) in disruptor.start(). Here executor = new BasicExecutor(threadFactory), and BasicExecutor creates a new thread on every execute call. The number of consumers in consumerRepository is bounded, though, so one new thread per consumer is not a problem.
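The thread-per-task behavior can be sketched with a few lines of plain JDK code. This is an illustration of the idea only, not the BasicExecutor source; ThreadPerTaskExecutor and its threadsCreated counter are hypothetical names added for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor that starts one new thread per submitted task.
final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory factory;
    final AtomicInteger threadsCreated = new AtomicInteger(); // demo-only counter

    ThreadPerTaskExecutor(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        Thread thread = factory.newThread(command);
        threadsCreated.incrementAndGet();
        thread.start();
    }
}

final class ThreadPerTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPerTaskExecutor executor = new ThreadPerTaskExecutor(Thread::new);
        CountDownLatch done = new CountDownLatch(2);
        // Two tasks stand in for two event processors; each gets its own thread.
        executor.execute(done::countDown);
        executor.execute(done::countDown);
        done.await();
        System.out.println("threads created: " + executor.threadsCreated.get()); // prints "threads created: 2"
    }
}
```

Since each event processor runs a long-lived loop, a dedicated thread per processor is a natural fit, and the thread count equals the number of registered handlers.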
public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

@Override
public void start(final java.util.concurrent.Executor executor) {
    // EventProcessor extends Runnable
    // executor = BasicExecutor
    executor.execute(eventprocessor);
}

public final class BatchEventProcessor<T> implements EventProcessor {
    @Override
    public void run() {
        if (running.compareAndSet(IDLE, RUNNING)) {
            sequenceBarrier.clearAlert();
            notifyStart();
            try {
                if (running.get() == RUNNING) {
                    processEvents();
                }
            } finally {
                notifyShutdown();
                running.set(IDLE);
            }
        } else {
            if (running.get() == RUNNING) {
                throw new IllegalStateException("Thread is already running");
            } else {
                earlyExit();
            }
        }
    }
}

private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;
    while (true) {
        try {
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }
            while (nextSequence <= availableSequence) {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
            sequence.set(availableSequence);
        } catch (final TimeoutException e) {
            notifyTimeout(sequence.get());
        } catch (final AlertException ex) {
            if (running.get() != RUNNING) {
                break;
            }
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
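executor.execute, that is BasicExecutor.execute, runs the BatchEventProcessor that wraps each eventHandler asynchronously. In other words, it calls BatchEventProcessor.run on a new thread.
This raises a question: if the handlers run asynchronously, how do multiple eventHandlers process events in order?
Look at the logic of processEvents.
It first reads BatchEventProcessor.sequence and adds 1 to get nextSequence.
It then calls sequenceBarrier.waitFor, which delegates to WaitStrategy.waitFor, to obtain availableSequence, the highest sequence it is allowed to process.
Here is the BlockingWaitStrategy.waitFor implementation.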
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException {
    long availableSequence;
    if (cursorSequence.get() < sequence) {
        lock.lock();
        try {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        } finally {
            lock.unlock();
        }
    }
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
    }
    return availableSequence;
}
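If cursorSequence (the ring buffer's published index) is less than sequence (the index the batchEventProcessor wants next), the batchEventProcessor blocks and waits. Once the cursor has caught up, the method spins until dependentSequence reaches sequence and returns dependentSequence as availableSequence. The batchEventProcessor then processes every event up to and including availableSequence in one batch and updates its own sequence.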
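The blocking half of this logic can be sketched with a lock and a condition from the JDK. This is a simplified illustration that assumes a single cursor and no dependent sequences or alerts; BlockingCursor is a hypothetical name, not a Disruptor class.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the cursor plus a blocking wait strategy.
final class BlockingCursor {
    private final AtomicLong cursor = new AtomicLong(-1);
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();

    // Consumer side: block until the cursor reaches `sequence`, then return the
    // highest published sequence so the caller can process a whole batch.
    long waitFor(long sequence) throws InterruptedException {
        if (cursor.get() < sequence) {
            lock.lock();
            try {
                while (cursor.get() < sequence) {
                    published.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return cursor.get();
    }

    // Producer side: advance the cursor, then wake any waiting consumers.
    void publish(long sequence) {
        cursor.set(sequence);
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

final class BlockingCursorDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingCursor cursor = new BlockingCursor();
        Thread consumer = new Thread(() -> {
            try {
                long next = 0;
                while (next <= 4) {
                    long available = cursor.waitFor(next);
                    // Batch boundaries depend on thread timing, so they vary per run.
                    System.out.println("batch " + next + ".." + available);
                    next = available + 1;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (long s = 0; s <= 4; s++) {
            cursor.publish(s);
        }
        consumer.join();
    }
}
```

The consumer rechecks the cursor while holding the lock, and the producer signals while holding the same lock, so a publish that lands between the check and the await cannot be missed.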
dependentSequence is initialized in the ProcessingSequenceBarrier constructor.
final class ProcessingSequenceBarrier implements SequenceBarrier {
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    private final Sequence cursorSequence;
    private final Sequencer sequencer;

    ProcessingSequenceBarrier(final Sequencer sequencer,
                              final WaitStrategy waitStrategy,
                              final Sequence cursorSequence,
                              final Sequence[] dependentSequences) {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length) {
            dependentSequence = cursorSequence;
        } else {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }
}
The ProcessingSequenceBarrier is created in Disruptor.createEventProcessors by this line:
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)
The createEventProcessors overload shown here is called from Disruptor.handleEventsWith and EventHandlerGroup.handleEventsWith.
public class Disruptor<T> {
    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }

    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                               final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }
}

public class EventHandlerGroup<T> {
    private final Disruptor<T> disruptor;
    private final ConsumerRepository<T> consumerRepository;
    private final Sequence[] sequences;

    EventHandlerGroup(final Disruptor<T> disruptor,
                      final ConsumerRepository<T> consumerRepository,
                      final Sequence[] sequences) {
        this.disruptor = disruptor;
        this.consumerRepository = consumerRepository;
        this.sequences = Arrays.copyOf(sequences, sequences.length);
    }

    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return disruptor.createEventProcessors(sequences, handlers);
    }

    public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
        return handleEventsWith(handlers);
    }
}
EventHandlerGroup keeps a copy of the sequences of the batchEventProcessors it was created with. In the demo, disruptor.handleEventsWith(TestHandler).then(ThenHandler) uses then to pass TestHandler's sequence into the barrier built for ThenHandler. ThenHandler therefore depends on TestHandler and handles each event only after TestHandler has finished with it.
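The effect of that dependency can be sketched without the library: the downstream handler's barrier reads the minimum of the upstream handlers' sequences rather than the producer cursor. MinSequenceGroup below is a hypothetical simplification in the spirit of FixedSequenceGroup, using AtomicLong in place of Sequence.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a group of upstream sequences: its value is the
// slowest (minimum) upstream sequence.
final class MinSequenceGroup {
    private final AtomicLong[] upstream;

    MinSequenceGroup(AtomicLong... upstream) {
        this.upstream = upstream.clone();
    }

    long get() {
        long min = Long.MAX_VALUE;
        for (AtomicLong sequence : upstream) {
            min = Math.min(min, sequence.get());
        }
        return min;
    }
}

final class ChainDemo {
    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(9);          // producer has published 0..9
        AtomicLong testHandlerSeq = new AtomicLong(3);  // first handler has finished 0..3
        MinSequenceGroup barrierForThen = new MinSequenceGroup(testHandlerSeq);

        System.out.println("published up to " + cursor.get()); // prints "published up to 9"
        // The downstream handler may only go up to 3, even though 9 is published.
        System.out.println(barrierForThen.get()); // prints 3
        testHandlerSeq.set(7);
        System.out.println(barrierForThen.get()); // prints 7
    }
}
```

With several upstream handlers, the downstream one advances only as far as the slowest of them, which is what keeps the chain ordered.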
Next, look at disruptor.publishEvent(translator, i), which puts data into the ringBuffer.
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}

public E get(long sequence) {
    return elementAt(sequence);
}
get(sequence) fetches the object stored at that sequence (the ring buffer index) from the ring buffer array. The translator updates that object in place, so the slot then holds the new values. publish advances the cursor, and waitStrategy.signalAllWhenBlocking() notifies any blocked consumers so they can continue consuming.
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

@Override
public void publish(long sequence) {
    cursor.set(sequence);
    waitStrategy.signalAllWhenBlocking();
}
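The claim, translate, publish sequence can be sketched in plain Java. This is a single-producer illustration under simplifying assumptions: there is no consumer gating, so nothing stops the producer from overwriting unread slots, and SlotEvent and MiniPublisher are hypothetical names. What it does show is the try/finally shape: a claimed sequence is published even when the translator throws, so consumers never stall on a gap.

```java
import java.util.function.ObjIntConsumer;

// Hypothetical mutable event, standing in for the demo's Event.
final class SlotEvent {
    int id = -1;
}

// Hypothetical single-producer publisher; no consumer gating is modeled.
final class MiniPublisher {
    private final SlotEvent[] slots = new SlotEvent[4]; // power-of-two size
    private long nextValue = -1;       // last claimed sequence (one producer thread only)
    private volatile long cursor = -1; // last published sequence

    MiniPublisher() {
        for (int i = 0; i < slots.length; i++) {
            slots[i] = new SlotEvent();
        }
    }

    void publishEvent(ObjIntConsumer<SlotEvent> translator, int arg) {
        long sequence = ++nextValue;               // claim the next slot
        try {
            translator.accept(get(sequence), arg); // mutate the slot in place
        } finally {
            cursor = sequence;                     // publish even if the translator threw
        }
    }

    SlotEvent get(long sequence) {
        return slots[(int) (sequence & (slots.length - 1))];
    }

    long cursor() {
        return cursor;
    }
}

final class MiniPublisherDemo {
    public static void main(String[] args) {
        MiniPublisher publisher = new MiniPublisher();
        publisher.publishEvent((event, arg) -> event.id = arg, 7);
        System.out.println(publisher.get(0).id + " @ " + publisher.cursor()); // prints "7 @ 0"
    }
}
```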
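With the flow clear, here are the key concepts behind it.
ringbuffer
Memory utilization is high: there is no fragmentation and almost no waste. At any given moment the code touches a concentrated region of memory, so the structure adapts well to different systems and achieves high performance. The physical memory layout is simple and uniform, which makes bugs such as out-of-bounds access or dangling pointers less likely, and when something does go wrong it is easy to analyze and debug at the memory level. Systems built this way tend to stay robust.
cpu cache
When the CPU accesses main memory it has to wait, which leaves compute resources idle and lowers overall CPU throughput. Because memory accesses concentrate on hot data, placing a faster but more expensive (relative to main memory) medium between the CPU and memory as a cache is highly cost-effective.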