您好,登錄后才能下訂單哦!
基于Queue + Stream的統(tǒng)一消息消費(fèi)模型是怎么樣的,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
之前的文章,我們描述了Apache Pulsar能夠成為企業(yè)級(jí)流和消息系統(tǒng)的原因。Pulsar的企業(yè)特性包括消息的持久化存儲(chǔ),多租戶,多機(jī)房互聯(lián)互備,加密和安全性等。我們經(jīng)常被問到的一個(gè)問題是Apache Pulsar和Apache Kafka有什么不同。
在用戶選擇一個(gè)消息系統(tǒng)時(shí),消息模型是用戶首先考慮的事情。消息模型應(yīng)涵蓋以下3個(gè)方面:
消息消費(fèi) - 如何發(fā)送和消費(fèi)消息;
消息確認(rèn)(ack) - 如何確認(rèn)消息;
消息保存 - 消息保留多長時(shí)間,觸發(fā)消息刪除的原因以及怎樣刪除;
消息消費(fèi)模型
在實(shí)時(shí)流式架構(gòu)中,消息傳遞可以分為兩類:隊(duì)列(Queue)和流(Stream)。
隊(duì)列(Queue)模型
隊(duì)列模型主要是采用無序或者共享的方式來消費(fèi)消息。通過隊(duì)列模型,用戶可以創(chuàng)建多個(gè)消費(fèi)者從單個(gè)管道中接收消息;當(dāng)一條消息從隊(duì)列發(fā)送出來后,多個(gè)消費(fèi)者中的只有一個(gè)(任何一個(gè)都有可能)接收和消費(fèi)這條消息。消息系統(tǒng)的具體實(shí)現(xiàn)決定了最終哪個(gè)消費(fèi)者實(shí)際接收到消息。
隊(duì)列模型通常與無狀態(tài)應(yīng)用程序一起結(jié)合使用。無狀態(tài)應(yīng)用程序不關(guān)心排序,但它們確實(shí)需要能夠確認(rèn)(ack)或刪除單條消息,以及盡可能地?cái)U(kuò)展消費(fèi)并行性的能力。典型的基于隊(duì)列模型的消息系統(tǒng)包括RabbitMQ和RocketMQ。
流式(Stream)模型
相比之下,流模型要求消息的消費(fèi)嚴(yán)格排序或獨(dú)占消息消費(fèi)。對(duì)于一個(gè)管道,使用流式模型,始終只會(huì)有一個(gè)消費(fèi)者使用和消費(fèi)消息。消費(fèi)者按照消息寫入管道的確切順序接收從管道發(fā)送的消息。
流模型通常與有狀態(tài)應(yīng)用程序相關(guān)聯(lián)。有狀態(tài)的應(yīng)用程序更加關(guān)注消息的順序及其狀態(tài)。消息的消費(fèi)順序決定了有狀態(tài)應(yīng)用程序的狀態(tài)。消息的順序?qū)⒂绊憫?yīng)用程序處理邏輯的正確性。
在面向微服務(wù)或事件驅(qū)動(dòng)的體系結(jié)構(gòu)中,隊(duì)列模型和流模型都是必需的。
Pulsar的消息消費(fèi)模型
Apache Pulsar通過“訂閱”,抽象出了統(tǒng)一的: producer-topic-subscription-consumer 消費(fèi)模型。Pulsar的消息模型既支持隊(duì)列模型,也支持流模型。
在Pulsar的消息消費(fèi)模型中,Topic是用于發(fā)送消息的通道。每一個(gè)Topic對(duì)應(yīng)著Apache BookKeeper中的一個(gè)分布式日志。發(fā)布者發(fā)布的每條消息只在Topic中存儲(chǔ)一次;存儲(chǔ)的過程中,BookKeeper會(huì)將消息復(fù)制存儲(chǔ)在多個(gè)存儲(chǔ)節(jié)點(diǎn)上;Topic中的每條消息,可以根據(jù)消費(fèi)者的訂閱需求,多次被使用,每個(gè)訂閱對(duì)應(yīng)一個(gè)消費(fèi)者組(Consumer Group)。
主題(Topic)是消費(fèi)消息的真實(shí)來源。盡管消息僅在主題(Topic)上存儲(chǔ)一次,但是用戶可以有不同的訂閱方式來消費(fèi)這些消息:
消費(fèi)者被組合在一起以消費(fèi)消息,每個(gè)消費(fèi)組是一個(gè)訂閱。
每個(gè)Topic可以有不同的消費(fèi)組。
每組消費(fèi)者都是對(duì)主題的一個(gè)訂閱。
每組消費(fèi)者可以擁有自己不同的消費(fèi)方式: 獨(dú)占(Exclusive),故障切換(Failover)或共享(Share)。
Pulsar通過這種模型,將隊(duì)列模型和流模型這兩種模型結(jié)合在了一起,提供了統(tǒng)一的API接口。 這種模型,既不會(huì)影響消息系統(tǒng)的性能,也不會(huì)帶來額外的開銷,同時(shí)還為用戶提供了更多靈活性,方便用戶程序以最匹配模式來使用消息系統(tǒng)。
獨(dú)占訂閱(Stream流模型)
顧名思義,獨(dú)占訂閱中,在任何時(shí)間,一個(gè)消費(fèi)者組(訂閱)中有且只有一個(gè)消費(fèi)者來消費(fèi)Topic中的消息。下圖是獨(dú)占訂閱的示例。在這個(gè)示例中有一個(gè)有訂閱A的活躍消費(fèi)者A-0,消息m0到m4按順序傳送并由A-0消費(fèi)。如果另一個(gè)消費(fèi)者A-1想要附加到訂閱A,則是不被允許的。
故障切換(Stream流模型)
使用故障切換訂閱,多個(gè)消費(fèi)者(Consumer)可以附加到同一訂閱。 但是,一個(gè)訂閱中的所有消費(fèi)者,只會(huì)有一個(gè)消費(fèi)者被選為該訂閱的主消費(fèi)者。 其他消費(fèi)者將被指定為故障轉(zhuǎn)移消費(fèi)者。
當(dāng)主消費(fèi)者斷開連接時(shí),分區(qū)將被重新分配給其中一個(gè)故障轉(zhuǎn)移消費(fèi)者,而新分配的消費(fèi)者將成為新的主消費(fèi)者。 發(fā)生這種情況時(shí),所有未確認(rèn)(ack)的消息都將傳遞給新的主消費(fèi)者。 這類似于Apache Kafka中的Consumer partition rebalance。
下圖是故障切換訂閱的示例。 消費(fèi)者B-0和B-1通過訂閱B訂閱消費(fèi)消息。B-0是主消費(fèi)者并接收所有消息。 B-1是故障轉(zhuǎn)移消費(fèi)者,如果消費(fèi)者B-0出現(xiàn)故障,它將接管消費(fèi)。
共享訂閱(Queue隊(duì)列模型)
使用共享訂閱,在同一個(gè)訂閱背后,用戶按照應(yīng)用的需求掛載任意多的消費(fèi)者。 訂閱中的所有消息以循環(huán)分發(fā)形式發(fā)送給訂閱背后的多個(gè)消費(fèi)者,并且一個(gè)消息僅傳遞給一個(gè)消費(fèi)者。
當(dāng)消費(fèi)者斷開連接時(shí),所有傳遞給它但是未被確認(rèn)(ack)的消息將被重新分配和組織,以便發(fā)送給該訂閱上剩余的剩余消費(fèi)者。
下圖是共享訂閱的示例。 消費(fèi)者C-1,C-2和C-3都在同一主題上消費(fèi)消息。 每個(gè)消費(fèi)者接收大約所有消息的1/3。
如果想提高消費(fèi)的速度,用戶不需要不增加分區(qū)數(shù)量,只需要在同一個(gè)訂閱中添加更多的消費(fèi)者。
三種訂閱模式的選擇
獨(dú)占和故障切換訂閱,僅允許一個(gè)消費(fèi)者來使用和消費(fèi),每個(gè)對(duì)主題的訂閱。這兩種模式都按主題分區(qū)順序使用消息。它們最適用于需要嚴(yán)格消息順序的流(Stream)用例。
共享訂閱允許每個(gè)主題分區(qū)有多個(gè)消費(fèi)者。同一訂閱中的每個(gè)消費(fèi)者僅接收主題分區(qū)的一部分消息。共享訂閱最適用于不需要保證消息順序的隊(duì)列(Queue)的使用模式,并且可以按照需要任意擴(kuò)展消費(fèi)者的數(shù)量。
Pulsar中的訂閱實(shí)際上與Apache Kafka中的Consumer Group的概念類似。創(chuàng)建訂閱的操作很輕量化,而且具有高度可擴(kuò)展性,用戶可以根據(jù)應(yīng)用的需要?jiǎng)?chuàng)建任意數(shù)量的訂閱。
對(duì)同一主題的不同訂閱,也可以采用不同的訂閱類型。比如用戶可以在同一主題上可以提供一個(gè)包含3個(gè)消費(fèi)者的故障切換訂閱,同時(shí)也提供一個(gè)包含20個(gè)消費(fèi)者的共享訂閱,并且可以在不改變分區(qū)數(shù)量的情況下,向共享訂閱添加更多的消費(fèi)者。
下圖描繪了一個(gè)包含3個(gè)訂閱A,B和C的主題,并說明了消息如何從生產(chǎn)者流向消費(fèi)者。
除了統(tǒng)一消息API之外,由于Pulsar主題分區(qū)實(shí)際上是存儲(chǔ)在Apache BookKeeper中,它還提供了一個(gè)讀取API(Reader),類似于消費(fèi)者API(但Reader沒有游標(biāo)管理),以便用戶完全控制如何使用Topic中的消息。
Pulsar的消息確認(rèn)(ACK)
由于分布式系統(tǒng)的特性,當(dāng)使用分布式消息系統(tǒng)時(shí),可能會(huì)發(fā)生故障。比如在消費(fèi)者從消息系統(tǒng)中的主題消費(fèi)消息的過程中,消費(fèi)消息的消費(fèi)者和服務(wù)于主題分區(qū)的消息代理(Broker)都可能發(fā)生錯(cuò)誤。消息確認(rèn)(ACK)的目的就是保證當(dāng)發(fā)生這樣的故障后,消費(fèi)者能夠從上一次停止的地方恢復(fù)消費(fèi),保證既不會(huì)丟失消息,也不會(huì)重復(fù)處理已經(jīng)確認(rèn)(ACK)的消息。
在Apache Kafka中,恢復(fù)點(diǎn)通常稱為Offset,更新恢復(fù)點(diǎn)的過程稱為消息確認(rèn)或提交Offset。
在Apache Pulsar中,每個(gè)訂閱中都使用一個(gè)專門的數(shù)據(jù)結(jié)構(gòu)--游標(biāo)(Cursor)來跟蹤訂閱中的每條消息的確認(rèn)(ACK)狀態(tài)。每當(dāng)消費(fèi)者在主題分區(qū)上確認(rèn)消息時(shí),游標(biāo)都會(huì)更新。更新游標(biāo)可確保消費(fèi)者不會(huì)再次收到消息。
Apache Pulsar提供兩種消息確認(rèn)方法,單條確認(rèn)(Individual Ack)和累積確認(rèn)(Cumulative Ack)。通過累積確認(rèn),消費(fèi)者只需要確認(rèn)它收到的最后一條消息。主題分區(qū)中的所有消息(包括)提供消息ID將被標(biāo)記為已確認(rèn),并且不會(huì)再次傳遞給消費(fèi)者。累積確認(rèn)與Apache Kafka中的Offset更新類似。
Apache Pulsar可以支持消息的單條確認(rèn),也就是選擇性確認(rèn)。消費(fèi)者可以單獨(dú)確認(rèn)一條消息。 被確認(rèn)后的消息將不會(huì)被重新傳遞。下圖說明了單條確認(rèn)和累積確認(rèn)的差異(灰色框中的消息被確認(rèn)并且不會(huì)被重新傳遞)。在圖的上半部分,它顯示了累計(jì)確認(rèn)的一個(gè)例子,M12之前的消息被標(biāo)記為acked。在圖的下半部分,它顯示了單獨(dú)進(jìn)行acking的示例。僅確認(rèn)消息M7和M12 - 在消費(fèi)者失敗的情況下,除了M7和M12之外,其他所有消息將被重新傳送。
獨(dú)占訂閱或故障切換訂閱的消費(fèi)者能夠?qū)ο⑦M(jìn)行單條確認(rèn)和累積確認(rèn);共享訂閱的消費(fèi)者只允許對(duì)消息進(jìn)行單條確認(rèn)。單條確認(rèn)消息的能力為處理消費(fèi)者故障提供了更好的體驗(yàn)。對(duì)于某些應(yīng)用來說,處理一條消息可能需要很長時(shí)間或者非常昂貴,防止重新傳送已經(jīng)確認(rèn)的消息非常重要。
這個(gè)管理Ack的專門的數(shù)據(jù)結(jié)構(gòu)--游標(biāo)(Cursor),由Broker來管理,利用BookKeeper的Ledger提供存儲(chǔ),在后面的文章中我們會(huì)介紹更多的關(guān)于游標(biāo)(Cursor)的細(xì)節(jié)。
Apache Pulsar提供了靈活的消息消費(fèi)訂閱類型和消息確認(rèn)方法,通過簡(jiǎn)單的統(tǒng)一的API,就可以支持各種消息和流的使用場(chǎng)景。
Pulsar的消息保留(Retention)
在消息被確認(rèn)后,Pulsar的Broker會(huì)更新對(duì)應(yīng)的游標(biāo)。當(dāng)Topic里面中的一條消息,被所有的訂閱都確認(rèn)ack后,才能刪除這條消息。Pulsar還允許通過設(shè)置保留時(shí)間,將消息保留更長時(shí)間,即使所有訂閱已經(jīng)確認(rèn)消費(fèi)了它們。
下圖說明了如何在有2個(gè)訂閱的主題中保留消息。訂閱A在M6和訂閱B已經(jīng)消耗了M10之前的所有消息之前已經(jīng)消耗了所有消息。這意味著M6之前的所有消息(灰色框中)都可以安全刪除。訂閱A仍未使用M6和M9之間的消息,無法刪除它們。如果主題配置了消息保留期,則消息M0到M5將在配置的時(shí)間段內(nèi)保持不變,即使A和B已經(jīng)確認(rèn)消費(fèi)了它們。
在消息保留策略中,Pulsar還支持消息生存時(shí)間(TTL)。如果消息未在配置的TTL時(shí)間段內(nèi)被任何消費(fèi)者使用,則消息將自動(dòng)標(biāo)記為已確認(rèn)。 消息保留期消息TTL之間的區(qū)別在于:消息保留期作用于標(biāo)記為已確認(rèn)并設(shè)置為已刪除的消息,而TTL作用于未ack的消息。 上面的圖例中說明了Pulsar中的TTL。 例如,如果訂閱B沒有活動(dòng)消費(fèi)者,則在配置的TTL時(shí)間段過后,消息M10將自動(dòng)標(biāo)記為已確認(rèn),即使沒有消費(fèi)者實(shí)際讀取該消息。
Pulsar VS. Kafka
通過以上幾個(gè)方面,我們對(duì)Pulsar和Kafka在消息模型方面的不同點(diǎn)進(jìn)行一個(gè)總結(jié)。
模型概念
Kafka: Producer - topic - consumer group - consumer;
Pulsar:Producer - topic - subscription - consumer。
消費(fèi)模式
Kafka: 主要集中在流(Stream)模式,對(duì)單個(gè)partition是獨(dú)占消費(fèi),沒有共享(Queue)的消費(fèi)模式;
Pulsar:提供了統(tǒng)一的消息模型和API。流(Stream)模式 -- 獨(dú)占和故障切換訂閱方式;隊(duì)列(Queue)模式 -- 共享訂閱的方式。
消息確認(rèn)(Ack)
Kafka: 使用偏移Offset;
Pulsar:使用專門的Cursor管理。累積確認(rèn)和Kafka效果一樣;提供單條或選擇性確認(rèn)。
消息保留
Kafka:根據(jù)設(shè)置的保留期來刪除消息。有可能消息沒被消費(fèi),過期后被刪除。 不支持TTL。
Pulsar:消息只有被所有訂閱消費(fèi)后才會(huì)刪除,不會(huì)丟失數(shù)據(jù)。也允許設(shè)置保留期,保留被消費(fèi)的數(shù)據(jù)。支持TTL。
對(duì)比總結(jié):
Apache Pulsar將高性能的流(Apache Kafka所追求的)和靈活的傳統(tǒng)隊(duì)列(RabbitMQ所追求的)結(jié)合到一個(gè)統(tǒng)一的消息模型和API中。 Pulsar使用統(tǒng)一的API為用戶提供一個(gè)支持流和隊(duì)列的系統(tǒng),且具有同樣的高性能。
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。