溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

2020年,Kafka入門看這一篇就夠了!

發(fā)布時(shí)間:2020-05-17 21:20:21 來(lái)源:網(wǎng)絡(luò) 閱讀:302 作者:Java碼農(nóng)君 欄目:編程語(yǔ)言

Kafka 創(chuàng)建背景
Kafka 是一個(gè)消息系統(tǒng),原本開(kāi)發(fā)自 LinkedIn,用作 LinkedIn 的活動(dòng)流(Activity Stream)和運(yùn)營(yíng)數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ)?,F(xiàn)在它已被多家不同類型的公司 作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)使用。

活動(dòng)流數(shù)據(jù)是幾乎所有站點(diǎn)在對(duì)其網(wǎng)站使用情況做報(bào)表時(shí)都要用到的數(shù)據(jù)中最常規(guī)的部分?;顒?dòng)數(shù)據(jù)包括頁(yè)面訪問(wèn)量(Page View)、被查看內(nèi)容方面的信息以及搜索情況等內(nèi)容。這種數(shù)據(jù)通常的處理方式是先把各種活動(dòng)以日志的形式寫入某種文件,然后周期性地對(duì)這些文件進(jìn)行統(tǒng)計(jì)分析。運(yùn)營(yíng)數(shù)據(jù)指的是服務(wù)器的性能數(shù)據(jù)(CPU、IO 使用率、請(qǐng)求時(shí)間、服務(wù)日志等等數(shù)據(jù))。運(yùn)營(yíng)數(shù)據(jù)的統(tǒng)計(jì)方法種類繁多。

近年來(lái),活動(dòng)和運(yùn)營(yíng)數(shù)據(jù)處理已經(jīng)成為了網(wǎng)站軟件產(chǎn)品特性中一個(gè)至關(guān)重要的組成部分,這就需要一套稍微更加復(fù)雜的基礎(chǔ)設(shè)施對(duì)其提供支持。

Kafka 簡(jiǎn)介
Kafka 是一種分布式的,基于發(fā)布 / 訂閱的消息系統(tǒng)。

主要設(shè)計(jì)目標(biāo)如下:

以時(shí)間復(fù)雜度為 O(1) 的方式提供消息持久化能力,即使對(duì) TB 級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問(wèn)性能。

高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒 100K 條以上消息的傳輸。

支持 Kafka Server 間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè) Partition 內(nèi)的消息順序傳輸。

同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。

Scale out:支持在線水平擴(kuò)展。

Kafka 基礎(chǔ)概念
概念一:生產(chǎn)者與消費(fèi)者

對(duì)于 Kafka 來(lái)說(shuō)客戶端有兩種基本類型:

生產(chǎn)者(Producer)

消費(fèi)者(Consumer)。

除此之外,還有用來(lái)做數(shù)據(jù)集成的 Kafka Connect API 和流式處理的 Kafka Streams 等高階客戶端,但這些高階客戶端底層仍然是生產(chǎn)者和消費(fèi)者API,它們只不過(guò)是在上層做了封裝。

這很容易理解,生產(chǎn)者(也稱為發(fā)布者)創(chuàng)建消息,而消費(fèi)者(也稱為訂閱者)負(fù)責(zé)消費(fèi)or讀取消息。

概念二:主題(Topic)與分區(qū)(Partition)

在 Kafka 中,消息以主題(Topic)來(lái)分類,每一個(gè)主題都對(duì)應(yīng)一個(gè) 「消息隊(duì)列」,這有點(diǎn)兒類似于數(shù)據(jù)庫(kù)中的表。但是如果我們把所有同類的消息都塞入到一個(gè)“中心”隊(duì)列中,勢(shì)必缺少可伸縮性,無(wú)論是生產(chǎn)者/消費(fèi)者數(shù)目的增加,還是消息數(shù)量的增加,都可能耗盡系統(tǒng)的性能或存儲(chǔ)。

我們使用一個(gè)生活中的例子來(lái)說(shuō)明:現(xiàn)在 A 城市生產(chǎn)的某商品需要運(yùn)輸?shù)?B 城市,走的是公路,那么單通道的高速公路不論是在「A 城市商品增多」還是「現(xiàn)在 C 城市也要往 B 城市運(yùn)輸東西」這樣的情況下都會(huì)出現(xiàn)「吞吐量不足」的問(wèn)題。所以我們現(xiàn)在引入分區(qū)(Partition)的概念,類似“允許多修幾條道”的方式對(duì)我們的主題完成了水平擴(kuò)展。

概念三:Broker 和集群(Cluster)

一個(gè) Kafka 服務(wù)器也稱為 Broker,它接受生產(chǎn)者發(fā)送的消息并存入磁盤;Broker 同時(shí)服務(wù)消費(fèi)者拉取分區(qū)消息的請(qǐng)求,返回目前已經(jīng)提交的消息。使用特定的機(jī)器硬件,一個(gè) Broker 每秒可以處理成千上萬(wàn)的分區(qū)和百萬(wàn)量級(jí)的消息。(現(xiàn)在動(dòng)不動(dòng)就百萬(wàn)量級(jí)..我特地去查了一把,好像確實(shí)集群的情況下吞吐量挺高的..嗯..)

若干個(gè) Broker 組成一個(gè)集群(Cluster),其中集群內(nèi)某個(gè) Broker 會(huì)成為集群控制器(Cluster Controller),它負(fù)責(zé)管理集群,包括分配分區(qū)到 Broker、監(jiān)控 Broker 故障等。在集群內(nèi),一個(gè)分區(qū)由一個(gè) Broker 負(fù)責(zé),這個(gè) Broker 也稱為這個(gè)分區(qū)的 Leader;當(dāng)然一個(gè)分區(qū)可以被復(fù)制到多個(gè) Broker 上來(lái)實(shí)現(xiàn)冗余,這樣當(dāng)存在 Broker 故障時(shí)可以將其分區(qū)重新分配到其他 Broker 來(lái)負(fù)責(zé)。下圖是一個(gè)樣例:

Kafka 的一個(gè)關(guān)鍵性質(zhì)是日志保留(retention),我們可以配置主題的消息保留策略,譬如只保留一段時(shí)間的日志或者只保留特定大小的日志。當(dāng)超過(guò)這些限制時(shí),老的消息會(huì)被刪除。我們也可以針對(duì)某個(gè)主題單獨(dú)設(shè)置消息過(guò)期策略,這樣對(duì)于不同應(yīng)用可以實(shí)現(xiàn)個(gè)性化。

概念四:多集群

隨著業(yè)務(wù)發(fā)展,我們往往需要多集群,通常處于下面幾個(gè)原因:

基于數(shù)據(jù)的隔離;

基于安全的隔離;

多數(shù)據(jù)中心(容災(zāi))

當(dāng)構(gòu)建多個(gè)數(shù)據(jù)中心時(shí),往往需要實(shí)現(xiàn)消息互通。舉個(gè)例子,假如用戶修改了個(gè)人資料,那么后續(xù)的請(qǐng)求無(wú)論被哪個(gè)數(shù)據(jù)中心處理,這個(gè)更新需要反映出來(lái)。又或者,多個(gè)數(shù)據(jù)中心的數(shù)據(jù)需要匯總到一個(gè)總控中心來(lái)做數(shù)據(jù)分析。

上面說(shuō)的分區(qū)復(fù)制冗余機(jī)制只適用于同一個(gè) Kafka 集群內(nèi)部,對(duì)于多個(gè) Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本質(zhì)上來(lái)說(shuō),MirrorMaker 只是一個(gè) Kafka 消費(fèi)者和生產(chǎn)者,并使用一個(gè)隊(duì)列連接起來(lái)而已。它從一個(gè)集群中消費(fèi)消息,然后往另一個(gè)集群生產(chǎn)消息。

二、Kafka 的設(shè)計(jì)與實(shí)現(xiàn)
上面我們知道了 Kafka 中的一些基本概念,但作為一個(gè)成熟的「消息隊(duì)列」中間件,其中有許多有意思的設(shè)計(jì)值得我們思考,下面我們簡(jiǎn)單列舉一些。

討論一:Kafka 存儲(chǔ)在文件系統(tǒng)上

是的,您首先應(yīng)該知道 Kafka 的消息是存在于文件系統(tǒng)之上的。Kafka 高度依賴文件系統(tǒng)來(lái)存儲(chǔ)和緩存消息,一般的人認(rèn)為 “磁盤是緩慢的”,所以對(duì)這樣的設(shè)計(jì)持有懷疑態(tài)度。實(shí)際上,磁盤比人們預(yù)想的快很多也慢很多,這取決于它們?nèi)绾伪皇褂茫灰粋€(gè)好的磁盤結(jié)構(gòu)設(shè)計(jì)可以使之跟網(wǎng)絡(luò)速度一樣快。

現(xiàn)代的操作系統(tǒng)針對(duì)磁盤的讀寫已經(jīng)做了一些優(yōu)化方案來(lái)加快磁盤的訪問(wèn)速度。比如,預(yù)讀會(huì)提前將一個(gè)比較大的磁盤快讀入內(nèi)存。后寫會(huì)將很多小的邏輯寫操作合并起來(lái)組合成一個(gè)大的物理寫操作。并且,操作系統(tǒng)還會(huì)將主內(nèi)存剩余的所有空閑內(nèi)存空間都用作磁盤緩存,所有的磁盤讀寫操作都會(huì)經(jīng)過(guò)統(tǒng)一的磁盤緩存(除了直接 I/O 會(huì)繞過(guò)磁盤緩存)。綜合這幾點(diǎn)優(yōu)化特點(diǎn),如果是針對(duì)磁盤的順序訪問(wèn),某些情況下它可能比隨機(jī)的內(nèi)存訪問(wèn)都要快,甚至可以和網(wǎng)絡(luò)的速度相差無(wú)幾。

上述的 Topic 其實(shí)是邏輯上的概念,面相消費(fèi)者和生產(chǎn)者,物理上存儲(chǔ)的其實(shí)是 Partition,每一個(gè) Partition 最終對(duì)應(yīng)一個(gè)目錄,里面存儲(chǔ)所有的消息和索引文件。默認(rèn)情況下,每一個(gè) Topic 在創(chuàng)建時(shí)如果不指定 Partition 數(shù)量時(shí)只會(huì)創(chuàng)建 1 個(gè) Partition。比如,我創(chuàng)建了一個(gè) Topic 名字為 test ,沒(méi)有指定 Partition 的數(shù)量,那么會(huì)默認(rèn)創(chuàng)建一個(gè) test-0 的文件夾,這里的命名規(guī)則是:<topic_name>-<partition_id>。

任何發(fā)布到 Partition 的消息都會(huì)被追加到 Partition 數(shù)據(jù)文件的尾部,這樣的順序?qū)懘疟P操作讓 Kafka 的效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是 Kafka 高吞吐率的一個(gè)很重要的保證)。

每一條消息被發(fā)送到 Broker 中,會(huì)根據(jù) Partition 規(guī)則選擇被存儲(chǔ)到哪一個(gè) Partition。如果 Partition 規(guī)則設(shè)置的合理,所有消息可以均勻分布到不同的 Partition中。

討論二:Kafka 中的底層存儲(chǔ)設(shè)計(jì)

假設(shè)我們現(xiàn)在 Kafka 集群只有一個(gè) Broker,我們創(chuàng)建 2 個(gè) Topic 名稱分別為:「topic1」和「topic2」,Partition 數(shù)量分別為 1、2,那么我們的根目錄下就會(huì)創(chuàng)建如下三個(gè)文件夾:

|--topic1-0|--topic2-0|--topic2-1

在 Kafka 的文件存儲(chǔ)中,同一個(gè) Topic 下有多個(gè)不同的 Partition,每個(gè) Partition 都為一個(gè)目錄,而每一個(gè)目錄又被平均分配成多個(gè)大小相等的 Segment File 中,Segment File 又由 index file 和 data file 組成,他們總是成對(duì)出現(xiàn),后綴 “.index” 和 “.log” 分表表示 Segment 索引文件和數(shù)據(jù)文件。

現(xiàn)在假設(shè)我們?cè)O(shè)置每個(gè) Segment 大小為 500 MB,并啟動(dòng)生產(chǎn)者向 topic1 中寫入大量數(shù)據(jù),topic1-0 文件夾中就會(huì)產(chǎn)生類似如下的一些文件:

|--topic1-0|--00000000000000000000.index|--00000000000000000000.log|--00000000000000368769.index|--00000000000000368769.log|--00000000000000737337.index|--00000000000000737337.log|--00000000000001105814.index|--00000000000001105814.log|--topic2-0|--topic2-1

Segment 是 Kafka 文件存儲(chǔ)的最小單位。Segment 文件命名規(guī)則:Partition 全局的第一個(gè) Segment 從 0 開(kāi)始,后續(xù)每個(gè) Segment 文件名為上一個(gè) Segment 文件最后一條消息的 offset 值。數(shù)值最大為 64 位 long 大小,19 位數(shù)字字符長(zhǎng)度,沒(méi)有數(shù)字用0填充。如 00000000000000368769.index 和 00000000000000368769.log。

以上面的一對(duì) Segment File 為例,說(shuō)明一下索引文件和數(shù)據(jù)文件對(duì)應(yīng)關(guān)系:

其中以索引文件中元數(shù)據(jù) <3, 497> 為例,依次在數(shù)據(jù)文件中表示第 3 個(gè) message(在全局 Partition 表示第 368769 + 3 = 368772 個(gè) message)以及該消息的物理偏移地址為 497。

注意該 index 文件并不是從0開(kāi)始,也不是每次遞增1的,這是因?yàn)?Kafka 采取稀疏索引存儲(chǔ)的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引,它減少了索引文件大小,使得能夠把 index 映射到內(nèi)存,降低了查詢時(shí)的磁盤 IO 開(kāi)銷,同時(shí)也并沒(méi)有給查詢帶來(lái)太多的時(shí)間消耗。

因?yàn)槠湮募麨樯弦粋€(gè) Segment 最后一條消息的 offset ,所以當(dāng)需要查找一個(gè)指定 offset 的 message 時(shí),通過(guò)在所有 segment 的文件名中進(jìn)行二分查找就能找到它歸屬的 segment ,再在其 index 文件中找到其對(duì)應(yīng)到文件上的物理位置,就能拿出該 message 。

由于消息在 Partition 的 Segment 數(shù)據(jù)文件中是順序讀寫的,且消息消費(fèi)后不會(huì)刪除(刪除策略是針對(duì)過(guò)期的 Segment 文件),這種順序磁盤 IO 存儲(chǔ)設(shè)計(jì)師 Kafka 高性能很重要的原因。

Kafka 是如何準(zhǔn)確的知道 message 的偏移的呢?這是因?yàn)樵?Kafka 定義了標(biāo)準(zhǔn)的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu),在 Partition 中的每一條 message 都包含了以下三個(gè)屬性:

offset:表示 message 在當(dāng)前 Partition 中的偏移量,是一個(gè)邏輯上的值,唯一確定了 Partition 中的一條 message,可以簡(jiǎn)單的認(rèn)為是一個(gè) id;

MessageSize:表示 message 內(nèi)容 data 的大??;

data:message 的具體內(nèi)容

討論三:生產(chǎn)者設(shè)計(jì)概要

當(dāng)我們發(fā)送消息之前,先問(wèn)幾個(gè)問(wèn)題:每條消息都是很關(guān)鍵且不能容忍丟失么?偶爾重復(fù)消息可以么?我們關(guān)注的是消息延遲還是寫入消息的吞吐量?

舉個(gè)例子,有一個(gè)信用卡交易處理系統(tǒng),當(dāng)交易發(fā)生時(shí)會(huì)發(fā)送一條消息到 Kafka,另一個(gè)服務(wù)來(lái)讀取消息并根據(jù)規(guī)則引擎來(lái)檢查交易是否通過(guò),將結(jié)果通過(guò) Kafka 返回。對(duì)于這樣的業(yè)務(wù),消息既不能丟失也不能重復(fù),由于交易量大因此吞吐量需要盡可能大,延遲可以稍微高一點(diǎn)。

再舉個(gè)例子,假如我們需要收集用戶在網(wǎng)頁(yè)上的點(diǎn)擊數(shù)據(jù),對(duì)于這樣的場(chǎng)景,少量消息丟失或者重復(fù)是可以容忍的,延遲多大都不重要只要不影響用戶體驗(yàn),吞吐則根據(jù)實(shí)時(shí)用戶數(shù)來(lái)決定。

不同的業(yè)務(wù)需要使用不同的寫入方式和配置。具體的方式我們?cè)谶@里不做討論,現(xiàn)在先看下生產(chǎn)者寫消息的基本流程:

流程如下:

首先,我們需要?jiǎng)?chuàng)建一個(gè)ProducerRecord,這個(gè)對(duì)象需要包含消息的主題(topic)和值(value),可以選擇性指定一個(gè)鍵值(key)或者分區(qū)(partition)。

發(fā)送消息時(shí),生產(chǎn)者會(huì)對(duì)鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器(partitioner)。

如果我們指定了分區(qū),那么分配器返回該分區(qū)即可;否則,分配器將會(huì)基于鍵值來(lái)選擇一個(gè)分區(qū)并返回。

選擇完分區(qū)后,生產(chǎn)者知道了消息所屬的主題和分區(qū),它將這條記錄添加到相同主題和分區(qū)的批量消息中,另一個(gè)線程負(fù)責(zé)發(fā)送這些批量消息到對(duì)應(yīng)的Kafka broker。

當(dāng)broker接收到消息后,如果成功寫入則返回一個(gè)包含消息的主題、分區(qū)及位移的RecordMetadata對(duì)象,否則返回異常。

生產(chǎn)者接收到結(jié)果后,對(duì)于異??赡軙?huì)進(jìn)行重試。

討論四:消費(fèi)者設(shè)計(jì)概要

消費(fèi)者與消費(fèi)組

假設(shè)這么個(gè)場(chǎng)景:我們從Kafka中讀取消息,并且進(jìn)行檢查,最后產(chǎn)生結(jié)果數(shù)據(jù)。我們可以創(chuàng)建一個(gè)消費(fèi)者實(shí)例去做這件事情,但如果生產(chǎn)者寫入消息的速度比消費(fèi)者讀取的速度快怎么辦呢?這樣隨著時(shí)間增長(zhǎng),消息堆積越來(lái)越嚴(yán)重。對(duì)于這種場(chǎng)景,我們需要增加多個(gè)消費(fèi)者來(lái)進(jìn)行水平擴(kuò)展。

Kafka消費(fèi)者是消費(fèi)組的一部分,當(dāng)多個(gè)消費(fèi)者形成一個(gè)消費(fèi)組來(lái)消費(fèi)主題時(shí),每個(gè)消費(fèi)者會(huì)收到不同分區(qū)的消息。假設(shè)有一個(gè)T1主題,該主題有4個(gè)分區(qū);同時(shí)我們有一個(gè)消費(fèi)組G1,這個(gè)消費(fèi)組只有一個(gè)消費(fèi)者C1。那么消費(fèi)者C1將會(huì)收到這4個(gè)分區(qū)的消息,如下所示:

如果我們?cè)黾有碌南M(fèi)者C2到消費(fèi)組G1,那么每個(gè)消費(fèi)者將會(huì)分別收到兩個(gè)分區(qū)的消息,如下所示:

如果增加到4個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者將會(huì)分別收到一個(gè)分區(qū)的消息,如下所示:

但如果我們繼續(xù)增加消費(fèi)者到這個(gè)消費(fèi)組,剩余的消費(fèi)者將會(huì)空閑,不會(huì)收到任何消息:

總而言之,我們可以通過(guò)增加消費(fèi)組的消費(fèi)者來(lái)進(jìn)行水平擴(kuò)展提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來(lái)提升性能。另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因?yàn)槎喑鰜?lái)的消費(fèi)者是空閑的,沒(méi)有任何幫助。

Kafka一個(gè)很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說(shuō),每個(gè)應(yīng)用都可以讀到全量的消息。為了使得每個(gè)應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對(duì)于上面的例子,假如我們新增了一個(gè)新的消費(fèi)組G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者,那么會(huì)是這樣的:

在這個(gè)場(chǎng)景中,消費(fèi)組G1和消費(fèi)組G2都能收到T1主題的全量消息,在邏輯意義上來(lái)說(shuō)它們屬于不同的應(yīng)用。

最后,總結(jié)起來(lái)就是:如果應(yīng)用需要讀取全量消息,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者。

消費(fèi)組與分區(qū)重平衡

可以看到,當(dāng)新的消費(fèi)者加入消費(fèi)組,它會(huì)消費(fèi)一個(gè)或多個(gè)分區(qū),而這些分區(qū)之前是由其他消費(fèi)者負(fù)責(zé)的;另外,當(dāng)消費(fèi)者離開(kāi)消費(fèi)組(比如重啟、宕機(jī)等)時(shí),它所消費(fèi)的分區(qū)會(huì)分配給其他分區(qū)。這種現(xiàn)象稱為重平衡(rebalance)。重平衡是 Kafka 一個(gè)很重要的性質(zhì),這個(gè)性質(zhì)保證了高可用和水平擴(kuò)展。不過(guò)也需要注意到,在重平衡期間,所有消費(fèi)者都不能消費(fèi)消息,因此會(huì)造成整個(gè)消費(fèi)組短暫的不可用。而且,將分區(qū)進(jìn)行重平衡也會(huì)導(dǎo)致原來(lái)的消費(fèi)者狀態(tài)過(guò)期,從而導(dǎo)致消費(fèi)者需要重新更新?tīng)顟B(tài),這段期間也會(huì)降低消費(fèi)性能。后面我們會(huì)討論如何安全的進(jìn)行重平衡以及如何盡可能避免。

消費(fèi)者通過(guò)定期發(fā)送心跳(hearbeat)到一個(gè)作為組協(xié)調(diào)者(group coordinator)的 broker 來(lái)保持在消費(fèi)組內(nèi)存活。這個(gè) broker 不是固定的,每個(gè)消費(fèi)組都可能不同。當(dāng)消費(fèi)者拉取消息或者提交時(shí),便會(huì)發(fā)送心跳。

如果消費(fèi)者超過(guò)一定時(shí)間沒(méi)有發(fā)送心跳,那么它的會(huì)話(session)就會(huì)過(guò)期,組協(xié)調(diào)者會(huì)認(rèn)為該消費(fèi)者已經(jīng)宕機(jī),然后觸發(fā)重平衡??梢钥吹剑瑥南M(fèi)者宕機(jī)到會(huì)話過(guò)期是有一定時(shí)間的,這段時(shí)間內(nèi)該消費(fèi)者的分區(qū)都不能進(jìn)行消息消費(fèi);通常情況下,我們可以進(jìn)行優(yōu)雅關(guān)閉,這樣消費(fèi)者會(huì)發(fā)送離開(kāi)的消息到組協(xié)調(diào)者,這樣組協(xié)調(diào)者可以立即進(jìn)行重平衡而不需要等待會(huì)話過(guò)期。

在 0.10.1 版本,Kafka 對(duì)心跳機(jī)制進(jìn)行了修改,將發(fā)送心跳與拉取消息進(jìn)行分離,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響。另外更高版本的 Kafka 支持配置一個(gè)消費(fèi)者多長(zhǎng)時(shí)間不拉取消息但仍然保持存活,這個(gè)配置可以避免活鎖(livelock)?;铈i,是指應(yīng)用沒(méi)有故障但是由于某些原因不能進(jìn)一步消費(fèi)。

Partition 與消費(fèi)模型

上面提到,Kafka 中一個(gè) topic 中的消息是被打散分配在多個(gè) Partition(分區(qū)) 中存儲(chǔ)的, Consumer Group 在消費(fèi)時(shí)需要從不同的 Partition 獲取消息,那最終如何重建出 Topic 中消息的順序呢?

答案是:沒(méi)有辦法。Kafka 只會(huì)保證在 Partition 內(nèi)消息是有序的,而不管全局的情況。

下一個(gè)問(wèn)題是:Partition 中的消息可以被(不同的 Consumer Group)多次消費(fèi),那 Partition中被消費(fèi)的消息是何時(shí)刪除的? Partition 又是如何知道一個(gè) Consumer Group 當(dāng)前消費(fèi)的位置呢?

無(wú)論消息是否被消費(fèi),除非消息到期 Partition 從不刪除消息。例如設(shè)置保留時(shí)間為 2 天,則消息發(fā)布 2 天內(nèi)任何 Group 都可以消費(fèi),2 天后,消息自動(dòng)被刪除。 Partition 會(huì)為每個(gè) Consumer Group 保存一個(gè)偏移量,記錄 Group 消費(fèi)到的位置。 如下圖:

為什么 Kafka 是 pull 模型

消費(fèi)者應(yīng)該向 Broker 要數(shù)據(jù)(pull)還是 Broker 向消費(fèi)者推送數(shù)據(jù)(push)?作為一個(gè)消息系統(tǒng),Kafka 遵循了傳統(tǒng)的方式,選擇由 Producer 向 broker push 消息并由 Consumer 從 broker pull 消息。一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事實(shí)上,push 模式和 pull 模式各有優(yōu)劣。

push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 broker 決定的。push 模式的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來(lái)不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) Consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。

對(duì)于 Kafka 而言,pull 模式更合適。pull 模式可簡(jiǎn)化 broker 的設(shè)計(jì),Consumer 可自主控制消費(fèi)消息的速率,同時(shí) Consumer 可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語(yǔ)義。

討論五:Kafka 如何保證可靠性

當(dāng)我們討論可靠性的時(shí)候,我們總會(huì)提到*保證**這個(gè)詞語(yǔ)??煽啃员WC是基礎(chǔ),我們基于這些基礎(chǔ)之上構(gòu)建我們的應(yīng)用。比如關(guān)系型數(shù)據(jù)庫(kù)的可靠性保證是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。

Kafka 中的可靠性保證有如下四點(diǎn):

對(duì)于一個(gè)分區(qū)來(lái)說(shuō),它的消息是有序的。如果一個(gè)生產(chǎn)者向一個(gè)分區(qū)先寫入消息A,然后寫入消息B,那么消費(fèi)者會(huì)先讀取消息A再讀取消息B。

當(dāng)消息寫入所有in-sync狀態(tài)的副本后,消息才會(huì)認(rèn)為已提交(committed)。這里的寫入有可能只是寫入到文件系統(tǒng)的緩存,不一定刷新到磁盤。生產(chǎn)者可以等待不同時(shí)機(jī)的確認(rèn),比如等待分區(qū)主副本寫入即返回,后者等待所有in-sync狀態(tài)副本寫入才返回。

一旦消息已提交,那么只要有一個(gè)副本存活,數(shù)據(jù)不會(huì)丟失。

消費(fèi)者只能讀取到已提交的消息。

使用這些基礎(chǔ)保證,我們構(gòu)建一個(gè)可靠的系統(tǒng),這時(shí)候需要考慮一個(gè)問(wèn)題:究竟我們的應(yīng)用需要多大程度的可靠性?可靠性不是無(wú)償?shù)模c系統(tǒng)可用性、吞吐量、延遲和硬件價(jià)格息息相關(guān),得此失彼。因此,我們往往需要做權(quán)衡,一味的追求可靠性并不實(shí)際。

動(dòng)手搭一個(gè) Kafka

通過(guò)上面的描述,我們已經(jīng)大致了解到了「Kafka」是何方神圣了,現(xiàn)在我們開(kāi)始嘗試自己動(dòng)手本地搭一個(gè)來(lái)實(shí)際體驗(yàn)一把。

素材來(lái)源于網(wǎng)絡(luò),如有侵權(quán)請(qǐng)聯(lián)系作者刪除

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI