溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Kafka簡(jiǎn)單客戶端編程的示例分析

發(fā)布時(shí)間:2021-08-05 14:38:25 來(lái)源:億速云 閱讀:135 作者:小新 欄目:編程語(yǔ)言

這篇文章將為大家詳細(xì)講解有關(guān)Kafka簡(jiǎn)單客戶端編程的示例分析,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

一、創(chuàng)建配置類Config

這個(gè)類很簡(jiǎn)單,只是存放了兩個(gè)常量,一個(gè)是話題TOPIC,一個(gè)是線程數(shù)THREADS

package com.lya.kafka; 
 
/** 
 * 配置項(xiàng) 
 * @author liuyazhuang 
 * 
 */ 
public class Config { 
  
 /** 
  * 話題 
  */ 
 public static final String TOPIC = "wordcount"; 
 /** 
  * 線程數(shù) 
  */ 
 public static final Integer THREADS = 1; 
}

二、編程生產(chǎn)者類ProducerDemo

這個(gè)類的主要作用就是向Kafka寫(xiě)入相應(yīng)的消息,并且將消息寫(xiě)入wordcount話題。

package com.lya.kafka; 
 
import java.util.Properties; 
 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
 
/** 
 * 生產(chǎn)者實(shí)例 
 * @author liuyazhuang 
 * 
 */ 
public class ProducerDemo { 
 public static void main(String[] args) throws Exception { 
  Properties props = new Properties(); 
  props.put("zk.connect", "192.168.209.121:2181"); 
  props.put("metadata.broker.list","192.168.209.121:9092"); 
  props.put("serializer.class", "kafka.serializer.StringEncoder"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
  ProducerConfig config = new ProducerConfig(props); 
  Producer<String, String> producer = new Producer<String, String>(config); 
 
  // 發(fā)送業(yè)務(wù)消息 
  // 讀取文件 讀取內(nèi)存數(shù)據(jù)庫(kù) 讀socket端口 
  for (int i = 1; i <= 100; i++) { 
   Thread.sleep(500); 
   producer.send(new KeyedMessage<String, String>(Config.TOPIC, 
     "this number ===>>> " + i)); 
  } 
 
 } 
}

三、編寫(xiě)消息者類ConsumerDemo

這個(gè)類的主要作用就是消費(fèi)Kafka中wordcount話題的消息。

package com.lya.kafka; 
 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
 
import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.message.MessageAndMetadata; 
 
/** 
 * 消費(fèi)者實(shí)例 
 * @author liuyazhuang 
 * 
 */ 
public class ConsumerDemo { 
  
 
 public static void main(String[] args) { 
   
  Properties props = new Properties(); 
  props.put("zookeeper.connect", "192.168.209.121:2181"); 
  props.put("group.id", "1111"); 
  props.put("auto.offset.reset", "smallest"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
 
  ConsumerConfig config = new ConsumerConfig(props); 
  ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); 
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
  topicCountMap.put(Config.TOPIC, Config.THREADS); 
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC); 
   
  for(final KafkaStream<byte[], byte[]> kafkaStream : streams){ 
   new Thread(new Runnable() { 
    @Override 
    public void run() { 
     for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){ 
      String msg = new String(mm.message()); 
      System.out.println(msg); 
     } 
    } 
    
   }).start(); 
   
  } 
 } 
}

四、運(yùn)行實(shí)例

首先,運(yùn)行消費(fèi)者類ConsumerDemo
運(yùn)行結(jié)果如下:

Kafka簡(jiǎn)單客戶端編程的示例分析

沒(méi)有打印任何信息。
此時(shí),我們運(yùn)行生產(chǎn)者類ProducerDemo
我們?cè)俅未蜷_(kāi)消費(fèi)者的控制臺(tái)查看如下:

Kafka簡(jiǎn)單客戶端編程的示例分析

打印出了生產(chǎn)者生產(chǎn)的消息。
至此,Kafka簡(jiǎn)單客戶端編程實(shí)例結(jié)束。

關(guān)于“Kafka簡(jiǎn)單客戶端編程的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI