溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spring Boot 集成 Kafkad的實(shí)現(xiàn)方法

發(fā)布時(shí)間:2021-04-09 14:09:20 來源:億速云 閱讀:191 作者:啵贊 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Spring Boot 集成 Kafkad的實(shí)現(xiàn)方法”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

Spring Boot 作為主流微服務(wù)框架,擁有成熟的社區(qū)生態(tài)。市場應(yīng)用廣泛,為了方便大家,整理了一個(gè)基于spring boot的常用中間件快速集成入門系列手冊,涉及RPC、緩存、消息隊(duì)列、分庫分表、注冊中心、分布式配置等常用開源組件,大概有幾十篇文章,陸續(xù)會(huì)開放出來,感興趣同學(xué)請?zhí)崆瓣P(guān)注&收藏

消息通信有兩種基本模型,即發(fā)布-訂閱(Pub-Sub)模型和點(diǎn)對點(diǎn)(Point to Point)模型,發(fā)布-訂閱支持生產(chǎn)者消費(fèi)者之間的一對多關(guān)系,而點(diǎn)對點(diǎn)模型中有且僅有一個(gè)消費(fèi)者。

前言

Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái),由Scala和Java編寫。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高吞吐、低延遲的平臺(tái)。其持久化層本質(zhì)上是一個(gè)“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊(duì)列”。

Kafka高效地處理實(shí)時(shí)流式數(shù)據(jù),可以實(shí)現(xiàn)與Storm、HBase和Spark的集成。作為聚類部署到多臺(tái)服務(wù)器上,Kafka處理它所有的發(fā)布和訂閱消息系統(tǒng)使用了四個(gè)API,即生產(chǎn)者API、消費(fèi)者API、Stream API和Connector API。它能夠傳遞大規(guī)模流式消息,自帶容錯(cuò)功能,已經(jīng)取代了一些傳統(tǒng)消息系統(tǒng),如JMS、AMQP等。

為什么使用kafka?

  • 削峰填谷。緩沖上下游瞬時(shí)突發(fā)流量,保護(hù) “脆弱” 的下游系統(tǒng)不被壓垮,避免引發(fā)全鏈路服務(wù) “雪崩”。

  • 系統(tǒng)解耦。發(fā)送方和接收方的松耦合,一定程度簡化了開發(fā)成本,減少了系統(tǒng)間不必要的直接依賴。

  • 異步通信:消息隊(duì)列允許用戶把消息放入隊(duì)列但不立即處理它。

  • 可恢復(fù)性:即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。

業(yè)務(wù)場景

  • 一些同步業(yè)務(wù)流程的非核心邏輯,對時(shí)間要求不是特別高,可以解耦異步來執(zhí)行

  • 系統(tǒng)日志收集,采集并同步到kafka,一般采用ELK組合玩法

  • 一些大數(shù)據(jù)平臺(tái),用于各個(gè)系統(tǒng)間數(shù)據(jù)傳遞

基本架構(gòu)

Kafka 運(yùn)行在一個(gè)由一臺(tái)或多臺(tái)服務(wù)器組成的集群上,并且分區(qū)可以跨集群節(jié)點(diǎn)分布

Spring Boot 集成 Kafkad的實(shí)現(xiàn)方法

1、Producer 生產(chǎn)消息,發(fā)送到Broker中

2、Leader狀態(tài)的Broker接收消息,寫入到相應(yīng)topic中。在一個(gè)分區(qū)內(nèi),這些消息被索引并連同時(shí)間戳存儲(chǔ)在一起

3、Leader狀態(tài)的Broker接收完畢以后,傳給Follow狀態(tài)的Broker作為副本備份

4、 Consumer 消費(fèi)者的進(jìn)程可以從分區(qū)訂閱,并消費(fèi)消息

常用術(shù)語

  • Broker。負(fù)責(zé)接收和處理客戶端發(fā)送過來的請求,以及對消息進(jìn)行持久化。雖然多個(gè) Broker 進(jìn)程能夠運(yùn)行在同一臺(tái)機(jī)器上,但更常見的做法是將不同的 Broker 分散運(yùn)行在不同的機(jī)器上

  • 主題:Topic。主題是承載消息的邏輯容器,在實(shí)際使用中多用來區(qū)分具體的業(yè)務(wù)。

  • 分區(qū):Partition。一個(gè)有序不變的消息序列。每個(gè)主題下可以有多個(gè)分區(qū)。

  • 消息:這里的消息就是指 Kafka 處理的主要對象。

  • 消息位移:Offset。表示分區(qū)中每條消息的位置信息,是一個(gè)單調(diào)遞增且不變的值。

  • 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個(gè)地方以提供數(shù)據(jù)冗余,這些地方就是所謂的副本。副本還分為領(lǐng)導(dǎo)者副本和追隨者副本,各自有不同的角色劃分。每個(gè)分區(qū)可配置多個(gè)副本實(shí)現(xiàn)高可用。一個(gè)分區(qū)的N個(gè)副本一定在N個(gè)不同的Broker上。

  • Leader:每個(gè)分區(qū)多個(gè)副本的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象,都是 Leader。

  • Follower:每個(gè)分區(qū)多個(gè)副本的“從”副本,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 還會(huì)成為新的 Leader。

  • 生產(chǎn)者:Producer。向主題發(fā)布新消息的應(yīng)用程序。

  • 消費(fèi)者:Consumer。從主題訂閱新消息的應(yīng)用程序。

  • 消費(fèi)者位移:Consumer Offset。表示消費(fèi)者消費(fèi)進(jìn)度,每個(gè)消費(fèi)者都有自己的消費(fèi)者位移。offset保存在broker端的內(nèi)部topic中,不是在clients中保存

  • 消費(fèi)者組:Consumer Group。多個(gè)消費(fèi)者實(shí)例共同組成的一個(gè)組,同時(shí)消費(fèi)多個(gè)分區(qū)以實(shí)現(xiàn)高吞吐。

  • 重平衡:Rebalance。消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過程。Rebalance 是 Kafka 消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。

代碼演示

外部依賴:

在 pom.xml 中添加 Kafka 依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

由于spring-boot-starter-parent 指定的版本號(hào)是2.1.5.RELEASE,spring boot 會(huì)對外部框架的版本號(hào)統(tǒng)一管理,spring-kafka 引入的版本是 2.2.6.RELEASE

配置文件:

在配置文件 application.yaml 中配置 Kafka 的相關(guān)參數(shù),具體內(nèi)容如下:

Spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 3  # 生產(chǎn)者發(fā)送失敗時(shí),重試次數(shù)
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生產(chǎn)者消息key和消息value的序列化處理類
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: tomge-consumer-group  # 默認(rèn)消費(fèi)者group id
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

對應(yīng)的配置類 org.springframework.boot.autoconfigure.kafka.KafkaProperties,來初始化kafka相關(guān)的bean實(shí)例對象,并注冊到spring容器中。

發(fā)送消息:

Spring Boot 作為一款支持快速開發(fā)的集成性框架,同樣提供了一批以 -Template 命名的模板工具類用于實(shí)現(xiàn)消息通信。對于 Kafka 而言,這個(gè)工具類就是KafkaTemplate。

KafkaTemplate 提供了一系列 send 方法用來發(fā)送消息,典型的 send 方法定義如下代碼所示:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
 。。。。 省略
}

生產(chǎn)端提供了一個(gè)restful接口,模擬發(fā)送一條創(chuàng)建新用戶消息。

@GetMapping("/add_user")
public Object add() {
    try {
        Long id = Long.valueOf(new Random().nextInt(1000));
        User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();
        ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.toJSONString(user));
        
        // 提供回調(diào)方法,可以監(jiān)控消息的成功或失敗的后續(xù)處理
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("發(fā)送消息失敗," + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult sendResult) {
                // 消息發(fā)送到的topic
                String topic = sendResult.getRecordMetadata().topic();
                // 消息發(fā)送到的分區(qū)
                int partition = sendResult.getRecordMetadata().partition();
                // 消息在分區(qū)內(nèi)的offset
                long offset = sendResult.getRecordMetadata().offset();
                System.out.println(String.format("發(fā)送消息成功,topc:%s, partition: %s, offset:%s ", topic, partition, offset));
            }
        });
        return "消息發(fā)送成功";
    } catch (Exception e) {
        e.printStackTrace();
        return "消息發(fā)送失敗";
    }
}

實(shí)際上開發(fā)使用的Kafka默認(rèn)允許自動(dòng)創(chuàng)建Topic,創(chuàng)建Topic時(shí)默認(rèn)的分區(qū)數(shù)量是1,可以通過server.properties文件中的num.partitions=1修改默認(rèn)分區(qū)數(shù)量。在生產(chǎn)環(huán)境中通常會(huì)關(guān)閉自動(dòng)創(chuàng)建功能,Topic需要由運(yùn)維人員先創(chuàng)建好。

消費(fèi)消息:

在 Kafka 中消息通過服務(wù)器推送給各個(gè)消費(fèi)者,而 Kafka 的消費(fèi)者在消費(fèi)消息時(shí),需要提供一個(gè)監(jiān)聽器(Listener)對某個(gè) Topic 實(shí)現(xiàn)監(jiān)聽,從而獲取消息,這也是 Kafka 消費(fèi)消息的唯一方式。

定義一個(gè)消費(fèi)類,在處理具體消息業(yè)務(wù)邏輯的方法上添加 @KafkaListener 注解,并配置要消費(fèi)的topic,代碼如下所示:

@Component
public class UserConsumer {

    @KafkaListener(topics = "add_user")
    public void receiveMesage(String content) {
        System.out.println("消費(fèi)消息:" + content);
    }
}

是不是很簡單,添加kafka依賴、使用KafkaTemplate、@KafkaListener注解就完成消息的生產(chǎn)和消費(fèi),其實(shí)是SpringBoot在背后默默的做了很多工作,如果感興趣可以研究下spring-boot-autoconfigure ,里面提供了常用開源框架的客戶端實(shí)例封裝。

“Spring Boot 集成 Kafkad的實(shí)現(xiàn)方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI