溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Pulsar 消息概念1

發(fā)布時間:2020-03-19 14:02:03 來源:網(wǎng)絡(luò) 閱讀:364 作者:qq5dc264c690eab 欄目:大數(shù)據(jù)

Pulsar官方文檔 概念和架構(gòu)-Messaging Concepts中主要內(nèi)容

1 消息組成

組成 說明
Value / data payload 消息攜帶的數(shù)據(jù),所有pulsar的消息攜帶原始bytes,但是消息數(shù)據(jù)也需要遵循數(shù)據(jù)shcema
Key 消息可以被Key打標(biāo)簽。這可以對topic壓縮之類的事情起作用
Properties 用戶定義屬性的可選鍵/值映射
Producer name 生成消息的生產(chǎn)者的名稱(生產(chǎn)者自動被賦予默認名稱,但您也可以顯式地應(yīng)用自己的名稱)
Sequence ID 每一個消息在其主題上都屬于一個有序序列。消息的序列ID是它在該序列中的順序
Publish time 消息發(fā)布時的時間戳(由生產(chǎn)者自動應(yīng)用)
Event time 一個可選的時間戳,應(yīng)用程序可以附加到消息上,表示某個事件發(fā)生的時間,例如消息被處理的時間。如果未顯式設(shè)置,則消息的事件時間為0。

2 生產(chǎn)者 發(fā)送模式

模式 說明
同步發(fā)送 生產(chǎn)者將在發(fā)送每個消息后等待代理的確認。如果未收到確認,則生產(chǎn)者將認為發(fā)送操作失敗
異步發(fā)送 生產(chǎn)者將把消息放入阻塞隊列并立即返回。客戶端庫隨后將消息發(fā)送到后臺的代理。如果隊列已滿(最大大小可配置,則在調(diào)用API時,生產(chǎn)者可能會被阻止或立即失敗,具體取決于傳遞給生產(chǎn)者的參數(shù)

2.1 消息壓縮支持
LZ4
ZLIB
ZSTD
SNAPPY

2.2 支持批處理
如果啟用批處理,則生產(chǎn)者將在單個請求中累積并發(fā)送一批消息。批處理大小是由消息的最大數(shù)量和最大發(fā)布延遲定義的。
3 消費者 接收模式

模式 說明
同步接收 同步接收將被阻止,直到有消息可用
異步接收 異步接收將立即返回一個future值,例如Java中的CompletableFuture,該值在新消息可用時完成

3.1 監(jiān)聽
客戶端庫為用戶提供偵聽器實現(xiàn)。例如,Java客戶機提供了一個MessageListener接口。在這個接口中,只要接收到新消息,就會調(diào)用received方法。
3.2 確認
當(dāng)使用者成功使用消息時,使用者會向代理發(fā)送確認請求,以便代理丟棄該消息。否則,它將存儲消息。
消息可以逐個確認,也可以累積確認。對于累積確認,消費者只需要確認它收到的最后一條消息。流中直至(包括)所提供消息的所有消息將不會重新傳遞給該使用者
( 累積確認不能與共享訂閱模式一起使用,因為共享模式涉及多個對同一訂閱具有訪問權(quán)限的使用者)
在共享訂閱模式下,可以單獨確認消息。
3.3 否定確認
當(dāng)使用者一次未成功使用消息,并且希望再次使用該消息時,使用者可以向代理發(fā)送否定的確認,然后代理將重新傳遞該消息。

消息可以被一個接一個地否定或累積地承認,這取決于消費訂閱模式。
在獨占訂閱模式和故障轉(zhuǎn)移訂閱模式中,消費者只會消極地確認他們收到的最后一條消息。

在“共享”和“密鑰共享”訂閱模式中,您可以分別對消息進行否定性確認

3.4 確認超時
當(dāng)消息未成功使用,并且您希望觸發(fā)代理自動重新傳遞消息時,可以采用未確認消息自動重新傳遞機制??蛻舳藢⒃谡麄€確認超時時間范圍內(nèi)跟蹤未確認消息,并在指定確認超時時自動向代理發(fā)送重新傳遞未確認消息請求
注意
在確認超時之前使用否定確認。否定確認以更精確的方式控制單個消息的重新傳遞,并在消息處理時間超過確認超時時避免無效的重新傳遞。
4 死信主題
在某些消息無法由消費者使用時,會成生新的消息。在這種機制中,無法使用的消息存儲在一個單獨的主題中,稱為死信主題。您可以決定如何處理死信主題中的消息
下面的示例演示如何使用默認的死信主題在Java客戶機中啟用死信主題

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
默認的死信主題使用以下格式:
<topicname>-<subscriptionname>-DLQ
如果要指定死信主題的名稱,請使用以下Java客戶端示例:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("your-topic-name")
.build())
.subscribe();
死信主題取決于郵件的重新傳遞。由于確認超時或否定確認,消息將重新傳遞。如果要對消息使用否定確認,請確保在確認超時之前對其進行否定確認。
注意
目前,死信主題僅在共享訂閱模式下啟用

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI