溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Disruptor-07 中有哪些代碼范例

發(fā)布時(shí)間:2021-06-21 17:51:48 來源:億速云 閱讀:128 作者:Leah 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)Disruptor-07 中有哪些代碼范例,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

public class Test {

	private static Logger logger = LogManager.getLogger();

	@SuppressWarnings("unchecked")
	public static void main(String[] args) throws InterruptedException {

		// The factory for the event
		TestEventFactory factory = new TestEventFactory();

		// Specify the size of the ring buffer, must be power of 2.
		int bufferSize = 1024;

		// Construct the Disruptor
		Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(factory, bufferSize,
				DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

		// Create EventHandler
		TestEventHandler handler1 = new TestEventHandler("handler1");
		TestEventHandler handler2 = new TestEventHandler("handler2");
		TestEventHandler handler3 = new TestEventHandler("handler3");
		TestEventHandler handler4 = new TestEventHandler("handler4");

		// Connect the handler

		int count = 100;

		// Unicast采取WorkPool方式,3個(gè)WorkHandler 累計(jì)執(zhí)行100次。
		// Event到達(dá)時(shí),哪個(gè)WorkHandler被調(diào)度不確定。
		// disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);

		// MulticastTest并發(fā)處理方式,3個(gè)EventHandler,各執(zhí)行100次,累計(jì)300次;
		// 每個(gè)Event到達(dá)時(shí),EventHandler的處理順序不確定。
		// 并發(fā)( handler1, handler2, handler3)
		// EventHandler:handler1--85:k-v
		// EventHandler:handler3--85:k-v
		// EventHandler:handler2--85:k-v

		// EventHandler:handler1--86:k-v
		// EventHandler:handler3--86:k-v
		// EventHandler:handler2--86:k-v <-----并發(fā)順序不確定

		// EventHandler:handler2--87:k-v <-----并發(fā)順序不確定
		// EventHandler:handler1--87:k-v
		// EventHandler:handler3--87:k-v

		// EventHandler:handler3--88:k-v
		// EventHandler:handler2--88:k-v
		// EventHandler:handler1--88:k-v

		// disruptor.handleEventsWith(handler1, handler2, handler3);

		// Pipeline串行處理方式,3個(gè)EventHandler,各執(zhí)行100次,累計(jì)300次。
		// 每個(gè)Event到達(dá)時(shí),EventHandler的處理順序與handleEventsWith的順序一致。
		// 順序:handler1->handler2->handler3
		// EventHandler:handler1--97:k-v
		// EventHandler:handler2--97:k-v
		// EventHandler:handler3--97:k-v

		// EventHandler:handler1--98:k-v
		// EventHandler:handler2--98:k-v
		// EventHandler:handler3--98:k-v

		// EventHandler:handler1--99:k-v
		// EventHandler:handler2--99:k-v
		// EventHandler:handler3--99:k-v

		// EventHandler:handler1--100:k-v
		// EventHandler:handler2--100:k-v
		// EventHandler:handler3--100:k-v
		
		//disruptor.handleEventsWith(handler1).handleEventsWith(handler2).handleEventsWith(handler3);
		
		//Diamond
		//按照 handler1-> 并發(fā)(handler2, hander3) ->handler4 調(diào)度
		disruptor.handleEventsWith(handler1).handleEventsWith(handler2,handler3).handleEventsWith(handler4);

		// Start the Disruptor, starts all threads running
		disruptor.start();

		// Get the ring buffer from the Disruptor to be used for publishing.
		RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();

		TestEventProducer producer = new TestEventProducer(ringBuffer);

		for (int i = 1; i <= count; i++) {
			producer.onEvent("k", "v");
			Thread.sleep(100);
		}

		Thread.sleep(10000);
	}

}
public class TestEvent implements Event {

	private String key;
	private String value;

	public String getKey() {
		return key;
	}

	public void setKey(String key) {
		this.key = key;
	}

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}

}
package com.lands.disruptor.unicast;

import com.lmax.disruptor.EventFactory;

public class TestEventFactory implements EventFactory<TestEvent> {

	public TestEvent newInstance() {
		return new TestEvent();
	}

}
package com.lands.disruptor.unicast;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class TestEventHandler implements EventHandler<TestEvent>, WorkHandler<TestEvent> {

	private static Logger logger = LogManager.getLogger();

	private String handlerName;

	private AtomicInteger count = new AtomicInteger();

	public TestEventHandler(String name) {
		this.handlerName = name;
	}

	public String getHandlerName() {
		return handlerName;
	}

	public void onEvent(TestEvent event) throws Exception {

		logger.info("WorkHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-"
				+ event.getValue());

		//Thread.sleep(100);
	}

	public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception {
		logger.info("EventHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-"
				+ event.getValue());
		//Thread.sleep(100);
	}

}
package com.lands.disruptor.unicast;

import com.lands.disruptor.EventProducer;
import com.lmax.disruptor.RingBuffer;

public class TestEventProducer extends EventProducer<TestEvent> {

	public TestEventProducer(RingBuffer<TestEvent> ringBuffer) {
		super(ringBuffer);
	}

	@Override
	public void process(TestEvent event, String... data) {

		event.setKey(data[0]);
		event.setValue(data[1]);

	}

}

關(guān)于Disruptor-07 中有哪些代碼范例就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI