溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spring Cloud Stream的使用細(xì)節(jié)有哪些

發(fā)布時間:2021-11-10 15:48:30 來源:億速云 閱讀:146 作者:柒染 欄目:大數(shù)據(jù)

Spring Cloud Stream的使用細(xì)節(jié)有哪些,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

上我們就來看看Spring Cloud Stream的一些使用細(xì)節(jié)。

自定義消息通道

上篇文章我們提到了Sink和Source兩個接口,這兩個接口中分別定義了輸入通道和輸出通道,而Processor通過繼承Source和Sink,同時具有輸入通道和輸出通道。這里我們就模仿Sink和Source,來定義一個自己的消息通道。

還是在上文的基礎(chǔ)上,首先我們定義一個接口叫做MySink,如下:

public interface MySink {
    String INPUT = "mychannel";

    @Input(INPUT)
    SubscribableChannel input();
}

這里我們定義了一個名為mychannel的消息輸入通道,@Input注解的參數(shù)則表示了消息通道的名稱,同時我們還定義了一個方法返回一個SubscribableChannel對象,該對象用來維護(hù)消息通道訂閱者。然后,我們再定義一個名為MySource的接口,如下:

public interface MySource {
    @Output(MySink.INPUT)
    MessageChannel output();
}

@Output注解中描述了消息通道的名稱,還是mychannel,然后這里我們也定義了一個返回MessageChannel對象的方法,該對象中有一個向消息通道發(fā)送消息的方法。

最后我們定義一個消息接收類,如下:

@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
    private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);

    @StreamListener(MySink.INPUT)
    public void receive(Object playload) {
        logger.info("Received:" + playload);
    }
}

OK,我們在這里綁定消息通道,然后監(jiān)聽自定義的消息通道,最后來一個單元測試測試一下,如下:

@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@SpringBootTest(classes = StreamHelloApplication.class)
@EnableBinding(MySource.class)
public class StreamHelloApplicationTests {

    @Autowired
    private MySource mySource;

    @Test
    public void contextLoads() {
        mySource.output().send(MessageBuilder.withPayload("hello 123").build());
    }
}

運(yùn)行單元測試,我們可以看到如下日志,表示消息發(fā)送成功了:

Spring Cloud Stream的使用細(xì)節(jié)有哪些  

如果想要發(fā)送對象也可以直接發(fā)送,不用進(jìn)行對象轉(zhuǎn)換,如下:

發(fā)送:

Book book = new Book(1l, "三國演義", "羅貫中");
mySource.output().send(MessageBuilder.withPayload(book).build());

接收:

@StreamListener(MySink.INPUT)
public void receive(Book playload) {
    logger.info("Received:" + playload);
}

如果我們想要在接收成功后給一個回執(zhí),也是OK的,如下:

@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)//定義回執(zhí)發(fā)送的消息通道
public String receive(Book playload) {
    logger.info("Received:" + playload);
    return "receive msg :" + playload;
}

方法的返回值就是回執(zhí)消息,回執(zhí)消息在系統(tǒng)默認(rèn)的output通道中,我們?nèi)绻胍邮者@個消息,當(dāng)然就要監(jiān)聽這個通道,如下:

@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
    System.out.println("msg:"+msg);
}

當(dāng)然要記得Source類也要在@EnableBinding注解中進(jìn)行綁定。此時運(yùn)行結(jié)果如下:

Spring Cloud Stream的使用細(xì)節(jié)有哪些  

消費組

由于我們的服務(wù)可能會有多個實例同時在運(yùn)行,如果不做任何設(shè)置,此時發(fā)送一條消息將會被所有的實例接收到,但是有的時候我們可能只希望消息被一個實例所接收,這個需求我們可以通過消息分組來解決。方式很簡單,給項目配置消息組和主題,如下:

spring.cloud.stream.bindings.mychannel.group=g1
spring.cloud.stream.bindings.mychannel.destination=dest1

這里我們設(shè)置該工程都屬于g1消費組,輸入通道的主題名則為dest1。這里配置完成之后,我們在消息發(fā)送方做如下配置:

spring.cloud.stream.bindings.mychannel.destination=dest1

也配置消息主題名為dest1(如果發(fā)送和接收就在同一個應(yīng)用中,則這里可以不配置)。OK,此時我們將我們的項目啟動兩個實例,注意兩個實例的端口不一樣,此時如果我們再發(fā)送消息,則只會被兩個實例中的一個接收到,另外一個應(yīng)用則接收不到,但是到底是兩個實例中的哪一個接收,則是不確定的。

消息分區(qū)

有的時候,我們可能需要相同特征的消息能夠總是被發(fā)送到同一個消費者上去處理,如果我們只是單純的使用消費組則無法實現(xiàn)功能,此時我們需要借助于消息分區(qū),消息分區(qū)之后,具有相同特征的消息就可以總是被同一個消費者處理了,配置方式如下(這里的配置都是在消費組的配置基礎(chǔ)上完成的):

在消費者上添加如下配置:

spring.cloud.stream.bindings.mychannel.consumer.partitioned=true
spring.cloud.stream.instance-count=2
spring.cloud.stream.instance-index=0

關(guān)于這個配置我說三點:

1.第一行表示開啟消息分區(qū)
2.第二行表示當(dāng)前消息者的總的實例個數(shù)
3.第三行表示當(dāng)前實例的索引,從0開始,當(dāng)我們啟動多個實例時,需要在啟動時在命令行配置索引

然后在消息生產(chǎn)者上添加如下配置:

spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload
spring.cloud.stream.bindings.mychannel.producer.partitionCount=2

第一行配置設(shè)置了分區(qū)鍵的表達(dá)式規(guī)則,第二行則設(shè)置了消息分區(qū)數(shù)量。

OK,此時我們再次啟動多個消費者實例,然后重復(fù)發(fā)送多條消息,這些消息都將被同一個消費者處理掉。

看完上述內(nèi)容,你們掌握Spring Cloud Stream的使用細(xì)節(jié)有哪些的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI