您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)spring-boot 2.1.x中怎么集成kafka,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
pom.xml配置:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${spring-boot.version}</version> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-undertow</artifactId> <version>${spring-boot.version}</version> </dependency> <!--- 以下是調(diào)試后的配置 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.10.0</version> </dependency>
yml配置:
spring: kafka: # 以逗號(hào)分隔的地址列表,用于建立與 Kafka 集群的初始連接 (kafka 默認(rèn)的端口號(hào)為 9092) bootstrap-servers: 127.0.0.1:9092 producer: # 發(fā)生錯(cuò)誤后,消息重發(fā)的次數(shù)。 retries: 0 #當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。 batch-size: 16384 # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。 buffer-memory: 33554432 # 鍵的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)。 # acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器成功響應(yīng)。 # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。 acks: 1 consumer: # 自動(dòng)提交的時(shí)間間隔 在 spring boot 2.X 版本中這里采用的是值的類型為 Duration 需要符合特定的格式,如 1S,1M,2H,5D auto-commit-interval: 1S # 該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理: # latest(默認(rèn)值)在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄) # earliest :在偏移量無效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄 auto-offset-reset: earliest # 是否自動(dòng)提交偏移量,默認(rèn)值是 true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,然后手動(dòng)提交偏移量 enable-auto-commit: true # 鍵的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 在偵聽器容器中運(yùn)行的線程數(shù)。 concurrency: 5
看完上述內(nèi)容,你們對(duì)spring-boot 2.1.x中怎么集成kafka有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。