溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring Cloud如何開發(fā)消息微服務

發(fā)布時間:2021-12-24 10:31:39 來源:億速云 閱讀:150 作者:小新 欄目:云計算

這篇文章給大家分享的是有關Spring Cloud如何開發(fā)消息微服務的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

開發(fā)消息微服務

準備工作

        我們需要在微服務客戶端中實現(xiàn)消息生產(chǎn)者與消費者,先建立以下幾個項目:

  • spring-server:Eureka服務器,端口8761,代碼目錄“codes\08\8.4\spring-server”。

  • spring-consumer:消息消費者,Eureka客戶端,注冊到Eureka,端口8080,代碼目錄“codes\08\8.4\spring-consumer”。

  • spring-producer:消息生產(chǎn)者,Eureka客戶端,注冊到Eureka,端口8081,代碼目錄“codes\08\8.4\spring-producer”。

        整個集群加上消息代理,結構如圖8-10所示。

Spring Cloud如何開發(fā)消息微服務

圖8-10 程序結構

        由于Spring Cloud Stream幫我實現(xiàn)了與消息代者交互的功能,因此對于集群中的生產(chǎn)者與消費者來說,不需要關心外部使用的是哪一個消息框架,本小節(jié)的案例,默認使用RabbitMQ,默認情況下,會連接本地的5762端口,如果需要在微服務中修改RabbitMQ的連接信息,可使用以下配置:

spring:
  application:
    name: spring-producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

編寫生產(chǎn)者

        在“spring-producer”項目中,加入以下依賴:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        主要引入“spring-coud-starter-stream-rabbit”依賴,該依賴會自動幫我們的項目引入“spring-cloud-stream”以及“spring-cloud-stream-binder”。

        接下來,編寫發(fā)送服務,請見代碼清單8-5。

        代碼清單8-5:codes\08\8.4\spring-producer\src\main\java\org\crazyit\cloud\SendService.java

public interface SendService {

    @Output("myInput")
    SubscribableChannel sendOrder();
}

        新建一個SendService接口,添加sendOrder方法,該方法使用@Output注解進行修飾。使用該注解,表示會創(chuàng)建“myInput”的消息通道,調用該方法后,會向“myInput”通道投遞消息,如果@Output注解不提供參數(shù),則使用方法名作為通道名稱。接下來,需要讓Spring容器開啟綁定的功能,在Application類中,加入@EnableBinding注解,請見代碼清單8-6。

        代碼清單8-6:

        codes\08\8.4\spring-producer\src\main\java\org\crazyit\cloud\ProducerApplication.java

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

        @EnableBinding注解中,以SendService.class作為參數(shù),Spring容器在啟動時,會自動綁定SendService接口中定義的通道。編寫控制器,調用SendService的發(fā)送方法,往服務器發(fā)送消息,請見代碼清單8-7。

        代碼清單8-7:

        codes\08\8.4\spring-producer\src\main\java\org\crazyit\cloud\ProducerController.java

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)    
    public String sendRequest() {
        // 創(chuàng)建消息
        Message msg = MessageBuilder.withPayload("Hello World".getBytes()).build();
        // 發(fā)送消息    
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}

        在控制器中,通過@Autowired自動注入SendService,調用sendOrder方法得到SubscribableChannel(通道)實例,再調用send方法,將“Hello World”字符串發(fā)送至“消息代理”中,本例默認的消息代理為RabbitMQ。下面,先實現(xiàn)消息者,再一起整合測試。

編寫消費者

        消費者項目(spring-consumer)所使用的依賴與生產(chǎn)者一致,先編寫接收消息的通道接口,請見代碼清單8-8。

        代碼清單8-8:

        codes\08\8.4\spring-consumer\src\main\java\org\crazyit\cloud\ReceiveService.java

public interface ReceiveService {

    @Input("myInput")
    SubscribableChannel myInput();
}

在ReceiveService中,定義了一個“myInput”的消息輸入通道,接下來,與生產(chǎn)者一樣,在啟動類中綁定消息通道,請見代碼清單8-9。

        代碼清單8-9:

        codes\08\8.4\spring-consumer\src\main\java\org\crazyit\cloud\ReceiverApplication.java

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class ReceiverApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }

    @StreamListener("myInput")
    public void receive(byte[] msg) {
        System.out.println("接收到的消息:  " + new String(msg));
    }
}

        在啟動類中,同樣使用了@EnableBinding來開啟綁定,并聲明了通道的接口類。新建了一個receive方法,使用@StreamListener注解進行修飾,聲明了訂閱“myInput”通道的消息。

        依次啟動spring-server(8761端口)、spring-producer(8081端口)、spring-consumer(8080端口),訪問:http://localhost:8081/send,再打開消息者控制臺,可以看到“Hello World”字符串的輸出,證明消費者已經(jīng)可以從消息代理中獲取到消息。

更換綁定器

        前面的例子中,使用了RabbitMQ作為消息代理,如果想使用Kafka,可以更換Maven依賴來實現(xiàn)。在生產(chǎn)者與消費者的pom.xml中,將spring-cloud-starter-stream-rabbit的依賴,修改為spring-cloud-starter-stream-kafka,請見以下配置:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        Spring Cloud提供了綁定器的API,目前實現(xiàn)了RabbitMQ與Kafka的綁定器。在筆者看來,綁定器更像適配器,對于我們的消息程序來說,并不關心使用了哪個消息代理,這些都由綁定器去實現(xiàn)。

感謝各位的閱讀!關于“Spring Cloud如何開發(fā)消息微服務”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI