溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

kafka序列化器和攔截器怎么自定義使用

發(fā)布時(shí)間:2023-05-10 15:01:52 來源:億速云 閱讀:122 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“kafka序列化器和攔截器怎么自定義使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“kafka序列化器和攔截器怎么自定義使用”吧!

    介紹

    序列化器是和數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸有關(guān),數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸為字節(jié)流,所以生產(chǎn)者在發(fā)送時(shí)需要將其序列化為字節(jié)流,消費(fèi)者收到消息時(shí),需要將字節(jié)流反序列化為我們能夠識別的對象,我們不難看出,這就是RPC通信,kafka中實(shí)現(xiàn)了很多自定義協(xié)議,我們知道,在RPC通信中,只有生產(chǎn)者和消費(fèi)者的協(xié)議一樣,才能相互傳輸和解析數(shù)據(jù),在使用HTTP時(shí),我們就不用去關(guān)注協(xié)議本身,因?yàn)镠TTP是TCP的上層建筑,它自己實(shí)現(xiàn)了一套協(xié)議,我們不用去關(guān)注,但是使用RPC,我們是面向TCP編程,所以自然得約定和實(shí)現(xiàn)自己的協(xié)議,而序列化就是這過程中很重要的一部分。

    攔截器是一個(gè)隨處可見的詞,基本上很多框架中都有攔截器機(jī)制,它的作用主要是對請求進(jìn)行攔截,我們可以對請求進(jìn)行過濾和處理,以達(dá)到業(yè)務(wù)目的,比如Spring中有HandlerInterceptor攔截器,在kafka種也有攔截器,我們可以自定義攔截器,對消息進(jìn)行攔截,比如某些異常消息我們不需要發(fā)送,那么就將其攔截下來。

    序列化器

    數(shù)據(jù)在網(wǎng)絡(luò)中傳輸是以字節(jié)流的形式進(jìn)行傳輸,在生產(chǎn)者端發(fā)送消息需要先進(jìn)行序列化,消費(fèi)者端進(jìn)行反序列化,序列化的方式有很多,比如jdk,json,protobuf,kryo,hessian,avro等等,在大數(shù)據(jù)量的傳輸中,序列化和反序列化的效率對吞吐量有一定的影響,kafka提供了許多序列化和反序列化器,如StringDeserializerStringSerializer,如果我們需要自定義一個(gè)序列化和反序列化器,那么實(shí)現(xiàn)Serializer,Deserializer接口即可。

    如下,kafka生產(chǎn)者在發(fā)送消息到broker之前需要序列化,消費(fèi)者從broker獲取消息后需要反序列化。

    kafka序列化器和攔截器怎么自定義使用

    設(shè)置序列化和反序列化

    生產(chǎn)者端設(shè)置序列化

    //序列化
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    消費(fèi)者端設(shè)置反序列化

    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    自定義序列化

    /**
     * 功能說明: JSON序列化
     * <p>
     * Original @Author: steakliu , 2022-11-02  15:14
     */
    public class JsonSerializer<T> implements Serializer<T> {
    
      @Override
      public byte[] serialize(String topic, T obj) {
        try {
          return obj == null ? null : JSON.toJSONBytes(obj);
        }catch (Exception e){
          throw new SerializationException("json serializing exception");
        }
      }
      
    }

    自定義反序列化

    /**
     * 功能說明:JSON反序列化
     * <p>
     * Original @Author: steakliu-劉牌, 2022-11-11  09:38
     */
    public class  JsonDeserializer<T> implements Deserializer<T> {
      @Override
      public T deserialize(String topic, byte[] data) {
        return (T) JSON.parse(data);
      }
    }

    如上簡單的使用fastjson作為序列化和反序列化工具,演示了自定義kafka的序列化和反序列化機(jī)制,我們可以根據(jù)實(shí)際情況來設(shè)計(jì)不同的序列化反序列化機(jī)制,當(dāng)然,不會(huì)是像上面這些簡單,如果使用spring,那么spring提供了JSON序列化和反序列化器直接使用。

    思考

    雖然我們可以自定義序列化和反序列化器,但是自定義序列化和反序列化器在使用上也要保持一些一致,也就是說生產(chǎn)者和消費(fèi)者要保持使用一種類型的序列化機(jī)制,不然會(huì)出現(xiàn)消息轉(zhuǎn)換問題,如果我們以kafka的方式向別人提供服務(wù),那么他們就需要使用我們的制定的序列化方式,所以這可能就存在一定的耦合,如果使用Kafka的String序列化和反序列化機(jī)制,因?yàn)槭撬悄J(rèn)方式并且是字符串,通用性比較好,所以就不用去考慮序列化和反序列化,直接拿到字符串轉(zhuǎn)為對象,再進(jìn)行業(yè)務(wù)處理,使用自定義序列化的話,就直接拿到序列化后的對象,不用進(jìn)行字符串轉(zhuǎn)對象操作。

    在實(shí)際場景中,我們可以根據(jù)自己的業(yè)務(wù)來使用何種序列化方式,沒有最好的,只有合適的。

    攔截器

    kafka中消費(fèi)者和生產(chǎn)者都有攔截器,分別為ConsumerInterceptorProducerInterceptor,只需實(shí)現(xiàn)它們即可實(shí)現(xiàn)攔截,加入攔截器后,生產(chǎn)者會(huì)在發(fā)送消息之前對消息進(jìn)行攔截處理,消費(fèi)者在收到消息之前也會(huì)經(jīng)過攔截器,那么我們就可以在攔截器中加入一些自己需要的邏輯。

    如下消費(fèi)者攔截器對消息進(jìn)行攔截,如果有異常消息,則對異常消息進(jìn)行處理,只要需要對消息進(jìn)行處理,監(jiān)控等,都可以使用攔截器。

    /**
     * 功能說明: 消費(fèi)者攔截器
     * <p>
     * Original @Author: steakliu-劉牌, 2023-03-15  10:17
     */
    public class MyConsumerInterceptor implements ConsumerInterceptor<String, Message> {
    
      @Override
      public ConsumerRecords<String, Message> onConsume(ConsumerRecords<String, Message> records) {
        long currentTimeMillis = System.currentTimeMillis();
        records.forEach(record -> {
          if ("消息異常".equals(record.value().getMessageText())) {
            //處理異常消息
            this.handleMsg(record);
          }
        });
        return records;
      }
      
      private void handleMsg(ConsumerRecord<String, Message> record) {
        //處理異常消息
      }
      @Override
      public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
      @Override
      public void close() {}
      @Override
      public void configure(Map<String, ?> configs) { }
    }

    攔截器可以有多個(gè),如果設(shè)置多個(gè)攔截器,那么就形成一個(gè)攔截器鏈,一個(gè)一個(gè)地執(zhí)行。

    下面是使用spring-kafka時(shí)所配置的攔截器和序列化器的基本配置。

    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          # 反序列化器
          key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            # 攔截器
            interceptor:
              classes: com.steakliu.kafka.interceptor.MyConsumerInterceptor
            spring:
              json:
                trusted:
                  packages: '*'
        producer:
          key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
          properties:
            # 攔截器
            interceptor:
              classes: com.steakliu.kafka.interceptor.MyProducerInterceptor,com.steakliu.kafka.interceptor.MyProducerInterceptor2

    到此,相信大家對“kafka序列化器和攔截器怎么自定義使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI