溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何解析Apache Pulsar的消息存儲(chǔ)模型

發(fā)布時(shí)間:2022-01-18 15:23:35 來源:億速云 閱讀:280 作者:柒染 欄目:云計(jì)算

小編今天帶大家了解如何解析Apache Pulsar的消息存儲(chǔ)模型,文中知識(shí)點(diǎn)介紹的非常詳細(xì)。覺得有幫助的朋友可以跟著小編一起瀏覽文章的內(nèi)容,希望能夠幫助更多想解決這個(gè)問題的朋友找到問題的答案,下面跟著小編一起深入學(xué)習(xí)“如何解析Apache Pulsar的消息存儲(chǔ)模型”的知識(shí)吧。

導(dǎo)讀

Apache Pulsar 是 Apache 軟件基金會(huì)頂級項(xiàng)目,是下一代云原生分布式消息流平臺(tái),集消息、存儲(chǔ)、輕量化函數(shù)式計(jì)算為一體,采用計(jì)算與存儲(chǔ)分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲(chǔ)、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時(shí)及高可擴(kuò)展性等流數(shù)據(jù)存儲(chǔ)特性。

 

背景

在社區(qū)中,我們經(jīng)??梢钥吹接脩粲嘘P(guān) Backlog,storage size 和 retention 等策略的困惑,比較常見的一些問題,諸如:

  • 我沒有設(shè)置 Retention 策略,為什么通過 topics stats 可以查看到 storage size 遠(yuǎn)大于 backlog size?
  • 我的 msg backlog size 很小,但是 storage size 確一直在增長?

Pulsar 的消息模型

首先,我們先來看一下 Pulsar 的消息模型


如何解析Apache Pulsar的消息存儲(chǔ)模型  


如上圖所示,Pulsar 提供了最基本的 pub-sub 的處理模型。


Producer


首先 Producer 端生產(chǎn)消息,將消息以 append 的形式追加到 Topic 中,這里具體分發(fā)到哪一個(gè) Topic 中,根據(jù)消息是否設(shè)置了 msg key 會(huì)有所不同。


  • 設(shè)置了 msg key,消息會(huì)基于 key 做 hash,將消息分發(fā)到不同的 partitions 中
  • 未設(shè)置 msg key,消息會(huì)以 round robin 的形式,分發(fā)到不同的 partitions 中


在消息分發(fā)的模型中,Pulsar 與 Kafka 類似。


Consumer


在 Consumer 之外,Pulsar 抽象了一層訂閱層,用于訂閱 Topic。通過訂閱層的抽象,Pulsar 可以靈活的支持 Queue 和 Streaming 這兩種類型的消息隊(duì)列。每一個(gè) sub 都可以拿到這個(gè) Topic 中所有數(shù)據(jù)的完整 copy,有點(diǎn)類似 Kafka 中的 consumer group。根據(jù)訂閱類型的不同,每一個(gè)訂閱下面可以有一個(gè)或者多個(gè) Consumer 來接收消息。


目前,Pulsar 支持如下四種消息訂閱模型:


  • Exclusive
  • Failover
  • Shared
  • Key_Shared

 

存儲(chǔ)模型


消息在每個(gè) Partition Topic 的分布式日志中只存儲(chǔ)一次


這就意味著,當(dāng) Producer 成功發(fā)送消息到 Topic 之后,這個(gè)消息只會(huì)在存儲(chǔ)層存儲(chǔ)一次,無論你有多少個(gè) Subscription 訂閱到這個(gè) Topic 中,實(shí)際上操作的都是同一份數(shù)據(jù)?;谶@個(gè)基礎(chǔ),我們可以看到 Apache Pulsar 從上到下的層級抽象概念如下圖所示:


如何解析Apache Pulsar的消息存儲(chǔ)模型  


首先第一層抽象是 Topic(Partition),用來存儲(chǔ) Producer 追加的 messages 信息,Topic 之下對應(yīng)的是一個(gè)個(gè)的 ledger,ledger 里面又劃分為一個(gè)個(gè)的分片,在一個(gè)個(gè)的分片中存儲(chǔ)了更小粒度的 ertries,entries 中存儲(chǔ)的是 【一條】或者 【一個(gè) batch】 的消息。


  • Tips: 在 Pulsar 中,一個(gè) batch 在 broker 端會(huì)被當(dāng)作一條消息來處理,batch 解析的具體邏輯是在 consumer 端接收消息時(shí)候去操作的。
  • Node: 在 Bookkeeper 中,對數(shù)據(jù)操作的最小單元是按照 segment 這個(gè)粒度來進(jìn)行操作的。


為什么需要做分層抽象呢?


在這里最直白的解釋其實(shí)就是,為了確保數(shù)據(jù)被在每一個(gè) bk 節(jié)點(diǎn)中打的足夠散,分布的足夠均勻。這也是分層分片架構(gòu)設(shè)計(jì)的好處之一。


Ack 機(jī)制


在 Pulsar 中支持了兩種 Ack 的機(jī)制,分別是單條 Ack 和批量 Ack。單條 Ack(AckIndividual)是指 Consumer 可以根據(jù)消息的 messageID 來針對某一個(gè)特定的消息進(jìn)行 Ack 操作;批量 Ack(AckCumulative)是指一次 Ack 多條消息。


訂閱機(jī)制


為了更好的理解 Strorage Size 以及 Backlog, 我們首先需要去了解 Pulsar 中的訂閱機(jī)制,如下圖所示:


如何解析Apache Pulsar的消息存儲(chǔ)模型  


當(dāng)有消息積壓時(shí),你可以通過 clear-backlog 來清除積壓的消息。清除 backlog 中積壓的消息是相對危險(xiǎn)的操作,所以系統(tǒng)會(huì)提示你,是否確認(rèn)要?jiǎng)h除 backlog 中的消息, clear-backlog 提供了 -f(--force) 的參數(shù)來屏蔽該提示。


Producer 還是按照追加的形式不斷往 Topic 中發(fā)送消息,Consumer 端會(huì)創(chuàng)建一個(gè) Subscription 去訂閱這個(gè) Topic,當(dāng)成功訂閱時(shí),會(huì)初始化一個(gè) Cursor 指向具體的消息的位置,默認(rèn)情況下是 Latest。


Cursor 是用來存儲(chǔ)一個(gè)訂閱中消費(fèi)的狀態(tài)信息


上圖中,我們可以看到該訂閱下面的 Topic 已經(jīng)成功 Receive 并且 Ack 掉了 m4 這條消息。那么包含 m4 在內(nèi)的所有的消息狀態(tài)都會(huì)被標(biāo)記為可刪除的狀態(tài)。在 Pulsar 中,使用 MarkDeletePosition 來標(biāo)記這個(gè)位置。之后的所有消息,代表這個(gè)訂閱還沒有消費(fèi)的消息。


隨著時(shí)間的推移,假設(shè)在 AckCumulative 的場景下,上述訂閱中的 Consumer 又消費(fèi)了一些消息,目前 Cursor 的位置移動(dòng)到了 m8 的位置,意味著 m8 之前的消息都可以進(jìn)入刪除狀態(tài)。


如何解析Apache Pulsar的消息存儲(chǔ)模型  


假設(shè)是在 AckIndividual 的場景下,上述訂閱中的 Consumer 只消費(fèi)了 m7 這條消息并且發(fā)送了 Ack 請求,m5, m6 這兩條消息仍然沒有被成功消費(fèi),那么目前處于可刪除狀態(tài)的消息是 m4 之前的消息和 m7 這條消息。也就是說,在這種場景下,由于使用單條 Ack 導(dǎo)致 Topic 中間出現(xiàn)了 Ack 的空洞。


Cursor = Offset + IndevidualDeletes, Ack 會(huì)觸發(fā) Cursor 的移動(dòng),但是不會(huì)刪除任何消息


如何解析Apache Pulsar的消息存儲(chǔ)模型  


隨著時(shí)間的推移,在單條 Ack 的場景下,Ack 的空洞可能會(huì)自己消失,如下圖所示:


如何解析Apache Pulsar的消息存儲(chǔ)模型  


上面我們描述了,單個(gè)訂閱在單條 Ack 和批量 Ack 混合的場景下,Topic 中 cursor 的移動(dòng)情況。假設(shè)目前有多個(gè) Subscription 訂閱了這個(gè) Topic,那么每一個(gè) Subscription 都可以拿到這個(gè) Topic 中數(shù)據(jù)的完整 Copy,也就是一個(gè) Subscription 會(huì)在這個(gè) Topic 中初始化一個(gè)新的 Cursor, 每一個(gè) Cursor 之間消費(fèi)的進(jìn)度是沒有交集、互不影響的,所以就可能出現(xiàn)下圖中的情況:


如何解析Apache Pulsar的消息存儲(chǔ)模型  


在上圖中,針對該 Topic,有兩個(gè)訂閱:Subscription-1 和 Subscription-2。Subscription-1中的 Consumer 消費(fèi)掉了 m4 之前的消息,Subscription-2 中的 Consumer 消費(fèi)掉了 m8 之前的消息。而 m4-m8 之間的這四條消息,雖然被 Subscription-2 消費(fèi)完成,但是 Subscription-1 還沒有消費(fèi)完成這部分?jǐn)?shù)據(jù),所以這部分消息還不可以被刪除。目前處于可刪除狀態(tài)的消息是 m4 之前的消息,即這個(gè) Topic 中消費(fèi)進(jìn)度最慢的那個(gè) Subscription 所消費(fèi)完成的消息。那么這就會(huì)有一個(gè)問題,假設(shè)我目前 Subscription-1 掉線了,它的 Cursor 的位置一直沒有變化,這就會(huì)導(dǎo)致這個(gè) Topic 中的數(shù)據(jù)一直處于不可刪除的狀態(tài)。


針對上述場景,Pulsar 引入了 TTL 的概念,即允許用戶設(shè)置 TTL 的時(shí)間,當(dāng)消息到達(dá) TTL 指定的閾值 Cursor 仍然沒有移動(dòng)的話,那么會(huì)觸發(fā) TTL 的機(jī)制,將 Cursor 自動(dòng)向后移到指定的位置。在這里需要注意的一點(diǎn)是,我們一直強(qiáng)調(diào)的是 TTL 會(huì)移動(dòng) Cursor 的位置,到目前為止,我們還沒有提到消息刪除的概念,不要將二者混淆了。TTL 會(huì)做的只是去移動(dòng) Cursor 的位置,不會(huì)有任何跟消息刪除的邏輯。


Backlog


為了更好的表述 Topic 中沒有被消費(fèi)的數(shù)據(jù),Pulsar 引入了 Backlog 的概念來描述這一部分消息。Backlog 可以分為如下兩種形式:


  • Topic Backlog: 最慢的那個(gè)訂閱的 Backlog 的集合
  • Subscription Backlog: 指針對單個(gè)訂閱級別的沒有消費(fèi)的數(shù)據(jù)的集合


如下圖所示:Backlog A 屬于 Topic Backlog;Backlog A 屬于 Subscription-1 Backlog;Backlog B 屬于 Subscription-2 的 Backlog。


如何解析Apache Pulsar的消息存儲(chǔ)模型  


隨著時(shí)間的推移,Backlog 的會(huì)不斷的變化,如下圖所示:


如何解析Apache Pulsar的消息存儲(chǔ)模型  


在這里需要說明的一點(diǎn)是,這里的 backlogSize 記錄的是帶 batch 的消息,也就是一個(gè) batch 會(huì)被當(dāng)作一條消息來進(jìn)行處理。因?yàn)樵?broker 端去解析整個(gè) batch 會(huì)給 broker 帶來一定的負(fù)擔(dān),同時(shí)浪費(fèi)大量的 CPU 資源,所以,具體 batch 邏輯的解析放到了 Consumer 端來進(jìn)行處理。所以 Backlog 本質(zhì)上記錄的是上面我們提到的 entries 的數(shù)量。


在 Pulsar 中,針對 Backlog 有兩個(gè)指標(biāo),具體如下:


  • msgBacklog: 記錄的是所有未被 Ack 的 entries 的集合
  • backlogSize:記錄的是所有沒有被 Ack 的消息的大小


Retention 機(jī)制


在 Apache Pulsar 中,使用了 BookKeeper 來作為存儲(chǔ)層,允許用戶將消息持久化,為了確保消息不會(huì)無限期的持久化下去,Pulsar 引入了 Retention 的機(jī)制,允許用戶來配置消息持久化的策略。默認(rèn)情況下,持久化的機(jī)制是關(guān)閉的,即消息被 Ack 之后,就會(huì)進(jìn)入刪除的邏輯。


配置 Retention 策略時(shí),有如下兩個(gè)參數(shù)可以指定:


  • size:指持久化大小的閾值。0 代表不配置 Retention 大小策略,-1 代表設(shè)置的大小無限大
  • time:指持久化時(shí)間的閾值。0 代表不配置 Retention 時(shí)間策略,-1 代表時(shí)間無限大


在引入 Retention 策略之后,整個(gè) Topic 表示的視圖如下所示,m0-m5 代表已經(jīng)被所有訂閱確認(rèn)的消息并且已經(jīng)超過了 Retention 策略的閾值,即這些消息正在 準(zhǔn)備刪除。注意,我這里描述的是 【準(zhǔn)備刪除】具體是否可以被刪除,現(xiàn)在還不能確定。


如何解析Apache Pulsar的消息存儲(chǔ)模型  


在最開始,我們從最上層的 Topic 一步步抽象到了一條具體的 msg,(在這里為了方便描述,我們忽略掉 batch 的概念,即一條 msg 等價(jià)于一個(gè) entry)現(xiàn)在我們再反過來把所有的概念都疊加回去。因?yàn)樵?bk 中,允許操作的最小的單元是一個(gè) segment,所以在具體的 msg(entry)級別,是沒辦法針對一條消息進(jìn)行刪除的,刪除操作需要針對一個(gè) segment 來進(jìn)行操作。如下圖所示:


假設(shè) m0-m3 屬于 segment3;m4-m7 屬于segment2;m8-m11 屬于 segment1。按照上圖的描述,m0-m5 的消息都可以進(jìn)行刪除操作, 但是 segment 2 中包含了 m6, m7 并沒有達(dá)到 Retention 的閾值,所以 segment 目前還不可以被刪除。


Storage Size


為了更方便的表述當(dāng)前消息占用的存儲(chǔ)空間的大小,Pulsar 引入了 storageSize 來描述整個(gè)概念。如下圖所示:當(dāng) backlog B 與 storage Size 標(biāo)識(shí)的消息相同時(shí),backlogSize 等價(jià)于 storageSize。


如何解析Apache Pulsar的消息存儲(chǔ)模型  


當(dāng)由于引入單條 Ack,Retention 策略以及 Bookkeeper 基于 segment 刪除的設(shè)定,那么很有可能造成 Storage Size 大于 backlog Size 的場景,如下圖所示:


如何解析Apache Pulsar的消息存儲(chǔ)模型    
  1. 消息在每個(gè) Partition Topic 的分布式日志中只會(huì)存儲(chǔ)一次

  2. Cursor 是用來存儲(chǔ)一個(gè)訂閱下 Consumer 的消費(fèi)狀態(tài)的

  3. Cursor 等價(jià)于 offset(kafka)+ individualDeletes

  4. Ack 會(huì)去更新 Topic 中 Cursor 的位置

  5. 當(dāng)某條消息被所有訂閱者都 Ack 之后,這條消息進(jìn)入【可以被刪除】的狀態(tài)

  6. 所有沒有被確認(rèn)的消息會(huì)一直保存在 Subscription backlog 中

  7. TTL 可以通過設(shè)定一個(gè)時(shí)間閾值來自動(dòng)更新 Cursor 的位置

  8. Retention 策略是用來操作那些被 Ack 之后的消息應(yīng)該怎么處理

  9. 消息的刪除是以 segment 為單位的,而不是 entry。

感謝大家的閱讀,以上就是“如何解析Apache Pulsar的消息存儲(chǔ)模型”的全部內(nèi)容了,學(xué)會(huì)的朋友趕緊操作起來吧。相信億速云小編一定會(huì)給大家?guī)砀鼉?yōu)質(zhì)的文章。謝謝大家對億速云網(wǎng)站的支持!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI