您好,登錄后才能下訂單哦!
Spring Cloud Stream的使用細(xì)節(jié)有哪些,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
上我們就來看看Spring Cloud Stream的一些使用細(xì)節(jié)。
上篇文章我們提到了Sink和Source兩個接口,這兩個接口中分別定義了輸入通道和輸出通道,而Processor通過繼承Source和Sink,同時具有輸入通道和輸出通道。這里我們就模仿Sink和Source,來定義一個自己的消息通道。
還是在上文的基礎(chǔ)上,首先我們定義一個接口叫做MySink,如下:
public interface MySink { String INPUT = "mychannel"; @Input(INPUT) SubscribableChannel input(); }
這里我們定義了一個名為mychannel的消息輸入通道,@Input注解的參數(shù)則表示了消息通道的名稱,同時我們還定義了一個方法返回一個SubscribableChannel對象,該對象用來維護(hù)消息通道訂閱者。然后,我們再定義一個名為MySource的接口,如下:
public interface MySource { @Output(MySink.INPUT) MessageChannel output(); }
@Output注解中描述了消息通道的名稱,還是mychannel,然后這里我們也定義了一個返回MessageChannel對象的方法,該對象中有一個向消息通道發(fā)送消息的方法。
最后我們定義一個消息接收類,如下:
@EnableBinding(value = {MySink.class}) public class SinkReceiver2 { private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class); @StreamListener(MySink.INPUT) public void receive(Object playload) { logger.info("Received:" + playload); } }
OK,我們在這里綁定消息通道,然后監(jiān)聽自定義的消息通道,最后來一個單元測試測試一下,如下:
@RunWith(SpringJUnit4ClassRunner.class) @WebAppConfiguration @SpringBootTest(classes = StreamHelloApplication.class) @EnableBinding(MySource.class) public class StreamHelloApplicationTests { @Autowired private MySource mySource; @Test public void contextLoads() { mySource.output().send(MessageBuilder.withPayload("hello 123").build()); } }
運(yùn)行單元測試,我們可以看到如下日志,表示消息發(fā)送成功了:
如果想要發(fā)送對象也可以直接發(fā)送,不用進(jìn)行對象轉(zhuǎn)換,如下:
發(fā)送:
Book book = new Book(1l, "三國演義", "羅貫中"); mySource.output().send(MessageBuilder.withPayload(book).build());
接收:
@StreamListener(MySink.INPUT) public void receive(Book playload) { logger.info("Received:" + playload); }
如果我們想要在接收成功后給一個回執(zhí),也是OK的,如下:
@StreamListener(MySink.INPUT) @SendTo(Source.OUTPUT)//定義回執(zhí)發(fā)送的消息通道 public String receive(Book playload) { logger.info("Received:" + playload); return "receive msg :" + playload; }
方法的返回值就是回執(zhí)消息,回執(zhí)消息在系統(tǒng)默認(rèn)的output通道中,我們?nèi)绻胍邮者@個消息,當(dāng)然就要監(jiān)聽這個通道,如下:
@StreamListener(Source.OUTPUT) public void receive2(String msg) { System.out.println("msg:"+msg); }
當(dāng)然要記得Source類也要在@EnableBinding注解中進(jìn)行綁定。此時運(yùn)行結(jié)果如下:
由于我們的服務(wù)可能會有多個實例同時在運(yùn)行,如果不做任何設(shè)置,此時發(fā)送一條消息將會被所有的實例接收到,但是有的時候我們可能只希望消息被一個實例所接收,這個需求我們可以通過消息分組來解決。方式很簡單,給項目配置消息組和主題,如下:
spring.cloud.stream.bindings.mychannel.group=g1 spring.cloud.stream.bindings.mychannel.destination=dest1
這里我們設(shè)置該工程都屬于g1消費組,輸入通道的主題名則為dest1。這里配置完成之后,我們在消息發(fā)送方做如下配置:
spring.cloud.stream.bindings.mychannel.destination=dest1
也配置消息主題名為dest1(如果發(fā)送和接收就在同一個應(yīng)用中,則這里可以不配置)。OK,此時我們將我們的項目啟動兩個實例,注意兩個實例的端口不一樣,此時如果我們再發(fā)送消息,則只會被兩個實例中的一個接收到,另外一個應(yīng)用則接收不到,但是到底是兩個實例中的哪一個接收,則是不確定的。
有的時候,我們可能需要相同特征的消息能夠總是被發(fā)送到同一個消費者上去處理,如果我們只是單純的使用消費組則無法實現(xiàn)功能,此時我們需要借助于消息分區(qū),消息分區(qū)之后,具有相同特征的消息就可以總是被同一個消費者處理了,配置方式如下(這里的配置都是在消費組的配置基礎(chǔ)上完成的):
在消費者上添加如下配置:
spring.cloud.stream.bindings.mychannel.consumer.partitioned=true spring.cloud.stream.instance-count=2 spring.cloud.stream.instance-index=0
關(guān)于這個配置我說三點:
1.第一行表示開啟消息分區(qū)
2.第二行表示當(dāng)前消息者的總的實例個數(shù)
3.第三行表示當(dāng)前實例的索引,從0開始,當(dāng)我們啟動多個實例時,需要在啟動時在命令行配置索引
然后在消息生產(chǎn)者上添加如下配置:
spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload spring.cloud.stream.bindings.mychannel.producer.partitionCount=2
第一行配置設(shè)置了分區(qū)鍵的表達(dá)式規(guī)則,第二行則設(shè)置了消息分區(qū)數(shù)量。
OK,此時我們再次啟動多個消費者實例,然后重復(fù)發(fā)送多條消息,這些消息都將被同一個消費者處理掉。
看完上述內(nèi)容,你們掌握Spring Cloud Stream的使用細(xì)節(jié)有哪些的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。