溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spring-boot 2.1.x中怎么集成kafka

發(fā)布時(shí)間:2021-06-18 16:51:46 來源:億速云 閱讀:232 作者:Leah 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)spring-boot 2.1.x中怎么集成kafka,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

 pom.xml配置:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring-boot.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
            <version>${spring-boot.version}</version>
        </dependency> 
<!--- 以下是調(diào)試后的配置 -->
<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${kafka.version}</version>
        </dependency> 
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.10.0</version>
        </dependency>

yml配置:

spring:
  kafka:
    # 以逗號(hào)分隔的地址列表,用于建立與 Kafka 集群的初始連接 (kafka 默認(rèn)的端口號(hào)為 9092)
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 發(fā)生錯(cuò)誤后,消息重發(fā)的次數(shù)。
      retries: 0
      #當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。
      batch-size: 16384
      # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)。
      # acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器成功響應(yīng)。
      # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。
      acks: 1
    consumer:
      # 自動(dòng)提交的時(shí)間間隔 在 spring boot 2.X 版本中這里采用的是值的類型為 Duration 需要符合特定的格式,如 1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
      # latest(默認(rèn)值)在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
      # earliest :在偏移量無效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄
      auto-offset-reset: earliest
      # 是否自動(dòng)提交偏移量,默認(rèn)值是 true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,然后手動(dòng)提交偏移量
      enable-auto-commit: true
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在偵聽器容器中運(yùn)行的線程數(shù)。
      concurrency: 5

看完上述內(nèi)容,你們對(duì)spring-boot 2.1.x中怎么集成kafka有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI