您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Disruptor-07 中有哪些代碼范例,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
public class Test { private static Logger logger = LogManager.getLogger(); @SuppressWarnings("unchecked") public static void main(String[] args) throws InterruptedException { // The factory for the event TestEventFactory factory = new TestEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy()); // Create EventHandler TestEventHandler handler1 = new TestEventHandler("handler1"); TestEventHandler handler2 = new TestEventHandler("handler2"); TestEventHandler handler3 = new TestEventHandler("handler3"); TestEventHandler handler4 = new TestEventHandler("handler4"); // Connect the handler int count = 100; // Unicast采取WorkPool方式,3個(gè)WorkHandler 累計(jì)執(zhí)行100次。 // Event到達(dá)時(shí),哪個(gè)WorkHandler被調(diào)度不確定。 // disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3); // MulticastTest并發(fā)處理方式,3個(gè)EventHandler,各執(zhí)行100次,累計(jì)300次; // 每個(gè)Event到達(dá)時(shí),EventHandler的處理順序不確定。 // 并發(fā)( handler1, handler2, handler3) // EventHandler:handler1--85:k-v // EventHandler:handler3--85:k-v // EventHandler:handler2--85:k-v // EventHandler:handler1--86:k-v // EventHandler:handler3--86:k-v // EventHandler:handler2--86:k-v <-----并發(fā)順序不確定 // EventHandler:handler2--87:k-v <-----并發(fā)順序不確定 // EventHandler:handler1--87:k-v // EventHandler:handler3--87:k-v // EventHandler:handler3--88:k-v // EventHandler:handler2--88:k-v // EventHandler:handler1--88:k-v // disruptor.handleEventsWith(handler1, handler2, handler3); // Pipeline串行處理方式,3個(gè)EventHandler,各執(zhí)行100次,累計(jì)300次。 // 每個(gè)Event到達(dá)時(shí),EventHandler的處理順序與handleEventsWith的順序一致。 // 順序:handler1->handler2->handler3 // EventHandler:handler1--97:k-v // EventHandler:handler2--97:k-v // EventHandler:handler3--97:k-v // EventHandler:handler1--98:k-v // EventHandler:handler2--98:k-v // EventHandler:handler3--98:k-v // EventHandler:handler1--99:k-v // EventHandler:handler2--99:k-v // EventHandler:handler3--99:k-v // EventHandler:handler1--100:k-v // EventHandler:handler2--100:k-v // EventHandler:handler3--100:k-v //disruptor.handleEventsWith(handler1).handleEventsWith(handler2).handleEventsWith(handler3); //Diamond //按照 handler1-> 并發(fā)(handler2, hander3) ->handler4 調(diào)度 disruptor.handleEventsWith(handler1).handleEventsWith(handler2,handler3).handleEventsWith(handler4); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); TestEventProducer producer = new TestEventProducer(ringBuffer); for (int i = 1; i <= count; i++) { producer.onEvent("k", "v"); Thread.sleep(100); } Thread.sleep(10000); } }
public class TestEvent implements Event { private String key; private String value; public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
package com.lands.disruptor.unicast; import com.lmax.disruptor.EventFactory; public class TestEventFactory implements EventFactory<TestEvent> { public TestEvent newInstance() { return new TestEvent(); } }
package com.lands.disruptor.unicast; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; public class TestEventHandler implements EventHandler<TestEvent>, WorkHandler<TestEvent> { private static Logger logger = LogManager.getLogger(); private String handlerName; private AtomicInteger count = new AtomicInteger(); public TestEventHandler(String name) { this.handlerName = name; } public String getHandlerName() { return handlerName; } public void onEvent(TestEvent event) throws Exception { logger.info("WorkHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-" + event.getValue()); //Thread.sleep(100); } public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception { logger.info("EventHandler:" + this.handlerName + "-" + count.decrementAndGet() + ":" + event.getKey() + "-" + event.getValue()); //Thread.sleep(100); } }
package com.lands.disruptor.unicast; import com.lands.disruptor.EventProducer; import com.lmax.disruptor.RingBuffer; public class TestEventProducer extends EventProducer<TestEvent> { public TestEventProducer(RingBuffer<TestEvent> ringBuffer) { super(ringBuffer); } @Override public void process(TestEvent event, String... data) { event.setKey(data[0]); event.setValue(data[1]); } }
關(guān)于Disruptor-07 中有哪些代碼范例就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。