您好,登錄后才能下訂單哦!
這篇文章主要介紹了Disruptor、Kafka、Netty如何整合,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
整個(gè)網(wǎng)關(guān)的核心是一個(gè)netty server,各個(gè)應(yīng)用程序(包括web server,手機(jī)app等)連到這個(gè)netty server上請(qǐng)求數(shù)據(jù);關(guān)于數(shù)據(jù)來(lái)源,需要監(jiān)聽(tīng)多個(gè)kafka topic(而且這里的topic是可變的,也就是說(shuō)需要kafka consumer的動(dòng)態(tài)開(kāi)始和停止),之后需要把所有這些topic的數(shù)據(jù)整合在一起,通過(guò)channel發(fā)送給客戶端應(yīng)用程序。
下面把大部分的代碼貼出來(lái),有需要的同學(xué)可以參考。會(huì)對(duì)關(guān)鍵的技術(shù)點(diǎn)進(jìn)行說(shuō)明,偏業(yè)務(wù)部分大家自行忽略吧。
啟動(dòng)disruptor;監(jiān)聽(tīng)一個(gè)固定的topic,把獲取到的msg,交給ConsumerProcessorGroup來(lái)完成kafka consumer的創(chuàng)建和停止。
public static void main(String[] args) {
DisruptorHelper.getInstance().start();
Properties props = ConsumerProps.getConsumerProps();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("uavlst"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
ConsumerRecord<String, String> lastRecord = null;
for (ConsumerRecord<String, String> record : records)
lastRecord = record;
if (lastRecord != null){
ConsumerProcessorGroup.getInstance().recieveNewUavLst(lastRecord.value());
}
}
}
DisruptorHelper是一個(gè)單例,主要是包含了一個(gè)disruptor 對(duì)象,在new這個(gè)對(duì)象的時(shí)候,用到了ProducerType.MULTI和new BlockingWaitStrategy(),其中前者意味著我們需要多個(gè)producer共同來(lái)工作,后者其實(shí)是默認(rèn)的producer的等待策略,后續(xù)根據(jù)實(shí)際情況進(jìn)行調(diào)整。
public class DisruptorHelper {
private static DisruptorHelper instance = null;
public static DisruptorHelper getInstance() {
if (instance == null) {
instance = new DisruptorHelper();
}
return instance;
}
private final int BUFFER_SIZE = 1024;
private Disruptor<MsgEvent> disruptor = null;
private DisruptorHelper() {
MsgEventHandler eventHandler = new MsgEventHandler();
disruptor = new Disruptor(new MsgEventFactory(), BUFFER_SIZE, new ConsumerThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
disruptor.handleEventsWith(eventHandler);
}
public void start() {
disruptor.start();
}
public void shutdown() {
disruptor.shutdown();
}
public void produce(ConsumerRecord<String, String> record) {
RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setRecord(record);
} finally {
ringBuffer.publish(sequence);
}
}
}
ConsumerProcessorGroup是一個(gè)單例,當(dāng)中包含一個(gè)fixedThreadPool,動(dòng)態(tài)的啟動(dòng)線程來(lái)進(jìn)行kafka topic的消費(fèi)。
public class ConsumerProcessorGroup {
private static ConsumerProcessorGroup instance = null;
public static ConsumerProcessorGroup getInstance(){
if (instance == null){
instance = new ConsumerProcessorGroup();
}
return instance;
}
private ConsumerProcessorGroup() {
}
private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);
public List<String> uavIDLst = new Vector<String>();
public void recieveNewUavLst(String uavIDs){
List<String> newUavIDs = Arrays.asList(uavIDs.split(","));
for (String uavID : newUavIDs){
if (!uavIDLst.contains(uavID)){
fixedThreadPool.execute(new ConsumerThread(uavID));
uavIDLst.add(uavID);
}
}
List<String> tmpLstForDel = new ArrayList<String>();
for (String uavID : uavIDLst){
if (!newUavIDs.contains(uavID)){
tmpLstForDel.add(uavID);
}
}
uavIDLst.removeAll(tmpLstForDel);
}
}
對(duì)kafka topic進(jìn)行消費(fèi),通過(guò)DisruptorHelper將獲取的record寫(xiě)入disruptor的ring buffer當(dāng)中。
public class ConsumerThread implements Runnable {
private String uavID;
public ConsumerThread(String uavID) {
this.uavID = uavID;
}
public void run() {
Properties props = ConsumerProps.getConsumerProps();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(uavID));
System.out.println(uavID + " consumer started! Current thread id is " + Thread.currentThread().getId());
while (ConsumerProcessorGroup.getInstance().uavIDLst.contains(uavID)) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
DisruptorHelper.getInstance().produce(record);
}
}
System.out.println(uavID + " consumer finished! Current thread id is " + Thread.currentThread().getId());
}
}
Disruptor的消費(fèi)者,依次從Ring Buffer當(dāng)中讀取數(shù)據(jù)并執(zhí)行相應(yīng)的處理。
public class MsgEventHandler implements EventHandler<MsgEvent> {
private Map<Integer, String> converterMap;
public void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {
ConsumerRecord<String, String> record = event.getRecord();
System.out.printf("topic = %s, part = %d, offset = %d, key = %s, value = %s \n\r", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Disruptor、Kafka、Netty如何整合”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。