溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

在Apache Pulsar上支持原生Kafka協(xié)議的示例分析

發(fā)布時間:2021-12-15 09:34:30 來源:億速云 閱讀:176 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)在Apache Pulsar上支持原生Kafka協(xié)議的示例分析,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

我們很高興地宣布 StreamNative 和 OVHcloud 開源了 KoP(Kafka on Pulsar)。KoP 將 Kafka 協(xié)議處理插件引入 Pulsar broker。這樣一來,Apache Pulsar 就支持原生 Apache Kafka 協(xié)議。

將 KoP 協(xié)議處理插件添加到現(xiàn)有 Pulsar 集群后,用戶不用修改代碼就可以將現(xiàn)有的 Kafka 應(yīng)用程序和服務(wù)遷移到 Pulsar。

這樣,Kafka 應(yīng)用程序就可以使用 Pulsar 的強大功能,例如:

  • 利用企業(yè)級多租戶特性簡化運營

  • 避免數(shù)據(jù)搬遷,簡化操作

  • 利用 Apache BookKeeper 和分層存儲持久保留事件流

  • 利用 Pulsar Functions 進行無服務(wù)器化事件處理

什么是 Apache Pulsar

Apache Pulsar 是一個事件流平臺。最初,Apache Pulsar 就采用云原生、分層分片的架構(gòu)。該架構(gòu)將服務(wù)和存儲分離開來,使系統(tǒng)實現(xiàn)更友好的容器化。

Pulsar 的云原生架構(gòu)具備強擴展性、高一致性和高彈性,使公司能通過實時數(shù)據(jù)解決方案擴展業(yè)務(wù)。自 2016 年開源以來,Pulsar 已得到廣泛采用,并于 2018 年成為 Apache 頂級項目。


對 KoP 的渴望

Plusar 為隊列和流工作負載提供統(tǒng)一的消息模型。Pulsar 支持自己基于 protobuf 的二進制協(xié)議,以確保高性能和低延遲。protobuf 有利于實現(xiàn) Pulsar 客戶端。

而且,該項目也支持 Java,Go,Python 和 C ++ 語言以及社區(qū)提供的第三方客戶端。但是,對于使用其他消息傳輸協(xié)議編寫的應(yīng)用程序,用戶必須重寫這些應(yīng)用程序,否則這些應(yīng)用程序無法采用 Pulsar 新的統(tǒng)一消息傳輸協(xié)議。
為了解決這一問題,Pulsar 社區(qū)開發(fā)了一些應(yīng)用程序,以便將 Kafka 應(yīng)用程序從其他消息系統(tǒng)遷移到 Pulsar。例如,Pulsar 在 Kafka Java API 上提供了 Kafka wrapper。  

Kafka wrapper 允許用戶在不改變代碼的情況下將其使用的 Kafka Java 客戶端應(yīng)用程序從 Kafka 切換到 Pulsar。Pulsar 還提供豐富的 connector 生態(tài)系統(tǒng),用于連接 Pulsar 和其他數(shù)據(jù)系統(tǒng)。


但是,那些想要從其他 Kafka 應(yīng)用程序切換到 Pulsar 的用戶仍然有強烈的需求。


StreamNative 和 OVHcloud 合作

StreamNative 收到大量的入站請求,請求幫助從其他消息系統(tǒng)遷移到 Pulsar 。同時,StreamNative 也意識到在 Pulsar 上原生支持其他消息傳輸協(xié)議(例如 AMQP 和 Kafka)的必要性。

 所以,StreamNative 開始致力于將通用協(xié)議處理插件框架引入到 Pulsar 中。該框架允許使用其他消息傳輸協(xié)議的開發(fā)人員使用 Pulsar。

多年來,OVHcloud 一直采用 Apache Kafka。  盡管他們有在 Kafka 上運行多個集群且每秒處理數(shù)百萬條消息的經(jīng)驗,但仍面臨艱巨的運營挑戰(zhàn)。例如,如果不使用多租戶特性,他們很難將成千上萬個用戶的數(shù)千個 Topic 放在一個集群中。

所以,OVHcloud 放棄 Kafka,決定將其主題即服務(wù)的產(chǎn)品(即 ioStream)轉(zhuǎn)移到 Pulsar,并在 Pulsar 上構(gòu)建其產(chǎn)品。與 Kafka 相比,Pulsar 支持多租戶特性且其整體架構(gòu)包含 Apache BookKeep 組件,這有助于簡化用戶操作。
在初步實驗之后,OVHcloud 決定將 KoP 作為 PoC proxy,將 Kafka 協(xié)議即時轉(zhuǎn)換到 Pulsar。在此過程中,OVHcloud 注意到 StreamNative 正在致力于將 Kafka 協(xié)議原生地引入到 Pulsar。于是,他們聯(lián)手開發(fā)了 KoP。  

KoP 旨在利用 Pulsar 和 BookKeeper 的事件流存儲架構(gòu)和 Pulsar 的可插拔協(xié)議處理插件框架來提供一種精簡而全面的解決方案。  KoP 是一個協(xié)議名稱為「kafka」的協(xié)議處理插件。KoP 綁定在 Pulsar broker上,并與 Pulsar broker 一起運行。



分布式日志

關(guān)于日志,Pulsar 和 Kafka 都采用非常相似的數(shù)據(jù)模型,用于發(fā)布/訂閱消息和事件流。例如,Pulsar 和 Kafka 都采用分布式日志。

這兩個系統(tǒng)的主要區(qū)別在于它們?nèi)绾螌崿F(xiàn)分布式日志。Kafka 采用分區(qū)的架構(gòu),將分布式日志(Kafka 分區(qū)中的日志)存儲在一組 broker 中。Pulsar 采用分片的架構(gòu),利用 Apache BookKeeper 作為其橫向擴展的分片存儲層,將分布式日志存儲在 Apache BookKeeper 中。

Pulsar 基于分片的架構(gòu)有助于避免數(shù)據(jù)搬遷、實現(xiàn)高擴展性、以及持久地存儲事件流。

Pulsar 和 Kafka 都基于相似的數(shù)據(jù)模型(分布式日志)進行搭建,而且 Pulsar 采用分布式日志存儲和可插拔的協(xié)議處理插件框架(在2.5.0 版本中引入),所以 Pulsar 可以很容易地實現(xiàn)兼容 Kafka 的協(xié)議處理插件。

實現(xiàn)方式

通過對比 Pulsar 和 Kafka,我們發(fā)現(xiàn)這兩種系統(tǒng)有很多相似之處。這兩種系統(tǒng)都包括以下操作:

  • Topic 查找 :所有客戶端都連接到任一 broker 以查找 Topic 的元數(shù)據(jù)(即 owner broker)。獲取元數(shù)據(jù)之后,客戶端與 owner broker 建立持久的 TCP 連接。
  • 發(fā)布 :客戶端與 Topic 區(qū)的 owner broker 進行對話,以將消息追加到 分布式日志 中。
  • 消費 :客戶端與 Topic 分區(qū)的 owner broker 進行對話,以便從分布式日志中讀取消息。
  • 偏移量 :為發(fā)布給 Topic 分區(qū)的消息分配偏移量。在 Pulsar 中,偏移量被稱為 MessageId。consumer 可以使用偏移量來查找日志中的給定位置,以便讀取消息。
  • 消費狀態(tài) :這兩個系統(tǒng)都維護訂閱中的 consumer( Kafka 稱之為消費組)的消費狀態(tài)。Kafka 將消費狀態(tài)存儲在  `__offsets`  Topic,而 Pulsar 將消費狀態(tài)存儲在  `cursors` 。

正如你所見,這些都是橫向擴展分布式日志存儲(例如 Apache BookKeeper)提供的所有原始操作。

Pulsar 的核心功能是在 Apache BookKeeper 上實現(xiàn)的。因此,我們可以非常簡單、直接地使用 Pulsar 在 BookKeeper 上開發(fā)的現(xiàn)有組件來實現(xiàn) Kafka 概念。  

下圖說明了我們?nèi)绾卧?Pulsar 中添加 Kafka 協(xié)議支持。我們引入一個新的協(xié)議處理插件,該協(xié)議處理插件利用 Pulsar 的現(xiàn)有組件(例如 Topic 發(fā)現(xiàn)、分布式日志庫-ManagedLedger、cursor 等)來實現(xiàn) Kafka 傳輸協(xié)議。

在Apache Pulsar上支持原生Kafka協(xié)議的示例分析

在Apache Pulsar上支持原生Kafka協(xié)議的示例分析

Topic    
Kafka 將所有 Topic 存儲在扁平的命名空間。但是,Pulsar 將 Topic 存儲在層次化、多租戶的命名空間。我們在 broker 配置中添加了    `kafkaNamespace`   配置,這樣管理員就可以將 Kafka Topic 映射到 Pulsar Topic。

為了方便 Kafka 用戶使用 Apache Pulsar 的多租戶特性,當(dāng) Kafka 用戶使用 SASL 驗證機制來驗證 Kafka 客戶端的時候,可以指定一個 Pulsar 租戶和命名空間作為其 SASL 用戶名。

消息 ID 和偏移量

Kafka 為每條被成功發(fā)布到 Topic 分區(qū)的消息都指定了一個偏移量。Pulsar 為每條消息指定了一個    `MessageID`  。消息 ID 由    `ledger-id`  、    `entry-id`   和    `batch-index`    組成。我們在 Pulsar-Kafka wrapper 中使用相同的方法將 Pulsar 的消息 ID 轉(zhuǎn)換為偏移量,反之亦然。
消息
Kafka 和 Pulsar 的消息都包含鍵、值、時間戳和 header(在 Pulsar 中被稱作 ‘properties’)。我們自動在 Kafka 消息和 Pulsar 消息之間轉(zhuǎn)換這些字段。

Topic 查找

我們?yōu)?Kafka 和 Pulsar 的請求處理插件提供相同的 Topic 查找方法。請求處理插件發(fā)現(xiàn) Topic,查找所請求的 Topic 分區(qū)的全部所有權(quán),然后將包含所有權(quán)信息的 Kafka `TopicMetadata` 返回給 Kafka 客戶端。

發(fā)布消息

當(dāng)收到 Kafka 客戶端發(fā)布的消息后,Kafka 請求處理插件逐一將多個字段(例如鍵、值、時間戳和 headers)進行映射,從而將 Kafka 消息轉(zhuǎn)換為 Pulsar 消息。

同時,Kafka 請求處理插件利用 ManagedLedger append API 將這些已轉(zhuǎn)化的 Pulsar 消息存儲在 BookKeeper。Kafka 請求處理插件將 Kafka 消息轉(zhuǎn)換為 Pulsar 消息后,現(xiàn)有的 Pulsar 應(yīng)用程序就可以接收 Kafka 客戶端發(fā)布的消息。

消費消息

當(dāng)收到 Kafka 客戶端的 consumer 請求時,Kafka 請求處理插件打開一個非持久 cursor,然后從請求的偏移量開始讀取 entries。

Kafka 請求處理插件將 Pulsar 消息轉(zhuǎn)換回 Kafka 消息后,現(xiàn)有的 Kafka 應(yīng)用程序就可以接收 Pulsar 客戶端發(fā)布的消息。

Group coordinator & 偏移量管理

最大的挑戰(zhàn)是實現(xiàn) group coordinator 和偏移量管理。Pulsar 不支持集中的 group coordinator,無法為消費組里的 consumer 分配分區(qū),也無法管理每個消費組的偏移量。

Pulsar broker 基于分區(qū)來管理分區(qū)分配,而分區(qū)的 owner broker 通過將確認信息存儲在 cursors 來管理偏移量。
我們很難讓 Pulsar 模型與 Kafka 模型保持一致。因此,為了完全兼容 Kafka 客戶端,我們將 coordinator group 的更改和偏移量存儲在 Pulsar 名為    `public/kafka/__offsets`   系統(tǒng) Topic 中,從而實現(xiàn) Kafka coordinator group。

這樣,我們能夠在 Pulsar 和 Kafka 之間建立橋梁,并允許用戶使用現(xiàn)有的 Pulsar 工具和策略來管理訂閱并監(jiān)控 Kafka consumer。我們在已實現(xiàn)的 coordinator group 中添加一個后臺線程,定期將偏移量更新從系統(tǒng) Topic 同步到 Pulsar cursor。

因此,實際上 Kafka 消費組被認為是 Pulsar 訂閱。所有現(xiàn)有的 Pulsar 工具也可以用于管理 Kafka 消費組。
連接兩種流行的消息生態(tài)系統(tǒng)

StreamNative 和 OVHcloud 都重視客戶的成功。我們相信,在 Apache Pulsar 上提供原生 Kafka 協(xié)議能夠幫助采用 Pulsar 的用戶更快地取得業(yè)務(wù)成功。
 KoP 整合了兩個流行的事件流生態(tài)系統(tǒng),解鎖了新的用例??蛻艨梢岳眠@兩個生態(tài)系統(tǒng)的優(yōu)勢,借助 Apache Pulsar 構(gòu)建一個真正統(tǒng)一的事件流平臺,加速開發(fā)實時應(yīng)用程序和服務(wù)。
KoP 使日志收集器可以繼續(xù)從其來源收集日志數(shù)據(jù),并使用現(xiàn)有的 Kafka 集成向 Apache Pulsar 發(fā)布消息。下游應(yīng)用程序可以使用 Pulsar Functions 來處理到達系統(tǒng)的事件,實現(xiàn)無服務(wù)器化事件流傳輸。

以上就是在Apache Pulsar上支持原生Kafka協(xié)議的示例分析,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI