溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用KafkaAPI-ProducerAPI

發(fā)布時(shí)間:2021-10-13 14:24:20 來源:億速云 閱讀:126 作者:iii 欄目:編程語言

這篇文章主要講解了“如何使用KafkaAPI-ProducerAPI”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何使用KafkaAPI-ProducerAPI”吧!

1.消息發(fā)送流程

    Kafka 的 Producer 發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator, Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka broker。

如何使用KafkaAPI-ProducerAPI

相關(guān)參數(shù):
batch.size: 只有數(shù)據(jù)積累到 batch.size 之后, sender 才會發(fā)送數(shù)據(jù)。
linger.ms: 如果數(shù)據(jù)遲遲未達(dá)到 batch.size, sender 等待 linger.time 之后就會發(fā)送數(shù)據(jù)。

2.異步發(fā)送API

1)導(dǎo)入依賴
 

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

2)編寫代碼

需要用到的類:
KafkaProducer:需要創(chuàng)建一個生產(chǎn)者對象,用來發(fā)送數(shù)據(jù)
ProducerConfig:獲取所需的一系列配置參數(shù)
ProducerRecord:每條數(shù)據(jù)都要封裝成一個 ProducerRecord 對象

2.1). 不帶回調(diào)函數(shù)的API
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        //生產(chǎn)者配置信息可以從ProducerConfig中取Key
         //1.創(chuàng)建kafka生產(chǎn)者的配置信息
        Properties properties=new Properties();
        //2.指定連接的kafka集群
        //properties.put("bootstrap.servers","192.168.1.106:9091");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091");
        //3.ACK應(yīng)答級別
        //properties.put("acks","all");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        //4.重試次數(shù)
        //properties.put("retries",3);
        properties.put(ProducerConfig.RETRIES_CONFIG,3);
        //5.批次大小 16k
        //properties.put("batch.size",16384);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //6.等待時(shí)間
        //properties.put("linger.ms",1);
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //7.RecordAccumulator 緩沖區(qū)大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //properties.put("buffer.memory",33554432);
        //8.Key,Value 的序列化類
        //properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //9.創(chuàng)建生產(chǎn)者對象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        //10.發(fā)送數(shù)據(jù)
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first","atguigu--"+i);
            producer.send(producerRecord);
        }
        //11.關(guān)閉資源
        producer.close();
    }
}
2.2) 帶回調(diào)函數(shù)的API

    回調(diào)函數(shù)會在 producer 收到 ack 時(shí)調(diào)用,為異步調(diào)用, 該方法有兩個參數(shù),分別是RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發(fā)送成功,如果Exception 不為 null,說明消息發(fā)送失敗。
    注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CallBackProducer {
    public static void main(String[] args) {
        //生產(chǎn)者配置信息可以從ProducerConfig中取Key
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //創(chuàng)建生產(chǎn)者對象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        /*創(chuàng)建topic
        /opt/kafka/kafka03/bin/kafka-topics.sh  --create --zookeeper 192.168.1.106:2181,192.168.1.106:2182,192.168.1.106:2183  --replication-factor 3  --partitions 2 --topic aaa
        * */

        //發(fā)送數(shù)據(jù)
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e==null){
                    System.out.println("aaa  "+recordMetadata.partition()+ "--"+recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            });
        }
        //11.關(guān)閉資源
        producer.close();
    }
}

3.    同步發(fā)送API

    同步發(fā)送的意思就是,一條消息發(fā)送之后,會阻塞當(dāng)前線程, 直至返回 ack。由于 send 方法返回的是一個 Future 對象,根據(jù) Futrue 對象的特點(diǎn),我們也可以實(shí)現(xiàn)同步發(fā)送的效果,只需在調(diào)用 Future 對象的 get 方發(fā)即可、

        //10.發(fā)送數(shù)據(jù)
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first","atguigu--"+i);
            producer.send(producerRecord).get();
        }

4.自定義分區(qū)器

默認(rèn)分區(qū)策略源碼:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

1.1.    自定義分區(qū)器代碼:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*自定義分區(qū)規(guī)則*/
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
        Integer integer =partitionInfos.size();
        return key.toString().hashCode()%integer;
        /*指定分區(qū)*/
       /* return 1;*/
    }
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

1.2.    生產(chǎn)者使用自定義分區(qū)器

//配置方法
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner");

完整代碼:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        //生產(chǎn)者配置信息可以從ProducerConfig中取Key
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //配置分區(qū)器的全類名 partitioner.class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner");
        //創(chuàng)建生產(chǎn)者對象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //發(fā)送數(shù)據(jù)
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e==null){
                    System.out.println(recordMetadata.topic()+"--"+ recordMetadata.partition()+ "--"+recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            });
        }
        //11.關(guān)閉資源
        producer.close();
    }
}

感謝各位的閱讀,以上就是“如何使用KafkaAPI-ProducerAPI”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對如何使用KafkaAPI-ProducerAPI這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI