溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Kafka分組消費的示例分析

發(fā)布時間:2021-12-15 09:25:46 來源:億速云 閱讀:242 作者:柒染 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)Kafka分組消費的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

1
Kafka消費模式

從kafka消費消息,kafka客戶端提供兩種模式: 分區(qū)消費,分組消費。

分區(qū)消費對應(yīng)的就是我們的DirectKafkaInputDStream

分組消費對應(yīng)的就是我們的KafkaInputDStream

消費者數(shù)目跟分區(qū)數(shù)目的關(guān)系:

1),一個消費者可以消費一個到全部分區(qū)數(shù)據(jù)

2),分組消費,同一個分組內(nèi)所有消費者消費一份完整的數(shù)據(jù),此時一個分區(qū)數(shù)據(jù)只能被一個消費者消費,而一個消費者可以消費多個分區(qū)數(shù)據(jù)

3),同一個消費組內(nèi),消費者數(shù)目大于分區(qū)數(shù)目后,消費者會有空余=分區(qū)數(shù)-消費者數(shù)

Kafka分組消費的示例分析

2
分組消費再平衡策略

當(dāng)一個group中,有consumer加入或者離開時,會觸發(fā)partitions均衡partition.assignment.strategy,決定了partition分配給消費者的分配策略,有兩種分配策略:

1,org.apache.kafka.clients.consumer.RangeAssignor

默認(rèn)采用的是這種再平衡方式,這種方式分配只是針對消費者訂閱的topic的單個topic所有分區(qū)再分配,Consumer Rebalance的算法如下:

1),將目標(biāo)Topic下的所有Partirtion排序,存于TP

2),對某Consumer Group下所有Consumer按照名字根據(jù)字典排序,存于CG,第i個Consumer記為Ci

3),N=size(TP)/size(CG)

4),R=size(TP)%size(CG)

5),Ci獲取的分區(qū)起始位置=N*i+min(i,R)

6),Ci獲取的分區(qū)總數(shù)=N+(if (i+ 1 > R) 0 else 1)

2,org.apache.kafka.clients.consumer.RoundRobinAssignor

這種分配策略是針對消費者消費的所有topic的所有分區(qū)進(jìn)行分配。當(dāng)有新的消費者加入或者有消費者退出,就會觸發(fā)rebalance。這種方式有兩點要求

A),在實例化每個消費者時給每個topic指定相同的流數(shù)

B),每個消費者實例訂閱的topic必須相同

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

其中,topic對應(yīng)的value就是流數(shù)目。對應(yīng)的kafka源碼是在

在kafka.consumer.ZookeeperConsumerConnector的consume方法里,根據(jù)這個參數(shù)構(gòu)建了相同數(shù)目的KafkaStream。

這種策略的具體分配步驟:

1),對所有topic的所有分區(qū)按照topic+partition轉(zhuǎn)string之后的hash進(jìn)行排序

2),對消費者按字典進(jìn)行排序

3),然后輪訓(xùn)的方式將分區(qū)分配給消費者

3,舉例對比

舉個例子,比如有兩個消費者(c0,c1),兩個topic(t0,t1),每個topic有三個分區(qū)p(0-2),

那么采用RangeAssignor,結(jié)果為:

* C0: [t0p0, t0p1, t1p0, t1p1]

* C1: [t0p2, t1p2]

采用RoundRobinAssignor,結(jié)果為:

* C0: [t0p0, t0p2, t1p1]

* C1: [t0p1, t1p0, t1p2]

4
分組成員的存活檢測

分組消費有一個比較好的功能就是自動檢測失敗的消費者并將其踢出分組,然后重新進(jìn)行分區(qū)分配。那么kafka是如何檢測失敗的消費者的呢。我們就拿0.10.x為例進(jìn)行講解說明。

消費著訂閱了一組的topic后,會在調(diào)用poll(long)函數(shù)的時候加入分組,分組內(nèi)新增消費者就會進(jìn)行再平衡。Poll 函數(shù)的設(shè)計目標(biāo)就是來保證消費者存活的。只要持續(xù)不斷的調(diào)用poll函數(shù),消費者就會留在分組里,連續(xù)的從分配給他的分區(qū)里消費消息。消費者也會使用一個后臺線程發(fā)送周期性的心跳給broker。如果消費者掛掉或者無法在session.timeout.ms時間范圍內(nèi)發(fā)送心跳,消費者會被視為死亡,它的分區(qū)就會被重新分配。session.timeout.ms默認(rèn)是10000ms。該值要在group.max.session.timeout.ms=300000ms和group.min.session.timeout.ms=6000ms之間。

由于心跳是后臺線程周期性發(fā)送的,那么會存在消費者心跳正常發(fā)送,但是不消費消息的情況。為了避免這種消費者無限期的占用分配給他的分區(qū)這種情況,kafka提供了一種存活檢測機(jī)制,使用max.poll.interval.ms配置。根本上來說,兩次調(diào)用poll函數(shù)的間隔大于該值,消費者就會離開分組,然后它的分區(qū)會被其它消費著消費。當(dāng)發(fā)生這種情況時,你會收到一個offset提交失敗的異常。這種機(jī)制確保了只有活躍的消費者才能提交offset。

消費者有兩個配置來控制poll函數(shù)的行為:

  1. max.poll.interval.ms:增加兩次調(diào)用poll的間隔,實際上就是增加消費者處理上次poll所拉取消息的時間。當(dāng)然,弊端是增加該值會增加消費者組再平衡的時間,因為僅僅在調(diào)用poll的過程中消費者才能參與再平衡。要注意一點,request.timeout.ms=305000,默認(rèn)值要修改比max.poll.interval.ms大,也即是大于5min。該值是當(dāng)消費者進(jìn)行再平衡時,JoinGroup請求在server端的阻塞時間。

  2. max.poll.records:限制每次調(diào)用poll返回消息的最大數(shù)。有了該參數(shù)我們就可以預(yù)估兩次

有些情況下,數(shù)據(jù)處理時間不可預(yù)期,上面的兩個參數(shù)并不難滿足需求。這種情況下,推薦將消息處理放到其它后臺線程中執(zhí)行,這樣消費者就可以持續(xù)的調(diào)用poll函數(shù)了。但是這中情況下,要處理好offset提交的問題。典型做法就是禁止掉自動提交offset,改為手動再消息處理結(jié)束后提交offset。這種情況下,需要對消費的分區(qū)調(diào)用pause函數(shù),這樣在調(diào)用poll函數(shù)的時候就不會接受新的數(shù)據(jù),然后處理完之后調(diào)用resume(Collection)即可恢復(fù)消費。

關(guān)于Kafka分組消費的示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI