溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Disruptor中怎么實(shí)現(xiàn)一個(gè)高性能隊(duì)列

發(fā)布時(shí)間:2021-06-21 16:57:40 來(lái)源:億速云 閱讀:109 作者:Leah 欄目:大數(shù)據(jù)

Disruptor中怎么實(shí)現(xiàn)一個(gè)高性能隊(duì)列,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

Disruptor 例子

import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy}

object DisruptorTest {

  val disruptor = {
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }

    val threadFactory = new ThreadFactory(){
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, 
                        new BlockingWaitStrategy())

    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
    
    disruptor
  }
  
  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

源碼閱讀

disrutpor 初始化

先看 Disruptor 構(gòu)造方法

public Disruptor(final EventFactory<T> eventFactory, 
  final int ringBufferSize, 
  final ThreadFactory threadFactory, 
  final ProducerType producerType,
  final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), 
        new BasicExecutor(threadFactory));
}

在看 RingBuffer.create, 最終通過(guò) fill 方法 將 eventFactory.newInstance() 作為默認(rèn)值,塞到 ringBuffer 里面

public static <E> RingBuffer<E> create(ProducerType producerType, 
  EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, 
    WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

消費(fèi)事件消息

首先看 disruptor.start(): 消費(fèi)事件消息入口

private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

consumerRepository 類型由 disruptor.handleEventsWith(TestHandler) 初始化, 并構(gòu)造事件消息處理鏈

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

回頭看 disruptor.start() 中的 consumerInfo.start(executor) executor = new BasicExecutor(threadFactory),BasicExecutor 在每次 execute 任務(wù)時(shí),都會(huì) new thread **但是 consumerRepository 的數(shù)量是有限的,所以 new thread 也沒(méi)啥問(wèn)題

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

@Override
public void start(final java.util.concurrent.Executor executor){
    //EventProcessor extends Runnable
    //executor = BasicExecutor 
    executor.execute(eventprocessor);
}

public final class BatchEventProcessor<T> implements EventProcessor {
  @Override
  public void run() {
      if (running.compareAndSet(IDLE, RUNNING)) {
          sequenceBarrier.clearAlert();

          notifyStart();
          try {
              if (running.get() == RUNNING) {
                  processEvents();
              }
          } finally {
              notifyShutdown();
              running.set(IDLE);
          }
      } else {
          if (running.get() == RUNNING) {
              throw new IllegalStateException("Thread is already running");
          } else {
              earlyExit();
          }
      }
  }
}

private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;

    while (true) {
        try {
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }

            while (nextSequence <= availableSequence) {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }

            sequence.set(availableSequence);
        } catch (final TimeoutException e) {
            notifyTimeout(sequence.get());
        } catch (final AlertException ex) {
            if (running.get() != RUNNING) {
                break;
            }
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}

executor.execute 也就是 BasicExecutor.execute(eventHandler) 會(huì)異步的執(zhí)行 eventHandler, 也就是調(diào)用 BatchEventProcessor.run 方法

問(wèn)題來(lái)了,既然是異步執(zhí)行,多個(gè) eventHandler 是怎么按照順序去處理事件消息的?

我們看 processEvents 方法執(zhí)行邏輯

  1. 先獲取 BatchEventProcessor.sequence 并 +1

  2. 通過(guò) sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 獲取到可用的 availableSequence

  3. 先看下 BlockingWaitStrategy.waitFor 的實(shí)現(xiàn)

     public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, 
        SequenceBarrier barrier)
        throws AlertException, InterruptedException {
        long availableSequence;
        if (cursorSequence.get() < sequence) {
            lock.lock();
            try {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally {
                lock.unlock();
            }
        }
    
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
    
        return availableSequence;
    }


    如果 cursorSequence(ringbuffer 的索引) < sequence(batchEventProcessor 的索引) 則batchEventProcessor掛起等待 否則 就用 dependentSequence 作為 availableSequence 返回 然后 batchEventProcessor 會(huì)將 availableSequence 索引之前的數(shù)據(jù)一次性處理完,并更新自身的 sequence 索引值

  4. dependentSequence 由 ProcessingSequenceBarrier 構(gòu)造方法初始化

    final class ProcessingSequenceBarrier implements SequenceBarrier {
        private final WaitStrategy waitStrategy;
        private final Sequence dependentSequence;
        private volatile boolean alerted = false;
        private final Sequence cursorSequence;
        private final Sequencer sequencer;
    
        ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
            final Sequence cursorSequence, final Sequence[] dependentSequences) {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) {
                dependentSequence = cursorSequence;
            } else {
                dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
        }
    }


    在 Disruptor.createEventProcessors 中的, 進(jìn)行了初始化 ProcessingSequenceBarrier final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences) createEventProcessors 僅會(huì)被 Disruptor.handleEventsWithEventHandlerGroup.handleEventsWith

    public class Disruptor<T> {
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return createEventProcessors(new Sequence[0], handlers);
        }
    
        EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
            final EventHandler<? super T>[] eventHandlers) {
            checkNotStarted();
    
            final Sequence[] processorSequences = new Sequence[eventHandlers.length];
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    
            for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
                final EventHandler<? super T> eventHandler = eventHandlers[i];
    
                final BatchEventProcessor<T> batchEventProcessor = 
                    new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
    
                if (exceptionHandler != null) {
                    batchEventProcessor.setExceptionHandler(exceptionHandler);
                }
    
                consumerRepository.add(batchEventProcessor, eventHandler, barrier);
                processorSequences[i] = batchEventProcessor.getSequence();
            }
    
            updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    
            return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
        }
    }
    
    public class EventHandlerGroup<T> {
        private final Disruptor<T> disruptor;
        private final ConsumerRepository<T> consumerRepository;
        private final Sequence[] sequences;
    
        EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
            final Sequence[] sequences) {
            this.disruptor = disruptor;
            this.consumerRepository = consumerRepository;
            this.sequences = Arrays.copyOf(sequences, sequences.length);
        }
    
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return disruptor.createEventProcessors(sequences, handlers);
        }
    
        public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
            return handleEventsWith(handlers);
        }
    }


    EventHandlerGroup 會(huì)拷貝一份 batchEventProcessor 中的 sequence demo 例子中 disruptor.handleEventsWith(TestHandler).then(ThenHandler) 通過(guò) then 方法將 TestHandler 中的 sequence 傳遞給 ThenHandler 這樣 ThenHandler 就依賴了 TestHandler, ThenHandler 就會(huì)在 TestHandler 后執(zhí)行

生產(chǎn)事件消息

接著看 disruptor.publishEvent(translator, i) 就是往 ringBuffer 里面放數(shù)據(jù),

public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}

public E get(long sequence) {
    return elementAt(sequence);
}

get(sequence) 根據(jù) sequence [ringbuffer 索引] 獲取 ringbuffer 數(shù)組里的對(duì)象 translator 將其處理替換完后,ringbuffer 數(shù)組的的值將是新的值,publish 將會(huì)更新索引的標(biāo)記位 waitStrategy.signalAllWhenBlocking() 會(huì)通知阻塞等待的消費(fèi)者去繼續(xù)消費(fèi)消息

protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
@Override
public void publish(long sequence) {
    cursor.set(sequence);
    waitStrategy.signalAllWhenBlocking();
}

總結(jié)

流程理清楚了,我們看看 知識(shí)點(diǎn)

  • ringbuffer

    • 內(nèi)存使用率很高,不會(huì)造成內(nèi)存碎片,幾乎沒(méi)有浪費(fèi)。業(yè)務(wù)處理的同一時(shí)間,訪問(wèn)的內(nèi)存數(shù)據(jù)段集中。 可以更好的適應(yīng)不同系統(tǒng),取得較高的性能。內(nèi)存的物理布局簡(jiǎn)單單一,不太容易發(fā)生內(nèi)存越界、懸空指針等 bug,出了問(wèn)題也容易在內(nèi)存級(jí)別分析調(diào)試。 做出來(lái)的系統(tǒng)容易保持健壯。

  • cpu cache

    • CPU 訪問(wèn)內(nèi)存時(shí)會(huì)等待,導(dǎo)致計(jì)算資源大量閑置,降低 CPU 整體吞吐量。 由于內(nèi)存數(shù)據(jù)訪問(wèn)的熱點(diǎn)集中性,在 CPU 和內(nèi)存之間用較為快速而成本較高(相對(duì)于內(nèi)存)的介質(zhì)做一層緩存,就顯得性價(jià)比極高了

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI