溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Kafka Consumer如何理解

發(fā)布時(shí)間:2021-12-15 11:55:50 來(lái)源:億速云 閱讀:133 作者:柒染 欄目:開(kāi)發(fā)技術(shù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)Kafka Consumer如何理解,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

Kafka 消費(fèi)者概念

應(yīng)用程序使用 KafkaConsumer 從 Kafka 中訂閱主題并接收來(lái)自這些主題的消息,然后再把他們保存起來(lái)。應(yīng)用程序首先需要?jiǎng)?chuàng)建一個(gè) KafkaConsumer 對(duì)象,訂閱主題并開(kāi)始接受消息,驗(yàn)證消息并保存結(jié)果。一段時(shí)間后,生產(chǎn)者往主題寫入的速度超過(guò)了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速度,這時(shí)候該如何處理?如果只使用單個(gè)消費(fèi)者的話,應(yīng)用程序會(huì)跟不上消息生成的速度,就像多個(gè)生產(chǎn)者像相同的主題寫入消息一樣,這時(shí)候就需要多個(gè)消費(fèi)者共同參與消費(fèi)主題中的消息,對(duì)消息進(jìn)行分流處理。

Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個(gè)群組中的消費(fèi)者訂閱的都是相同的主題,每個(gè)消費(fèi)者接收主題一部分分區(qū)的消息。下面是一個(gè) Kafka 分區(qū)消費(fèi)示意圖

Kafka Consumer如何理解

上圖中的主題 T1 有四個(gè)分區(qū),分別是分區(qū)0、分區(qū)1、分區(qū)2、分區(qū)3,我們創(chuàng)建一個(gè)消費(fèi)者群組1,消費(fèi)者群組中只有一個(gè)消費(fèi)者,它訂閱主題T1,接收到 T1 中的全部消息。由于一個(gè)消費(fèi)者處理四個(gè)生產(chǎn)者發(fā)送到分區(qū)的消息,壓力有些大,需要幫手來(lái)幫忙分擔(dān)任務(wù),于是就演變?yōu)橄聢D

Kafka Consumer如何理解

這樣一來(lái),消費(fèi)者的消費(fèi)能力就大大提高了,但是在某些環(huán)境下比如用戶產(chǎn)生消息特別多的時(shí)候,生產(chǎn)者產(chǎn)生的消息仍舊讓消費(fèi)者吃不消,那就繼續(xù)增加消費(fèi)者。

Kafka Consumer如何理解

如上圖所示,每個(gè)分區(qū)所產(chǎn)生的消息能夠被每個(gè)消費(fèi)者群組中的消費(fèi)者消費(fèi),如果向消費(fèi)者群組中增加更多的消費(fèi)者,那么多余的消費(fèi)者將會(huì)閑置,如下圖所示

Kafka Consumer如何理解

向群組中增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式??偠灾覀兛梢酝ㄟ^(guò)增加消費(fèi)組的消費(fèi)者來(lái)進(jìn)行水平擴(kuò)展提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來(lái)提升性能。另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因?yàn)槎喑鰜?lái)的消費(fèi)者是空閑的,沒(méi)有任何幫助。

Kafka 一個(gè)很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說(shuō),每個(gè)應(yīng)用都可以讀到全量的消息。為了使得每個(gè)應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對(duì)于上面的例子,假如我們新增了一個(gè)新的消費(fèi)組 G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者,那么就演變?yōu)橄聢D這樣

Kafka Consumer如何理解

在這個(gè)場(chǎng)景中,消費(fèi)組 G1 和消費(fèi)組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來(lái)說(shuō)它們屬于不同的應(yīng)用。

總結(jié)起來(lái)就是如果應(yīng)用需要讀取全量消息,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者。

消費(fèi)者組和分區(qū)重平衡

消費(fèi)者組是什么

消費(fèi)者組(Consumer Group)是由一個(gè)或多個(gè)消費(fèi)者實(shí)例(Consumer Instance)組成的群組,具有可擴(kuò)展性和可容錯(cuò)性的一種機(jī)制。消費(fèi)者組內(nèi)的消費(fèi)者共享一個(gè)消費(fèi)者組ID,這個(gè)ID 也叫做 Group ID,組內(nèi)的消費(fèi)者共同對(duì)一個(gè)主題進(jìn)行訂閱和消費(fèi),同一個(gè)組中的消費(fèi)者只能消費(fèi)一個(gè)分區(qū)的消息,多余的消費(fèi)者會(huì)閑置,派不上用場(chǎng)。

我們?cè)谏厦嫣岬搅藘煞N消費(fèi)方式

  •  一個(gè)消費(fèi)者群組消費(fèi)一個(gè)主題中的消息,這種消費(fèi)模式又稱為點(diǎn)對(duì)點(diǎn)的消費(fèi)方式,點(diǎn)對(duì)點(diǎn)的消費(fèi)方式又被稱為消息隊(duì)列

  •  一個(gè)主題中的消息被多個(gè)消費(fèi)者群組共同消費(fèi),這種消費(fèi)模式又稱為發(fā)布-訂閱模式

消費(fèi)者重平衡

我們從上面的消費(fèi)者演變圖中可以知道這么一個(gè)過(guò)程:最初是一個(gè)消費(fèi)者訂閱一個(gè)主題并消費(fèi)其全部分區(qū)的消息,后來(lái)有一個(gè)消費(fèi)者加入群組,隨后又有更多的消費(fèi)者加入群組,而新加入的消費(fèi)者實(shí)例分?jǐn)偭俗畛跸M(fèi)者的部分消息,這種把分區(qū)的所有權(quán)通過(guò)一個(gè)消費(fèi)者轉(zhuǎn)到其他消費(fèi)者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示

Kafka Consumer如何理解

重平衡非常重要,它為消費(fèi)者群組帶來(lái)了高可用性 和 伸縮性,我們可以放心的添加消費(fèi)者或移除消費(fèi)者,不過(guò)在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費(fèi)者無(wú)法讀取消息,造成整個(gè)消費(fèi)者組在重平衡的期間都不可用。另外,當(dāng)分區(qū)被重新分配給另一個(gè)消費(fèi)者時(shí),消息當(dāng)前的讀取狀態(tài)會(huì)丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會(huì)拖慢應(yīng)用程序。

消費(fèi)者通過(guò)向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來(lái)維護(hù)自己是消費(fèi)者組的一員并確認(rèn)其擁有的分區(qū)。對(duì)于不同不的消費(fèi)群體來(lái)說(shuō),其組織協(xié)調(diào)者可以是不同的。只要消費(fèi)者定期發(fā)送心跳,就會(huì)認(rèn)為消費(fèi)者是存活的并處理其分區(qū)中的消息。當(dāng)消費(fèi)者檢索記錄或者提交它所消費(fèi)的記錄時(shí)就會(huì)發(fā)送心跳。

如果過(guò)了一段時(shí)間 Kafka 停止發(fā)送心跳了,會(huì)話(Session)就會(huì)過(guò)期,組織協(xié)調(diào)者就會(huì)認(rèn)為這個(gè) Consumer 已經(jīng)死亡,就會(huì)觸發(fā)一次重平衡。如果消費(fèi)者宕機(jī)并且停止發(fā)送消息,組織協(xié)調(diào)者會(huì)等待幾秒鐘,確認(rèn)它死亡了才會(huì)觸發(fā)重平衡。在這段時(shí)間里,死亡的消費(fèi)者將不處理任何消息。在清理消費(fèi)者時(shí),消費(fèi)者將通知協(xié)調(diào)者它要離開(kāi)群組,組織協(xié)調(diào)者會(huì)觸發(fā)一次重平衡,盡量降低處理停頓。

重平衡是一把雙刃劍,它為消費(fèi)者群組帶來(lái)高可用性和伸縮性的同時(shí),還有有一些明顯的缺點(diǎn)(bug),而這些 bug 到現(xiàn)在社區(qū)還無(wú)法修改。

重平衡的過(guò)程對(duì)消費(fèi)者組有極大的影響。因?yàn)槊看沃仄胶膺^(guò)程中都會(huì)導(dǎo)致萬(wàn)物靜止,參考 JVM 中的垃圾回收機(jī)制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機(jī)》中 p76 關(guān)于 Serial 收集器的描述):

更重要的是它在進(jìn)行垃圾收集時(shí),必須暫停其他所有的工作線程。直到它收集結(jié)束。Stop The World 這個(gè)名字聽(tīng)起來(lái)很帥,但這項(xiàng)工作實(shí)際上是由虛擬機(jī)在后臺(tái)自動(dòng)發(fā)起并完成的,在用戶不可見(jiàn)的情況下把用戶正常工作的線程全部停掉,這對(duì)很多應(yīng)用來(lái)說(shuō)都是難以接受的。

也就是說(shuō),在重平衡期間,消費(fèi)者組中的消費(fèi)者實(shí)例都會(huì)停止消費(fèi),等待重平衡的完成。而且重平衡這個(gè)過(guò)程很慢......

創(chuàng)建消費(fèi)者

上面的理論說(shuō)的有點(diǎn)多,下面就通過(guò)代碼來(lái)講解一下消費(fèi)者是如何消費(fèi)的

在讀取消息之前,需要先創(chuàng)建一個(gè) KafkaConsumer 對(duì)象。創(chuàng)建 KafkaConsumer 對(duì)象與創(chuàng)建 KafkaProducer 對(duì)象十分相似 --- 把需要傳遞給消費(fèi)者的屬性放在 properties 對(duì)象中,后面我們會(huì)著重討論 Kafka 的一些配置,這里我們先簡(jiǎn)單的創(chuàng)建一下,使用3個(gè)屬性就足矣,分別是 bootstrap.server,key.deserializer,value.deserializer 。

這三個(gè)屬性我們已經(jīng)用過(guò)很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢(shì)是認(rèn)識(shí)一下Kafka Producer

還有一個(gè)屬性是 group.id 這個(gè)屬性不是必須的,它指定了 KafkaConsumer 是屬于哪個(gè)消費(fèi)者群組。創(chuàng)建不屬于任何一個(gè)群組的消費(fèi)者也是可以的

Properties properties = new Properties();          properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");  KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主題訂閱

創(chuàng)建好消費(fèi)者之后,下一步就開(kāi)始訂閱主題了。subscribe() 方法接受一個(gè)主題列表作為參數(shù),使用起來(lái)比較簡(jiǎn)單

consumer.subscribe(Collections.singletonList("customerTopic"));

為了簡(jiǎn)單我們只訂閱了一個(gè)主題 customerTopic,參數(shù)傳入的是一個(gè)正則表達(dá)式,正則表達(dá)式可以匹配多個(gè)主題,如果有人創(chuàng)建了新的主題,并且主題的名字與正則表達(dá)式相匹配,那么會(huì)立即觸發(fā)一次重平衡,消費(fèi)者就可以讀取新的主題。

要訂閱所有與 test 相關(guān)的主題,可以這樣做

consumer.subscribe("test.*");

輪詢

我們知道,Kafka 是支持訂閱/發(fā)布模式的,生產(chǎn)者發(fā)送數(shù)據(jù)給 Kafka Broker,那么消費(fèi)者是如何知道生產(chǎn)者發(fā)送了數(shù)據(jù)呢?其實(shí)生產(chǎn)者產(chǎn)生的數(shù)據(jù)消費(fèi)者是不知道的,KafkaConsumer 采用輪詢的方式定期去 Kafka Broker 中進(jìn)行數(shù)據(jù)的檢索,如果有數(shù)據(jù)就用來(lái)消費(fèi),如果沒(méi)有就再繼續(xù)輪詢等待,下面是輪詢等待的具體實(shí)現(xiàn)

try {    while (true) {      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));      for (ConsumerRecord<String, String> record : records) {        int updateCount = 1;        if (map.containsKey(record.value())) {          updateCount = (int) map.get(record.value() + 1);        }        map.put(record.value(), updateCount);      }    }  }finally {    consumer.close();  }
  •  這是一個(gè)無(wú)限循環(huán)。消費(fèi)者實(shí)際上是一個(gè)長(zhǎng)期運(yùn)行的應(yīng)用程序,它通過(guò)輪詢的方式向 Kafka 請(qǐng)求數(shù)據(jù)。

  •  第三行代碼非常重要,Kafka 必須定期循環(huán)請(qǐng)求數(shù)據(jù),否則就會(huì)認(rèn)為該 Consumer 已經(jīng)掛了,會(huì)觸發(fā)重平衡,它的分區(qū)會(huì)移交給群組中的其它消費(fèi)者。傳給 poll() 方法的是一個(gè)超市時(shí)間,用 java.time.Duration 類來(lái)表示,如果該參數(shù)被設(shè)置為 0 ,poll() 方法會(huì)立刻返回,否則就會(huì)在指定的毫秒數(shù)內(nèi)一直等待 broker 返回?cái)?shù)據(jù)。

  •  poll() 方法會(huì)返回一個(gè)記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區(qū)的信息、記錄在分區(qū)中的偏移量,以及記錄的鍵值對(duì)。我們一般會(huì)遍歷這個(gè)列表,逐條處理每條記錄。

  •  在退出應(yīng)用程序之前使用 close() 方法關(guān)閉消費(fèi)者。網(wǎng)絡(luò)連接和 socket 也會(huì)隨之關(guān)閉,并立即觸發(fā)一次重平衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳并認(rèn)定它已經(jīng)死亡。

線程安全性

在同一個(gè)群組中,我們無(wú)法讓一個(gè)線程運(yùn)行多個(gè)消費(fèi)者,也無(wú)法讓多個(gè)線程安全的共享一個(gè)消費(fèi)者。按照規(guī)則,一個(gè)消費(fèi)者使用一個(gè)線程,如果一個(gè)消費(fèi)者群組中多個(gè)消費(fèi)者都想要運(yùn)行的話,那么必須讓每個(gè)消費(fèi)者在自己的線程中運(yùn)行,可以使用 Java 中的 ExecutorService 啟動(dòng)多個(gè)消費(fèi)者進(jìn)行進(jìn)行處理。

消費(fèi)者配置

到目前為止,我們學(xué)習(xí)了如何使用消費(fèi)者 API,不過(guò)只介紹了幾個(gè)最基本的屬性,Kafka 文檔列出了所有與消費(fèi)者相關(guān)的配置說(shuō)明。大部分參數(shù)都有合理的默認(rèn)值,一般不需要修改它們,下面我們就來(lái)介紹一下這些參數(shù)。

  •  fetch.min.bytes

該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費(fèi)者的數(shù)據(jù)請(qǐng)求時(shí),如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小,那么它會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載,因?yàn)樗鼈冊(cè)谥黝}使用頻率不是很高的時(shí)候就不用來(lái)回處理消息。如果沒(méi)有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大。如果消費(fèi)者的數(shù)量比較多,把該屬性的值調(diào)大可以降低 broker 的工作負(fù)載。

  •  fetch.max.wait.ms

我們通過(guò)上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時(shí)才會(huì)把它返回給消費(fèi)者。而 fetch.max.wait.ms 則用于指定 broker 的等待時(shí)間,默認(rèn)是 500 毫秒。如果沒(méi)有足夠的數(shù)據(jù)流入 kafka 的話,消費(fèi)者獲取的最小數(shù)據(jù)量要求就得不到滿足,最終導(dǎo)致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數(shù)值設(shè)置的小一些。如果 fetch.max.wait.ms 被設(shè)置為 100 毫秒的延遲,而 fetch.min.bytes 的值設(shè)置為 1MB,那么 Kafka 在收到消費(fèi)者請(qǐng)求后,要么返回 1MB 的數(shù)據(jù),要么在 100 ms 后返回所有可用的數(shù)據(jù)。就看哪個(gè)條件首先被滿足。

  •  max.partition.fetch.bytes

該屬性指定了服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)。它的默認(rèn)值時(shí) 1MB,也就是說(shuō),KafkaConsumer.poll() 方法從每個(gè)分區(qū)里返回的記錄最多不超過(guò) max.partition.fetch.bytes 指定的字節(jié)。如果一個(gè)主題有20個(gè)分區(qū)和5個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者需要至少4 MB的可用內(nèi)存來(lái)接收記錄。在為消費(fèi)者分配內(nèi)存時(shí),可以給它們多分配一些,因?yàn)槿绻航M里有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過(guò) max.message.size 屬性配置大),否則消費(fèi)者可能無(wú)法讀取這些消息,導(dǎo)致消費(fèi)者一直掛起重試。 在設(shè)置該屬性時(shí),另外一個(gè)考量的因素是消費(fèi)者處理數(shù)據(jù)的時(shí)間。消費(fèi)者需要頻繁的調(diào)用 poll() 方法來(lái)避免會(huì)話過(guò)期和發(fā)生分區(qū)再平衡,如果單次調(diào)用poll() 返回的數(shù)據(jù)太多,消費(fèi)者需要更多的時(shí)間進(jìn)行處理,可能無(wú)法及時(shí)進(jìn)行下一個(gè)輪詢來(lái)避免會(huì)話過(guò)期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長(zhǎng)會(huì)話過(guò)期時(shí)間。

  •  session.timeout.ms

這個(gè)屬性指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開(kāi)連接的時(shí)間,默認(rèn)是 3s。如果消費(fèi)者沒(méi)有在 session.timeout.ms 指定的時(shí)間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就會(huì)被認(rèn)定為死亡,協(xié)調(diào)器就會(huì)觸發(fā)重平衡。把它的分區(qū)分配給消費(fèi)者群組中的其它消費(fèi)者,此屬性與 heartbeat.interval.ms 緊密相關(guān)。heartbeat.interval.ms 指定了 poll() 方法向群組協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms 則指定了消費(fèi)者可以多久不發(fā)送心跳。所以,這兩個(gè)屬性一般需要同時(shí)修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s。把 session.timeout.ms 值設(shè)置的比默認(rèn)值小,可以更快地檢測(cè)和恢復(fù)崩憤的節(jié)點(diǎn),不過(guò)長(zhǎng)時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的重平衡。把該屬性的值設(shè)置得大一些,可以減少意外的重平衡,不過(guò)檢測(cè)節(jié)點(diǎn)崩潰需要更長(zhǎng)的時(shí)間。

  •  auto.offset.reset

該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下的該如何處理。它的默認(rèn)值是 latest,意思指的是,在偏移量無(wú)效的情況下,消費(fèi)者將從最新的記錄開(kāi)始讀取數(shù)據(jù)。另一個(gè)值是 earliest,意思指的是在偏移量無(wú)效的情況下,消費(fèi)者將從起始位置處開(kāi)始讀取分區(qū)的記錄。

  •  enable.auto.commit

我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)值是 true,為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,由自己控制何時(shí)提交偏移量。如果把它設(shè)置為 true,還可以通過(guò) auto.commit.interval.ms 屬性來(lái)控制提交的頻率

  •  partition.assignment.strategy

我們知道,分區(qū)會(huì)分配給群組中的消費(fèi)者。PartitionAssignor 會(huì)根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者,Kafka 有兩個(gè)默認(rèn)的分配策略Range 和 RoundRobin

  •  client.id

該屬性可以是任意字符串,broker 用他來(lái)標(biāo)識(shí)從客戶端發(fā)送過(guò)來(lái)的消息,通常被用在日志、度量指標(biāo)和配額中

  •  max.poll.records

該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢中需要處理的數(shù)據(jù)量。

  •  receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數(shù)據(jù)時(shí)用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)置為 -1,就使用操作系統(tǒng)默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費(fèi)者在每次調(diào)用poll() 方法進(jìn)行定時(shí)輪詢的時(shí)候,會(huì)返回由生產(chǎn)者寫入 Kafka 但是還沒(méi)有被消費(fèi)者消費(fèi)的記錄,因此我們可以追蹤到哪些記錄是被群組里的哪個(gè)消費(fèi)者讀取的。消費(fèi)者可以使用 Kafka 來(lái)追蹤消息在分區(qū)中的位置(偏移量)

消費(fèi)者會(huì)向一個(gè)叫做 _consumer_offset 的特殊主題中發(fā)送消息,這個(gè)主題會(huì)保存每次所發(fā)送消息中的分區(qū)偏移量,這個(gè)主題的主要作用就是消費(fèi)者觸發(fā)重平衡后記錄偏移使用的,消費(fèi)者每次向這個(gè)主題發(fā)送消息,正常情況下不觸發(fā)重平衡,這個(gè)主題是不起作用的,當(dāng)觸發(fā)重平衡后,消費(fèi)者停止工作,每個(gè)消費(fèi)者可能會(huì)分到對(duì)應(yīng)的分區(qū),這個(gè)主題就是讓消費(fèi)者能夠繼續(xù)處理消息所設(shè)置的。

如果提交的偏移量小于客戶端最后一次處理的偏移量,那么位于兩個(gè)偏移量之間的消息就會(huì)被重復(fù)處理

Kafka Consumer如何理解

如果提交的偏移量大于最后一次消費(fèi)時(shí)的偏移量,那么處于兩個(gè)偏移量中間的消息將會(huì)丟失

Kafka Consumer如何理解

既然_consumer_offset 如此重要,那么它的提交方式是怎樣的呢?下面我們就來(lái)說(shuō)一下

提交方式

KafkaConsumer API 提供了多種方式來(lái)提交偏移量

自動(dòng)提交

最簡(jiǎn)單的方式就是讓消費(fèi)者自動(dòng)提交偏移量。如果 enable.auto.commit 被設(shè)置為true,那么每過(guò) 5s,消費(fèi)者會(huì)自動(dòng)把從 poll() 方法輪詢到的最大偏移量提交上去。提交時(shí)間間隔由 auto.commit.interval.ms 控制,默認(rèn)是 5s。與消費(fèi)者里的其他東西一樣,自動(dòng)提交也是在輪詢中進(jìn)行的。消費(fèi)者在每次輪詢中會(huì)檢查是否提交該偏移量了,如果是,那么就會(huì)提交從上一次輪詢中返回的偏移量。

提交當(dāng)前偏移量

把 auto.commit.offset 設(shè)置為 false,可以讓應(yīng)用程序決定何時(shí)提交偏移量。使用 commitSync() 提交偏移量。這個(gè) API 會(huì)提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。

commitSync() 將會(huì)提交由 poll() 返回的最新偏移量,如果處理完所有記錄后要確保調(diào)用了 commitSync(),否則還是會(huì)有丟失消息的風(fēng)險(xiǎn),如果發(fā)生了在均衡,從最近一批消息到發(fā)生在均衡之間的所有消息都將被重復(fù)處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區(qū)別在于異步提交不會(huì)進(jìn)行重試,同步提交會(huì)一致進(jìn)行重試。

同步和異步組合提交

一般情況下,針對(duì)偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會(huì)有太大的問(wèn)題,因?yàn)槿绻峤皇∈且驗(yàn)榕R時(shí)問(wèn)題導(dǎo)致的,那么后續(xù)的提交總會(huì)有成功的。但是如果在關(guān)閉消費(fèi)者或再均衡前的最后一次提交,就要確保提交成功。

因此,在消費(fèi)者關(guān)閉之前一般會(huì)組合使用commitAsync和commitSync提交偏移量。

提交特定的偏移量

消費(fèi)者API允許調(diào)用 commitSync() 和 commitAsync() 方法時(shí)傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

上述就是小編為大家分享的Kafka Consumer如何理解了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI