您好,登錄后才能下訂單哦!
Kafka producer如何使用?針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
一、producer工作流程
producer使用用戶啟動producer的線程,將待發(fā)送的消息封裝到一個ProducerRecord類實例,然后將其序列化之后發(fā)送給partitioner,再由后者確定目標分區(qū)后一同發(fā)送到位于producer程序中的一塊內(nèi)存緩沖區(qū)中。而producer的另外一個線程(Sender線程)則負責實時從該緩沖區(qū)中提取出準備就緒的消息封裝進一個批次(batch),統(tǒng)一發(fā)送給對應的broker,具體流程如下圖:
二、producer示例程序開發(fā)
首先引入kafka相關依賴,在pom.xml文件中加入如下依賴:
<!--kafka--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.2.0</version> </dependency>
在resources下面創(chuàng)建kafka-producer.properties配置文件,用于設置kafka參數(shù),內(nèi)容如下:
bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer acks=-1 retries=3 batch.size=323840 linger.ms=10 buffer.memory=33554432 max.block.ms=3000
其中,前三個參數(shù)必須明確指定,因為這三個參數(shù)沒有默認值(注:kafka的producer參數(shù)配置可以參考http://kafka.apache.org/documentation/),然后編寫producer發(fā)送消息的代碼:
/** * Kafka發(fā)送消息測試 * @throws IOException */ public void sendMsg() throws IOException { //1.構(gòu)造properties對象 Properties properties = new Properties(); FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties"); properties.load(fileInputStream); fileInputStream.close(); //2.構(gòu)造kafkaProducer對象 KafkaProducer producer = new KafkaProducer(properties); for (int i = 0; i < 100; i++) { //3.構(gòu)造待發(fā)送消息的producerRecord對象,并指定消息要發(fā)送到哪個topic,消息的key和value ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i)); //4.調(diào)用kafkaProducer對象的send方法發(fā)送消息 producer.send(testTopic); } //5.關閉kafkaProducer producer.close(); }
然后登陸kafka所在服務器,執(zhí)行以下命令監(jiān)聽消息:
cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic --from-beginning
運行sendMsg方法,注意觀察消費端,
可以看到有0-99之間的數(shù)字依次被消費到,說明消息發(fā)送成功。
三、異步和同步發(fā)送消息
上面發(fā)送消息的示例程序中,沒有對發(fā)送結(jié)果進行處理,如果消息發(fā)送失敗我們也是無法得知的,這種方法在實際應用中是不推薦的。在實際使用場景中,一般使用異步和同步兩種常見發(fā)送方式。Java版本producer的send方法會返回一個Future對象,如果調(diào)用Future.get()方法就會無限等待返回結(jié)果,實現(xiàn)同步發(fā)送的效果,否則就是異步發(fā)送。
1.異步發(fā)送消息
Java版本producer的send()方法提供了回調(diào)類參數(shù)來實現(xiàn)異步發(fā)送以及對發(fā)送結(jié)果進行的響應,具體代碼如下:
/** * 異步發(fā)送消息 * * @throws IOException */ public void sendMsg() throws IOException { //1.構(gòu)造properties對象 Properties properties = new Properties(); FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties"); properties.load(fileInputStream); fileInputStream.close(); //2.構(gòu)造kafkaProducer對象 KafkaProducer producer = new KafkaProducer(properties); for (int i = 0; i < 100; i++) { //3.構(gòu)造待發(fā)送消息的producerRecord對象,并指定消息要發(fā)送到哪個topic,消息的key和value ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i)); //4.調(diào)用kafkaProducer對象的send方法發(fā)送消息,傳入Callback回調(diào)參數(shù) producer.send(testTopic, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (null == exception) { //消息發(fā)送成功后的處理 System.out.println("消息發(fā)送成功"); } else { //消息發(fā)送失敗后的處理 System.out.println("消息發(fā)送失敗"); } } }); } //5.關閉kafkaProducer producer.close(); }
以上代碼中,send方法第二個參數(shù)傳入一個匿名內(nèi)部類對象,也可以傳入實現(xiàn)了org.apache.kafka.clients.producer.Callback接口的類對象。同時onCompletion方法的兩個入?yún)ecordMetadata和exception不會同時為空,當消息發(fā)送成功后,exception為null,消息發(fā)送失敗后recordMetadata為null。因此可以按照兩個入?yún)⑦M行成功和失敗邏輯的處理。
其次,Kafka發(fā)送消息失敗的類型包含兩類,可重試異常和不可重試異常。所有的可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類,理論上所有沒有繼承RetriableException 類的其他異常都屬于不可重試異常,鑒于此,可以在消息發(fā)送失敗后,按照是否可以重試,來進行不同的處理邏輯處理:
//4.調(diào)用kafkaProducer對象的send方法發(fā)送消息 producer.send(testTopic, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (null == exception) { //消息發(fā)送成功后的處理 System.out.println("消息發(fā)送成功"); } else { if(exception instanceof RetriableException){ // 可重試異常 System.out.println("可重試異常"); }else{ // 不可重試異常 System.out.println("不可重試異常"); } } } });
2.同步發(fā)送消息
同步發(fā)送和異步發(fā)送是通過Java的Futrue來區(qū)分的,調(diào)用Future.get()無限等待結(jié)果返回,即實現(xiàn)了同步發(fā)送的結(jié)果,具體代碼如下:
// 發(fā)送消息 Future future = producer.send(testTopic); try { // 調(diào)用get方法等待結(jié)果返回,發(fā)送失敗則會拋出異常 future.get(); } catch (Exception e) { System.out.println("消息發(fā)送失敗"); }
四、其他高級特性
1.消息分區(qū)機制
kafka producer提供了分區(qū)策略以及分區(qū)器(partitioner)用于確定將消息發(fā)送到指定topic的哪個分區(qū)中。默認分區(qū)器根據(jù)murmur2算法計算消息key的哈希值,然后對總分區(qū)數(shù)求模確認消息要被發(fā)送的目標分區(qū)號(這點讓我想起了redis集群中key值的分配方法),這樣就確保了相同key的消息被發(fā)送到相同分區(qū)。若消息沒有key值,將采用輪詢的方式確保消息在topic的所有分區(qū)上均勻分配。
除了使用kafka默認的分區(qū)機制,也可以通過實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口來自定義分區(qū)器,此時需要在構(gòu)造KafkaProducer的 properties中增加partitioner.class來指明分區(qū)器實現(xiàn)類,如:partitioner.class=com.demo.service.CustomerPartitioner。
2.消息序列化
在本篇開始的producer示例程序中,在構(gòu)造KafkaProducer對象的時候,有兩個配置項
分別用于配置消息key和value的序列化方式為String類型,除此之外,Kafka中還提供了如下默認的序列化器:
ByteArraySerializer:本質(zhì)上什么也不做,因為網(wǎng)絡中傳輸就是以字節(jié)傳輸?shù)模?/p>
ByteBufferSerializer:序列化ByteBuffer消息;
BytesSerializer:序列化kafka自定義的Bytes類型;
IntegerSerializer:序列化Integer類型;
DoubleSerializer:序列化Double類型;
LongSerializer:序列化Long類型;
如果要自定義序列化器,則需要實現(xiàn)org.apache.kafka.common.serialization.Serializer接口,并且將key.serializer和value.serializer配置為自定義的序列化器。
3.消息壓縮
消息壓縮可以顯著降低磁盤占用以及帶寬占用,從而有效提升I/O密集型應用性能,但是引入壓縮同時會消耗額外的CPU,因此壓縮是I/O性能和CPU資源的平衡。kafka目前支持3種壓縮算法:CZIP,Snappy和LZ4,性能測試結(jié)果顯示三種壓縮算法的性能如下:LZ4>>Snappy>GZIP,目前啟用LZ4進行消息壓縮的producer的吞吐量是最高的。
默認情況下Kafka是不壓縮消息的,但是可以通過在創(chuàng)建KafkaProducer 對象的時候設置producer端參數(shù)compression.type來開啟消息壓縮,如配置compression.type=LZ4。那么什么時候開啟壓縮呢?首先判斷是否啟用壓縮的依據(jù)是I/O資源消耗與CPU資源消耗的對比,如果環(huán)境上I/O資源非常緊張,比如producer程序占用了大量的網(wǎng)絡帶寬或broker端的磁盤占用率很高,而producer端的CPU資源非常富裕,那么就可以考慮為producer開啟壓縮。
4.無消息丟失配置
在使用KafkaProducer.send()方法發(fā)送消息的時候,其實是把消息放入緩沖區(qū)中,再由一個專屬I/O線程負責從緩沖區(qū)提取消息并封裝消息到batch中,然后再發(fā)送出去。如果在I/O線程將消息發(fā)送出去之前,producer奔潰了,那么所有的消息都將丟失。同時,存在多消息發(fā)送時候由于網(wǎng)絡抖動導致消息亂序的問題,為了解決這兩個問題,可以通過在producer端以及broker端進行配置進行避免。
4.1 producer端配置
max.block.ms=3000:設置block的時長,當緩沖區(qū)被填滿或者metadata丟失時產(chǎn)生block,停止接收新的消息;
acks=all:等待所有follower都響應了發(fā)送消息認為消息發(fā)送成功;
retries=Integer.MAX_VALUE:設置重試次數(shù),設置一個比較大的值可以保證消息不丟失;
max.in.flight.requests.per.connection=1:限制producer在單個broker連接上能夠發(fā)送的未響應請求的數(shù)量,從而防止同topic統(tǒng)一分區(qū)下消息亂序問題;
除了設置以上參數(shù)之外,在發(fā)送消息的時候,應該盡量使用帶有回調(diào)參數(shù)的send方法來處理發(fā)送結(jié)果,如果數(shù)據(jù)發(fā)送失敗,則顯示調(diào)用KafkaProducer.close(0)方法來立即關閉producer,防止消息亂序。
4.2 broker端配置
unclean.leader.election.enable=false:關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader;
replication.factor>=3:至少使用3個副本保存數(shù)據(jù);
min.issync.replicas>1:控制某條消息至少被寫入到ISR中多少個副本才算成功,當且僅當producer端acks參數(shù)設置為all或者-1時,該參數(shù)才有效。
最后,確保replication.factor>min.issync.replicas,如果兩者相等,那么只要有一個副本掛掉,分區(qū)就無法工作,推薦配置replication.factor=min.issync.replicas+1。
關于producer端的開發(fā)就介紹到這兒,下一篇將介紹consumer端的開發(fā)。
關于Kafka producer如何使用問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業(yè)資訊頻道了解更多相關知識。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。