溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Kafka Producer 攔截器

發(fā)布時(shí)間:2020-06-16 18:06:48 來源:網(wǎng)絡(luò) 閱讀:1621 作者:Java_老男孩 欄目:編程語言

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個(gè)功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來對(duì)消息進(jìn)行攔截或者修改,也可以用于Producer的Callback回調(diào)之前進(jìn)行相應(yīng)的預(yù)處理。

使用Kafka Producer端的攔截器非常簡單,主要是實(shí)現(xiàn)ProducerInterceptor接口,此接口包含4個(gè)方法:

    1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在將消息序列化和分配分區(qū)之前會(huì)調(diào)用攔截器的這個(gè)方法來對(duì)消息進(jìn)行相應(yīng)的操作。一般來說最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對(duì)其有準(zhǔn)確的判斷,否則會(huì)與預(yù)想的效果出現(xiàn)偏差。比如修改key不僅會(huì)影響分區(qū)的計(jì)算,同樣也會(huì)影響B(tài)roker端日志壓縮(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應(yīng)答(Acknowledgement)之前或者消息發(fā)送失敗時(shí)調(diào)用,優(yōu)先于用戶設(shè)定的Callback之前執(zhí)行。這個(gè)方法運(yùn)行在Producer的IO線程中,所以這個(gè)方法里實(shí)現(xiàn)的代碼邏輯越簡單越好,否則會(huì)影響消息的發(fā)送速率。
    1. void close():關(guān)閉當(dāng)前的攔截器,此方法主要用于執(zhí)行一些資源的清理工作。
    1. configure(Map<String, ?> configs):用來初始化此類的方法,這個(gè)是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下只需要關(guān)注并實(shí)現(xiàn)onSend或者onAcknowledgement方法即可。下面我們來舉個(gè)案例,通過onSend方法來過濾消息體為空的消息以及通過onAcknowledgement方法來計(jì)算發(fā)送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        if(record.value().length()<=0)
            return null;
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 發(fā)送成功率="+String.format("%f", succe***atio * 100)+"%");
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

自定義的ProducerInterceptorDemo類實(shí)現(xiàn)之后就可以在Kafka Producer的主程序中指定,示例代碼如下:

public class ProducerMain {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        for(int i=0;i<100;i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
            producer.send(producerRecord).get();
        }
        producer.close();
    }
}

Kafka Producer不僅可以指定一個(gè)攔截器,還可以指定多個(gè)攔截器以形成攔截鏈,這個(gè)攔截鏈會(huì)按照其中的攔截器的加入順序一一執(zhí)行。比如上面的程序多添加一個(gè)攔截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

這樣Kafka Producer會(huì)先執(zhí)行攔截器ProducerInterceptorDemo,之后再執(zhí)行ProducerInterceptorDemoPlus。

有關(guān)interceptor.classes參數(shù),在kafka 1.0.0版本中的定義如下:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
interceptor.calssses A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. list null low

本文的重點(diǎn)是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI