您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“KAFKA中rebalance是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
讓我們從頭到尾梳理一下rebalance。
中文直譯,就是重平衡。
是什么去重平衡呢?消費組內(nèi)的消費者成員去重平衡。(消費組的概念如果不清楚各位先自行百度,后續(xù)我寫到消費模塊的時候才會提到這些概念)
為什么需要重平衡呢?因為消費組內(nèi)成員的故障轉(zhuǎn)移和動態(tài)分區(qū)分配。
翻譯一下:
消費組內(nèi)成員的故障轉(zhuǎn)移:當(dāng)一個消費組內(nèi)有三個消費者A,B,C,分別消費分區(qū):a,b,c
A -> a B -> b C -> c
此時如果A消費者出了點問題,那么就意味著a分區(qū)沒有消費者進行消費了,那這肯定不行,那么就通過rebalance去將a分區(qū)分配給其他還存活著的消費者客戶端,rebalance后可能得到的消費策略:
A -> a (GG) B -> b,a C -> c
這就是消費組內(nèi)成員的故障轉(zhuǎn)移,就是某個消費者客戶端出問題之后把它原本消費的分區(qū)通過REBALNACE分配給其他存活的消費者客戶端。
動態(tài)分區(qū)分配:當(dāng)某個topic的分區(qū)數(shù)變化,對于消費組而言可消費的分區(qū)數(shù)變化了,因此就需要rebalance去重新進行動態(tài)分區(qū)分配,舉個栗子,原本某topic只有3個分區(qū),我現(xiàn)在擴成了10個分區(qū),那么不就意味著多了7個分區(qū)沒有消費者消費嗎?這顯然是不行的,因此就需要rebalance過程去進行分區(qū)分配,讓現(xiàn)有的消費者去把這10個分區(qū)全部消費到。
這個其實在上面一小節(jié)已經(jīng)提到的差不多了,在這個小節(jié)再做一點補充和總結(jié)。
觸發(fā)條件:
消費組內(nèi)成員變化:下線/上線/故障被踢出。
消費的分區(qū)數(shù)變化:topic被刪了,topic分區(qū)數(shù)增加了。
coordinator節(jié)點出問題了:因為消費組的元數(shù)據(jù)信息都是在coordinator節(jié)點的,因此coordinator節(jié)點出問題也會觸發(fā)rebalance去找一個新的coordinator節(jié)點。怎么找呢?顯然就是走一遍FIND_COORDINATOR請求嘛,然后找到負(fù)載最低的那個節(jié)點問一下,我的新的coordinator在哪兒呀?然后得到答案之后讓消費者客戶端去連新的coordinator節(jié)點。
整個rebalance的過程,是一個狀態(tài)機流轉(zhuǎn)的過程,整體過程示意圖如下:圖源:https://www.cnblogs.com/huxi2b/p/6815797.html
其實上面這個狀態(tài)機流轉(zhuǎn)過程在明白原理的情況下,已經(jīng)非常清晰了,但是如果沒看過源碼的,依舊不知道為什么是這么流轉(zhuǎn)的,什么情況下狀態(tài)是Empty呢,什么狀態(tài)下是Stable呢?什么時候Empty狀態(tài)會轉(zhuǎn)換為PreparingRebalance狀態(tài)呢?
下面我就根據(jù)請求順序來看下整個狀態(tài)的流轉(zhuǎn)過程:
讓我們來回答上個小節(jié)后面提出的幾個比較細節(jié)的問題:
這些請求都帶有哪些關(guān)鍵數(shù)據(jù)?
在FIND_COORDINATOR請求的時候,會帶上自己的group.id值,這個值是用來計算它的coordinator到底在哪兒的,對應(yīng)的計算方法就是:coordinatorId=groupId.hash % 50
這個算出來是個數(shù)字,代表著具體的分區(qū),哪個topic的分區(qū)呢?顯然是__consumer_offsets了。
在JOIN_GROUP請求的時候,是沒帶什么關(guān)鍵參數(shù)的,但是在響應(yīng)的時候會挑選一個客戶端作為leader,然后在響應(yīng)中告訴它被選為了leader并且把消費組元數(shù)據(jù)信息發(fā)給它,然后讓該客戶端去進行分區(qū)分配。
在SYNC_GROUP請求的時候,leader就會帶上它根據(jù)具體的策略已經(jīng)分配好的分區(qū)分配方案,服務(wù)端收到后就更新到元數(shù)據(jù)里面去,然后其余的consumer客戶端只要一發(fā)送SYNC請求過來就告訴它要消費哪些分區(qū),然后讓它自己去消費就ok了。
到底是哪個階段導(dǎo)致rebalance過程會劣化到幾分鐘?
我圖中特意將JOIN階段標(biāo)位紅色,就是讓這個階段顯得顯眼一些,沒錯就是這個階段會導(dǎo)致rebalance整個過程耗時劣化到幾分鐘。
具體的原因就是JOIN階段會等待原先組內(nèi)存活的成員發(fā)送JOIN_GROUP請求過來,如果原先組內(nèi)的成員因為業(yè)務(wù)處理一直沒有發(fā)送JOIN_GROUP請求過來,服務(wù)端就會一直等待,直到超時。這個超時時間就是max.poll.interval.ms
的值,默認(rèn)是5分鐘,因此這種情況下rebalance的耗時就會劣化到5分鐘,導(dǎo)致所有消費者都無法進行正常消費,影響非常大。
為什么要分為這么多階段?
這個主要是設(shè)計上的考慮,整個過程設(shè)計的還是非常優(yōu)雅的,第一次連上的情況下需要三次請求,正常運行的consumer去進行rebalance只需要兩次請求,因為它原先就知道自己的coordinator在哪兒,因此就不需要FIND_COORDINATOR請求了,除非是它的coordinator宕機了。
回答完這些問題,是不是對整個rebalance過程理解加深一些了呢?其實還有很多細節(jié)沒有涉及到,例如consumer客戶端什么時候會進入rebalance狀態(tài)?服務(wù)端是如何等待原先消費組內(nèi)的成員發(fā)送JOIN_GROUP請求的呢?這些問題就只能一步步看源碼了。
FIND_COORDINATOR請求的源碼我就不打?qū)懥耍芎唵未蠹铱梢宰约悍幌?,就是帶了個group.id上去,上面都提到了。
從這段函數(shù)我們知道,如果加入一個新的消費組,服務(wù)端收到第一個JOIN請求的時候會創(chuàng)建group,這個group的初始狀態(tài)為Empty
// 如果group都還不存在,就有了memberId,則認(rèn)為是非法請求,直接拒絕。 groupManager.getGroup(groupId) match { case None => // 這里group都還不存在的情況下,memberId自然是空的 if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID)) } else { // 初始狀態(tài)是EMPTY val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty)) // 執(zhí)行具體的加組操作 doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) } case Some(group) => doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) }
讓我們進入doJoinGroup函數(shù),看下里面的核心邏輯:
case Empty | Stable => // 初始狀態(tài)是EMPTY,添加member并且執(zhí)行rebalance if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // if the member id is unknown, register the member to the group addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { // ... } else { //... }
private def addMemberAndRebalance(rebalanceTimeoutMs: Int, sessionTimeoutMs: Int, clientId: String, clientHost: String, protocolType: String, protocols: List[(String, Array[Byte])], group: GroupMetadata, callback: JoinCallback) = { // 根據(jù)clientID初始化memberID val memberId = clientId + "-">
def add(member: MemberMetadata) { if (members.isEmpty) this.protocolType = Some(member.protocolType) assert(groupId == member.groupId) assert(this.protocolType.orNull == member.protocolType) assert(supportsProtocols(member.protocols)) // coordinator選舉leader很簡單,就第一個發(fā)送join_group請求的那個member if (leaderId.isEmpty) leaderId = Some(member.memberId) members.put(member.memberId, member) }
上面的代碼翻譯一下很簡單,就是新來了一個member,封裝一下,添加到這個group中,需要說一下的就是當(dāng)組狀態(tài)是Empty的情況下,誰先連上誰就是leader。緊接著就準(zhǔn)備rebalance:
private def maybePrepareRebalance(group: GroupMetadata) { group.inLock { if (group.canRebalance) prepareRebalance(group) } }
// 這里是傳入PreparingRebalance狀態(tài),然后獲取到一個SET // 翻譯一下:就是只有這個SET(Stable, CompletingRebalance, Empty)里面的狀態(tài),才能開啟rebalance def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state) private val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead), CompletingRebalance -> Set(PreparingRebalance), Stable -> Set(CompletingRebalance), PreparingRebalance -> Set(Stable, CompletingRebalance, Empty), Empty -> Set(PreparingRebalance))
private def prepareRebalance(group: GroupMetadata) { // if any members are awaiting sync, cancel their request and have them rejoin if (group.is(CompletingRebalance)) resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS) val delayedRebalance = if (group.is(Empty)) new InitialDelayedJoin(this, joinPurgatory, group, groupConfig.groupInitialRebalanceDelayMs,// 默認(rèn)3000ms,即3s groupConfig.groupInitialRebalanceDelayMs, max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0)) else new DelayedJoin(this, group, group.rebalanceTimeoutMs)// 這里這個超時時間是客戶端的poll間隔,默認(rèn)5分鐘 // 狀態(tài)機轉(zhuǎn)換:EMPTY -> PreparingRebalance group.transitionTo(PreparingRebalance) // rebalance開始標(biāo)志日志 info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") // 加入時間輪 val groupKey = GroupKey(group.groupId) joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey)) }
上面這段代碼有兩個關(guān)鍵點,一個是判斷當(dāng)前能否進入rebalance過程,可以看到只有(Stable, CompletingRebalance, Empty)里面的狀態(tài),才能開啟rebalance,而最開始來到第一個member的時候,組的狀態(tài)是Empty顯然是能進來的,但是近來之后就給轉(zhuǎn)換為了PreparingRebalance狀態(tài),那么后續(xù)的member發(fā)送JOIN請求過來之后就進不來了,就只能設(shè)置個回調(diào)后一直等。
那么要等到什么時候呢?第二段代碼寫的很清楚就是等待延時任務(wù)超時,這個延時任務(wù)創(chuàng)建是根據(jù)當(dāng)前狀態(tài)來判斷的,如果是Empty就創(chuàng)建一個InitialDelayedJoin延時任務(wù),超時時間是3s;如果不是Empty就創(chuàng)建一個DelayedJoin,超時時間默認(rèn)是5min。看,源碼出真知,這就是JOIN階段等待member的代碼實現(xiàn)。
這里需要補充一下,為什么Empty的狀態(tài)下要等待3s呢?這其實是一個優(yōu)化,主要就是優(yōu)化多消費者同時連入的情況。舉個栗子,10個消費者都能在3s內(nèi)啟動然后練上,如果你等著3s時間那么一次rebalance過程就搞定了,如果你不等,那么就意味著來一個就又要開啟一次rebalance,一共要進行10次rebalance,這個耗時就比較長了。具體的細節(jié)可以查看:https://www.cnblogs.com/huxi2b/p/6815797.html
另外就是,為什么狀態(tài)不是Empty的時候就延時5分鐘呢?這個其實上面就回答了,要等待原來消費組內(nèi)在線的消費者發(fā)送JOIN請求,這個也是rebalance過程耗時劣化的主要原因。
接下來我們看看這兩個延時任務(wù),在超時的時候分別都會做些啥,首先是InitialDelayedJoin:
/** * Delayed rebalance operation that is added to the purgatory when a group is transitioning from * Empty to PreparingRebalance * * When onComplete is triggered we check if any new members have been added and if there is still time remaining * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the * rebalance. */ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator, purgatory: DelayedOperationPurgatory[DelayedJoin], group: GroupMetadata, configuredRebalanceDelay: Int, delayMs: Int, remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) { // 這里寫死是false,是為了在tryComplete的時候不被完成 override def tryComplete(): Boolean = false override def onComplete(): Unit = { // 延時任務(wù)處理 group.inLock { // newMemberAdded是后面有新的member加進來就會是true // remainingMs第一次創(chuàng)建該延時任務(wù)的時候就是3s。 // 所以這個條件在第一次的時候都是成立的 if (group.newMemberAdded && remainingMs != 0) { group.newMemberAdded = false val delay = min(configuredRebalanceDelay, remainingMs) // 最新計算的remaining恒等于0,其實本質(zhì)上就是3-3=0, // 所以哪怕這里是新創(chuàng)建了一個InitialDelayedJoin,這個任務(wù)的超時時間就是下一刻 // 這么寫的目的其實就是相當(dāng)于去完成這個延時任務(wù) val remaining = max(remainingMs - delayMs, 0) purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator, purgatory, group, configuredRebalanceDelay, delay, remaining ), Seq(GroupKey(group.groupId))) } else // 如果沒有新的member加入,直接調(diào)用父類的函數(shù) // 完成JOIN階段 super.onComplete() } } }
大意我都寫在注釋里面了,其實就是等待3s,然后完了之后調(diào)用父類的函數(shù)完成整個JOIN階段,不過不聯(lián)系上下文去看,還是挺費勁的,對了看這個需要對時間輪源碼有了解,正好我前面有寫,大家如果有什么不清楚的可以去看下。
接著看下DelayedJoin超時后會干嘛:
/** * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance * * Whenever a join-group request is received, check if all known group members have requested * to re-join the group; if yes, complete this operation to proceed rebalance. * * When the operation has expired, any known members that have not requested to re-join * the group are marked as failed, and complete this operation to proceed rebalance with * the rest of the group. */ private[group] class DelayedJoin(coordinator: GroupCoordinator, group: GroupMetadata, rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) { override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _) override def onExpiration() = coordinator.onExpireJoin() override def onComplete() = coordinator.onCompleteJoin(group) } // 超時之后啥也沒干,哈哈,因為確實不用做啥,置空就好了 // 核心是onComplete函數(shù)和tryComplete函數(shù) def onExpireJoin() { // TODO: add metrics for restabilize timeouts }
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = { group.inLock { if (group.notYetRejoinedMembers.isEmpty) forceComplete() else false } } def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList def forceComplete(): Boolean = { if (completed.compareAndSet(false, true)) { // cancel the timeout timer cancel() onComplete() true } else { false } }
def onCompleteJoin(group: GroupMetadata) { group.inLock { // remove any members who haven't joined the group yet // 如果組內(nèi)成員依舊沒能連上,那么就刪除它,接收當(dāng)前JOIN階段 group.notYetRejoinedMembers.foreach { failedMember => group.remove(failedMember.memberId) // TODO: cut the socket connection to the client } if (!group.is(Dead)) { // 狀態(tài)機流轉(zhuǎn) : preparingRebalancing -> CompletingRebalance group.initNextGeneration() if (group.is(Empty)) { info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") groupManager.storeGroup(group, Map.empty, error => { if (error != Errors.NONE) { // we failed to write the empty group metadata. If the broker fails before another rebalance, // the previous generation written to the log will become active again (and most likely timeout). // This should be safe since there are no active members in an empty generation, so we just warn. warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}") } }) } else { // JOIN階段標(biāo)志結(jié)束日志 info(s"Stabilized group ${group.groupId} generation ${group.generationId} " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") // trigger the awaiting join group response callback for all the members after rebalancing for (member <- group.allMemberMetadata) { assert(member.awaitingJoinCallback != null) val joinResult = JoinGroupResult( // 如果是leader 就返回member列表及其元數(shù)據(jù)信息 members = if (group.isLeader(member.memberId)) { group.currentMemberMetadata } else { Map.empty }, memberId = member.memberId, generationId = group.generationId, subProtocol = group.protocolOrNull, leaderId = group.leaderOrNull, error = Errors.NONE) member.awaitingJoinCallback(joinResult) member.awaitingJoinCallback = null completeAndScheduleNextHeartbeatExpiration(group, member) } } } } }
上面這一串代碼有幾個要點,首先,這個任務(wù)超時的時候是啥也不干的,為什么呢?這里要了解時間輪的機制,代碼也在上面,當(dāng)一個任務(wù)超時的時候,時間輪強制執(zhí)行對應(yīng)任務(wù)的onComplete函數(shù),然后執(zhí)行onExpiration函數(shù),其實onExpiration函數(shù)對于這個延時任務(wù)來說是沒有意義的,并不需要做什么,打日志都懶得打。
第二點就是這個任務(wù)onComplete什么時候會被調(diào)用呢?難道就只能等待5分鐘超時才能被調(diào)用嗎?那不是每一次rebalance都必須要等待5分鐘?當(dāng)然不可能啦,這里就需要先看下tryComplete函數(shù)的內(nèi)容,發(fā)現(xiàn)這個內(nèi)容會去檢查還沒連上的member,如果發(fā)現(xiàn)到期了,就強制完成。那么我們看下這tryComplete是在哪兒被調(diào)用的?這里需要插入一點之前沒貼全的代碼,在doJoinGroup函數(shù)中的而最后一段:
if (group.is(PreparingRebalance)) joinPurgatory.checkAndComplete(GroupKey(group.groupId))
這段代碼非常關(guān)鍵,當(dāng)當(dāng)前狀態(tài)是PreparingRebalance的時候,會嘗試去完成當(dāng)前的延時任務(wù),最終調(diào)用的代碼:
private[server] def maybeTryComplete(): Boolean = { var retry = false var done = false do { if (lock.tryLock()) { try { tryCompletePending.set(false) done = tryComplete() } finally { lock.unlock() } // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set // `tryCompletePending`. In this case we should retry. retry = tryCompletePending.get() } else { // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry. // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have // released the lock and returned by the time the flag is set. retry = !tryCompletePending.getAndSet(true) } } while (!isCompleted && retry) done }
就是上面的tryComplete函數(shù),最終會調(diào)用到DelayedJoin中的tryComplete函數(shù),什么意思呢?已經(jīng)很明顯了,每來一個JOIN請求的時候,如果處于PreparingRebalance階段,都會去檢查一下group中原來的成員是否已經(jīng)到齊了,到齊了就立刻結(jié)束JOIN階段往后走??吹竭@兒,回頭看下InitialDelayedJoin這個延時任務(wù)的tryComplete為什么就默認(rèn)實現(xiàn)了個false呢?也明白了,就是初始化延時任務(wù)的時候不讓你嘗試完成,我就等3s,不需要你們來觸發(fā)我提前完成。
以上,我們就看完了整個服務(wù)端的JOIN請求處理過程,其實主要核心就是這兩個延時任務(wù),如果不聯(lián)系上下文,不了解時間輪機制,看起來確實費勁。接下來就看下SYNC階段是如何處理的。
直接看下面的核心源碼邏輯:
private def doSyncGroup(group: GroupMetadata, generationId: Int, memberId: String, groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback) { group.inLock { if (!group.has(memberId)) { responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) } else if (generationId != group.generationId) { responseCallback(Array.empty, Errors.ILLEGAL_GENERATION) } else { group.currentState match { case Empty | Dead => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) case PreparingRebalance => responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS) // 只有g(shù)roup處于compeletingRebalance狀態(tài)下才會被處理 // 其余狀態(tài)都是錯誤的狀態(tài) case CompletingRebalance => // 給當(dāng)前member設(shè)置回調(diào),之后就啥也不干,也不返回 // 等到leader的分區(qū)方案就緒后,才會被返回。 group.get(memberId).awaitingSyncCallback = responseCallback // if this is the leader, then we can attempt to persist state and transition to stable // 只有收到leader的SYNC才會被處理,并進行狀態(tài)機流轉(zhuǎn) if (group.isLeader(memberId)) { info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}") // fill any missing members with an empty assignment val missing = group.allMembers -- groupAssignment.keySet val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap groupManager.storeGroup(group, assignment, (error: Errors) => { group.inLock { // another member may have joined the group while we were awaiting this callback, // so we must ensure we are still in the CompletingRebalance state and the same generation // when it gets invoked. if we have transitioned to another state, then do nothing if (group.is(CompletingRebalance) && generationId == group.generationId) { if (error != Errors.NONE) { resetAndPropagateAssignmentError(group, error) maybePrepareRebalance(group) } else { setAndPropagateAssignment(group, assignment) // 狀態(tài)機流轉(zhuǎn):CompletingRebalance -> Stable group.transitionTo(Stable) } } } }) } // 如果已經(jīng)處于stable狀態(tài),說明leader已經(jīng)把分區(qū)分配方案傳上來了 // 那么直接從group的元數(shù)據(jù)里面返回對應(yīng)的方案就好了 case Stable => // if the group is stable, we just return the current assignment val memberMetadata = group.get(memberId) responseCallback(memberMetadata.assignment, Errors.NONE) // 開啟心跳檢測 completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) } } } }
我們可能對上面的代碼案處理會有一個疑問,為什么只有l(wèi)eader的SYNC請求才會被處理呢?要是其他consumer比leader早上來了難道就卡這兒不管了?不像JOIN階段那樣加入個時間輪設(shè)置個最大超時時間?這要是leader一直不發(fā)送SNYC請求,那不就所有成員都這兒干等著,無限等待了?
我們一個個來回答,首先,我們看上面的代碼,每個請求過來第一件事是先設(shè)置回調(diào),然后才去卡住等著,直到leader把分區(qū)分配方案通過SYNC請求帶上來。
第二個問題,如果其他consumer比leader早到了就這么干等著嗎?是的,沒錯,代碼就是這么寫的。
第三個問題,為什么不設(shè)置個最大超時時間啥的?我們可以看下客戶端的代碼,一旦開啟rebalance之后,就只會進行相關(guān)請求的收發(fā),意味著leader在收到JOIN階段的返回后,中間不會有任何業(yè)務(wù)代碼的影響,直接就是分配完分區(qū)然后發(fā)送SYNC請求;這就意味著leader的JOIN響應(yīng)和SYNC請求之間理論上是不存在阻塞的,因此就可以不用設(shè)置超時,就不用加入時間輪了。
第四個問題,leader一直不發(fā)送SYNC請求就干等著?是的,代碼就是這么寫的。不過你想想,哪些情況能讓leader一直不發(fā)送SYNC請求?我能想到的就是GC/leader宕機了,無論是哪種情況都會因為心跳線程出了問題被服務(wù)端檢測到,因此在對應(yīng)的心跳任務(wù)超時后重新開啟下一輪的rebalance。哪怕是GC很長時間之后恢復(fù)了繼續(xù)發(fā)SYNC請求過來,也會因為generation不匹配而得到錯誤返回開啟下一輪rebalance。
最后再看下leader到了之后會具體做啥:
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) { assert(group.is(CompletingRebalance)) // 給每個member的分配方案賦值 group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId)) // 在整個group中傳播這個分配方案 propagateAssignment(group, Errors.NONE) } private def propagateAssignment(group: GroupMetadata, error: Errors) { // 遍歷 // 如果是follower比leader先到SYNC請求 // 那么就只會設(shè)置個callback,就啥都不干了,也不會返回 // 直到leader帶著分配方案來了以后,把狀態(tài)更改為stable之后,才會遍歷 // 看看有哪些member已經(jīng)發(fā)送了請求過來,設(shè)置了callback,然后一次性給他們返回回去對應(yīng)的分區(qū)方案 // 所以這個名稱叫做【傳播分配方案】 // 真是絕妙 for (member <- group.allMemberMetadata) { if (member.awaitingSyncCallback != null) { // 通過回調(diào)告訴member對應(yīng)的分配方案 member.awaitingSyncCallback(member.assignment, error) member.awaitingSyncCallback = null // reset the session timeout for members after propagating the member's assignment. // This is because if any member's session expired while we were still awaiting either // the leader sync group or the storage callback, its expiration will be ignored and no // future heartbeat expectations will not be scheduled. completeAndScheduleNextHeartbeatExpiration(group, member) } } }
看,最開始設(shè)置的回調(diào),在收到leader請求時候,起了作用;會被挨個遍歷后響應(yīng)具體的分區(qū)分配方案,另外就是kafka里面的命名都很準(zhǔn)確。
SYNC階段簡單說起來就是等待leader把分區(qū)分配方案傳上來,如果member先到就設(shè)置個回調(diào)先等著,如果leader先到,就直接把分區(qū)分配方案存到group的元數(shù)據(jù)中,然后狀態(tài)修改為Stable,后續(xù)其他member來的SYNC請求就直接從group的元數(shù)據(jù)取分區(qū)分配方案,然后自己消費去了。
八、線上如何排查rebalance問題?
看完理論,讓我們來看下線上問題怎么排查rebalance問題。 rebalance有哪些問題呢?我們來整理一下:
為什么會rebalance呢?是什么引起的?能定位到是哪個客戶端嘛?
rebalance耗時了多久?為什么會劣化? 常見的就上面兩個問題,我們按個來回答。
首先,為什么會rebalance,這個就三種情況,分區(qū)信息變化、客戶端變化、coordinator變化。
一般線上常見的就是客戶端變化,那么客戶端有哪些可能的變化呢?——新增成員,減少成員。
新增成員怎么看呢?很簡單嘛,找到coordinator,然后去kafka-request.log
里面搜:cat kafka-request.log |grep -i find | grep -i ${group.id}
不過一般FIND_COORDINATOR請求的處理時間都小于10ms,所以只能打開debug日志才能看到。一般這種讓客戶自己看,對應(yīng)的時間點是不是有啟動kafka-consumer就行了,其實也不常見,這種情況。畢竟很少有人頻繁開啟關(guān)閉消費者,就算是有也是不好的業(yè)務(wù)使用方式。
減少成員呢?又分為兩種:心跳超時,poll間隔超過配置 心跳超時的標(biāo)識日志:
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { group.inLock { if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { // 標(biāo)識日志 info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group") removeMemberAndUpdateGroup(group, member) } } }
很遺憾poll間隔超時,在1.1.0版本的info級別下并沒有可查找的日志,檢測poll時間間隔超時的是對應(yīng)客戶端的心跳線程,在檢測到超過配置后就會主動leaveGroup從而觸發(fā)rebalance,而這個請求在服務(wù)端依舊沒有info級別的請求,因此,要判斷是poll間隔超時引起的rebalance,就只能看下有沒有上面心跳超時的日志,如果沒有可能就是因為這個原因造成的。目前大多數(shù)的rebalance都是因為這個原因造成的,而且這個原因引發(fā)的rebalance同時還可能伴隨著很長的rebalance耗時。
來看下服務(wù)端是如何做poll間隔超時的呢?
} else if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(), so we explicitly leave the group. maybeLeaveGroup(); } public boolean sessionTimeoutExpired(long now) { return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout; } public synchronized void maybeLeaveGroup() { if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. log.debug("Sending LeaveGroup request to coordinator {}", coordinator); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(groupId, generation.memberId); client.send(coordinator, request) .compose(new LeaveGroupResponseHandler()); client.pollNoWakeup(); } resetGeneration(); }
總結(jié)一下,怎么定位rebalance的問題,就是找標(biāo)志日志,然后排除法,實在不行了就打開debug日志。
接著看第二個問題,rebalance一次的時間耗費了多久?為什么會劣化到幾分鐘? 因為整個rebalance過程是線性的過程,就是狀態(tài)按照請求順序流轉(zhuǎn),因此呢找到對應(yīng)的標(biāo)志日志就好啦。 開啟的標(biāo)志日志:
// rebalance開始標(biāo)志日志 info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
結(jié)束的兩種標(biāo)識日志:這兩種結(jié)束日志都行,因為都差不多代表著rebalance過程完成,原因在上面已經(jīng)講的很清楚了。
// JOIN階段標(biāo)志結(jié)束日志 info(s"Stabilized group ${group.groupId} generation ${group.generationId} " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") // SYNC階段結(jié)束日志 info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
那么如何統(tǒng)計整個rebalance過程的時間呢? 顯而易見,結(jié)束時間 - 開始時間呀。
知道是怎么什么原因開啟了rebalance之后,該怎么定位業(yè)務(wù)問題呢? 心跳超時:因為心跳線程是守護線程,一般都是因為客戶端的機器負(fù)載太高導(dǎo)致心跳現(xiàn)場無法獲取到CPU導(dǎo)致的。
poll間隔超過配置:顯然嘛,就是poll出來數(shù)據(jù)之后,進行業(yè)務(wù)處理的時候太慢了,建議根據(jù)業(yè)務(wù)優(yōu)化消費邏輯,改成多線程消費或者異步消費。
這個很簡單,我們想一下,與這個group有關(guān)的元數(shù)據(jù)全部都在coordinator那里,哪些請求會與coordinator交互呢?HEARTBEAT/OFFSET_COMMIT嘛,就這倆,那么其實正常的member都是靠這兩個請求來感知到自己要去進行rebalance的,我們分別來看下。
首先是HEARTBEAT請求,每次都會帶上當(dāng)前消費組的generation值,也就是紀(jì)元值,要是服務(wù)端rebalance已經(jīng)完成了,紀(jì)元值+1,那么此時就會發(fā)現(xiàn)自己沒匹配上,然后緊接著就去設(shè)置自己的RejoinNeeded的標(biāo)識,在下一輪poll 的時候就會去開啟rebalance。
如果說是rebalance還沒完成,那就更簡單了,發(fā)現(xiàn)group的狀態(tài)不是stable,直接就返回對應(yīng)的錯誤,然后設(shè)置標(biāo)識,加入到rebalance過程中。
服務(wù)端源碼:
case Some(group) => group.inLock { group.currentState match { case Dead => // if the group is marked as dead, it means some other thread has just removed the group // from the coordinator metadata; this is likely that the group has migrated to some other // coordinator OR the group is in a transient unstable phase. Let the member retry // joining without the specified member id, responseCallback(Errors.UNKNOWN_MEMBER_ID) case Empty => responseCallback(Errors.UNKNOWN_MEMBER_ID) case CompletingRebalance => if (!group.has(memberId)) responseCallback(Errors.UNKNOWN_MEMBER_ID) else responseCallback(Errors.REBALANCE_IN_PROGRESS) case PreparingRebalance => if (!group.has(memberId)) { responseCallback(Errors.UNKNOWN_MEMBER_ID) } else if (generationId != group.generationId) { responseCallback(Errors.ILLEGAL_GENERATION) } else { val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) responseCallback(Errors.REBALANCE_IN_PROGRESS) } case Stable => if (!group.has(memberId)) { responseCallback(Errors.UNKNOWN_MEMBER_ID) // 紀(jì)元切換 } else if (generationId != group.generationId) { responseCallback(Errors.ILLEGAL_GENERATION) } else { val member = group.get(memberId) // 完成上次的延時,新建新的延時任務(wù) completeAndScheduleNextHeartbeatExpiration(group, member) // 回調(diào)響應(yīng) responseCallback(Errors.NONE) }
客戶端源碼:
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); Errors error = heartbeatResponse.error(); if (error == Errors.NONE) { log.debug("Received successful Heartbeat response"); future.complete(null); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.", coordinator()); markCoordinatorUnknown(); future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.debug("Attempt to heartbeat failed since group is rebalancing"); requestRejoin(); future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) { log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId); resetGeneration(); future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) { log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId); resetGeneration(); future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } } } protected synchronized void requestRejoin() { this.rejoinNeeded = true; }
所以我們客戶端看到這種異常,就知道怎么回事了,就是我在rebalance的過程中,或者已經(jīng)完成了,客戶端的紀(jì)元不對。
REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.", new ApiExceptionBuilder() { @Override public ApiException build(String message) { return new RebalanceInProgressException(message); } }), ILLEGAL_GENERATION(22, "Specified group generation id is not valid.", new ApiExceptionBuilder() { @Override public ApiException build(String message) { return new IllegalGenerationException(message); } }),
我們再看OFFSET_COMMIT請求,其實和HEARTBEAT請求是基本一致的。
服務(wù)端:
group.inLock { if (group.is(Dead)) { responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID)) } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) { // The group is only using Kafka to store offsets. // Also, for transactional offset commits we don't need to validate group membership and the generation. groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch) } else if (group.is(CompletingRebalance)) { responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS)) } else if (!group.has(memberId)) { responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID)) } else if (generationId != group.generationId) { responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION)) } else { val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback) } } }
客戶端:
else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) { // need to re-join group resetGeneration(); future.raise(new CommitFailedException()); return; /** * Reset the generation and memberId because we have fallen out of the group. */ protected synchronized void resetGeneration() { this.generation = Generation.NO_GENERATION; this.rejoinNeeded = true; this.state = MemberState.UNJOINED; }
從源碼我們可以看到,客戶端在感知rebalance主要通過兩個機制,一個是狀態(tài),一個是紀(jì)元;狀態(tài)生效于rebalance過程中,紀(jì)元生效于rebalance的JOIN階段結(jié)束后。
與coordinator交互的這兩個請求都會帶上自己的紀(jì)元信息,在服務(wù)端處理前都會校驗一下狀態(tài)已經(jīng)紀(jì)元信息,一旦不對,就告訴客戶端你需要rebalance了。
首先明確下,rebalance會有什么影響?引用JVM的術(shù)語來說,就是STOP THE WORLD
。
一旦開啟rebalance過程,在消費者進入JOIN階段后就無法再繼續(xù)消費,就是整個group的成員全部STW,所以對業(yè)務(wù)的影響還是很大的。
“KAFKA中rebalance是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。