溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

基于kafka怎么實(shí)現(xiàn)Spring?Cloud?Bus消息總線

發(fā)布時(shí)間:2022-04-28 13:45:56 來源:億速云 閱讀:164 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“基于kafka怎么實(shí)現(xiàn)Spring Cloud Bus消息總線”的相關(guān)知識(shí),小編通過實(shí)際案例向大家展示操作過程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“基于kafka怎么實(shí)現(xiàn)Spring Cloud Bus消息總線”文章能幫助大家解決問題。

一、什么是消息總線

相信大多數(shù)讀者之前都使用過各種各樣的消息隊(duì)列,例如RabbitMQ、kafka等等,消息總線和他的概念差不多,在微服務(wù)系統(tǒng)的架構(gòu)中,我們通常會(huì)使用輕量級(jí)的消息代理來 構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有的微服務(wù)都連接上來,由于該主題中產(chǎn)生的消息會(huì)被所有實(shí)例監(jiān)聽和消費(fèi),所以 我們稱他們?yōu)橄⒖偩€。在總線上的各個(gè)實(shí)例都可以方便的廣播一些需要讓其他連接到該主題上的實(shí)例都知道的消息,例如配置的變更或者其他一些管理操作等。

二、整合消息總線實(shí)現(xiàn)配置自動(dòng)刷新

在上一篇博客中spring cloud config 中我們實(shí)現(xiàn)了微服務(wù)架構(gòu)中的分布式配置中心,但是存在一個(gè)問題就是,當(dāng)我們?cè)趃it上修改了配置以后,需要我們手動(dòng)通知每一個(gè)服務(wù)實(shí)例,這樣的操作在實(shí)例較多的項(xiàng)目中是會(huì)死人的,這樣的問題sping cloud 家族肯定也是會(huì)考慮到并且給出解決方案的,下面我們就來搞一下。

2.1 面向客戶端基本架構(gòu)

基于kafka怎么實(shí)現(xiàn)Spring?Cloud?Bus消息總線

當(dāng)我們系統(tǒng)按照上圖啟動(dòng)以后,圖中的 serviceA的三個(gè)實(shí)例會(huì)請(qǐng)求Config Server以獲取配置,Config Server根據(jù)應(yīng)用配置的規(guī)則從Git倉(cāng)庫(kù)中獲取配置信息并返回。

此時(shí),如果我們想要修改serviceA的配置。首先,去git服務(wù)器上修改對(duì)應(yīng)的參數(shù)值,但是這樣并不會(huì)觸發(fā)serviceA實(shí)例的屬性更新。此時(shí)我們向?qū)嵗?發(fā)送post請(qǐng)求,此時(shí),實(shí)例3就會(huì)將刷新請(qǐng)求發(fā)送到消息總線中,該消息事件會(huì)被serviceA的實(shí)例1和實(shí)例2從總線中獲取到,并重新從config server中獲取他們的配置信息,從而實(shí)現(xiàn)配置信息的動(dòng)態(tài)更新。

2.2 面向服務(wù)端的架構(gòu)

基于kafka怎么實(shí)現(xiàn)Spring?Cloud?Bus消息總線

在之前的架構(gòu)中,服務(wù)的配置更新需要通過具體服務(wù)中的某個(gè)實(shí)例發(fā)送請(qǐng)求,再觸發(fā)對(duì)整個(gè)服務(wù)集群的配置更新。雖然能 傷心啊功能,但是 這樣的結(jié)果是,我們指定的應(yīng)用實(shí)例會(huì)不同于集群中的其他應(yīng)用 實(shí)例,這樣會(huì)增加集群內(nèi)容的復(fù)雜度,不利于將來的運(yùn)維工作。

三、利用kafka實(shí)現(xiàn)消息總線

3.1 Spring Boot 整合kafka

可以參考這篇文章

如果 spring boot 版本采用 2.2.5,則kafka版本使用2.4.0.RELEASE。

3.2 實(shí)現(xiàn)動(dòng)態(tài) 刷新

我們利用上一篇博客中的config 的兩個(gè)工程來進(jìn)行改造。

3.2.1 服務(wù)端改造

增加依賴:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-bus</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

增加配置:

spring.kafka.bootstrap-servers=211.159.167.180:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.cloud.bus.enabled=true
management.endpoints.web.exposure.include= *

關(guān)于management.endpoints.web.exposure.include= * 的配置需要注意

注意:

  • __如果是yum的話 &lsquo;&rsquo; 需要加 &lsquo; &rsquo; 單引號(hào)*

  • include: &lsquo;*&rsquo; http://localhost:8769/actuator/bus-refresh 刷新所有微服務(wù)

  • include: &lsquo;refresh&rsquo; http://localhost:8769/actuator/bus-refresh 不能訪問

3.2.2 客戶端改造

增加依賴:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-bus</artifactId>
        </dependency>

增加配置:

management.endpoints.web.exposure.include= *
spring.kafka.bootstrap-servers=211.159.167.180:9092
spring.cloud.bus.enabled=true

這樣就ok 了,啟動(dòng)項(xiàng)目以后,當(dāng)配置修改以后,我們 給服務(wù)端發(fā)發(fā)送POST請(qǐng)求:http://localhost:7071/actuator/bus-refresh

就可以實(shí)現(xiàn)動(dòng)態(tài)刷新。

3.3 指定刷新范圍

在上面的例子中,我們通過向服務(wù)端請(qǐng)求/actuator/bus-refresh接口,從而觸發(fā)總線上所有服務(wù)實(shí)例刷新,但是在一些特殊場(chǎng)景下,我們希望可以刷新服務(wù)中某個(gè)具體實(shí)例的配置,Spring Cloud Bus 對(duì)這種場(chǎng)景也有很好的支持,/actuator/bus-refreshdestination=customers:9000 提供了一個(gè)destination參數(shù),用來定位具體要刷新的應(yīng)用程序。當(dāng)我們調(diào)用帶有destination參數(shù)的 接口時(shí),此時(shí)總線上的個(gè)應(yīng)用實(shí)例會(huì)根據(jù)destination屬性的值來判斷是否為自己的實(shí)例名,若符合才進(jìn)行配置刷新,若不符合就忽略該 消息。

關(guān)于“基于kafka怎么實(shí)現(xiàn)Spring Cloud Bus消息總線”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI