溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

KAFKA中rebalance是什么

發(fā)布時間:2021-11-22 09:48:14 來源:億速云 閱讀:348 作者:iii 欄目:云計算

本篇內(nèi)容介紹了“KAFKA中rebalance是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!


一、寫在前面

讓我們從頭到尾梳理一下rebalance。


二、什么是rebalance?

中文直譯,就是重平衡。
是什么去重平衡呢?消費組內(nèi)的消費者成員去重平衡。(消費組的概念如果不清楚各位先自行百度,后續(xù)我寫到消費模塊的時候才會提到這些概念)
為什么需要重平衡呢?因為消費組內(nèi)成員的故障轉(zhuǎn)移和動態(tài)分區(qū)分配。

翻譯一下:
消費組內(nèi)成員的故障轉(zhuǎn)移:當(dāng)一個消費組內(nèi)有三個消費者A,B,C,分別消費分區(qū):a,b,c

A -> a
B -> b
C -> c

此時如果A消費者出了點問題,那么就意味著a分區(qū)沒有消費者進行消費了,那這肯定不行,那么就通過rebalance去將a分區(qū)分配給其他還存活著的消費者客戶端,rebalance后可能得到的消費策略:

A -> a (GG)
B -> b,a
C -> c

這就是消費組內(nèi)成員的故障轉(zhuǎn)移,就是某個消費者客戶端出問題之后把它原本消費的分區(qū)通過REBALNACE分配給其他存活的消費者客戶端。

動態(tài)分區(qū)分配:當(dāng)某個topic的分區(qū)數(shù)變化,對于消費組而言可消費的分區(qū)數(shù)變化了,因此就需要rebalance去重新進行動態(tài)分區(qū)分配,舉個栗子,原本某topic只有3個分區(qū),我現(xiàn)在擴成了10個分區(qū),那么不就意味著多了7個分區(qū)沒有消費者消費嗎?這顯然是不行的,因此就需要rebalance過程去進行分區(qū)分配,讓現(xiàn)有的消費者去把這10個分區(qū)全部消費到。


三、rebalance是怎么觸發(fā)的?

這個其實在上面一小節(jié)已經(jīng)提到的差不多了,在這個小節(jié)再做一點補充和總結(jié)。
觸發(fā)條件:

  1. 消費組內(nèi)成員變化:下線/上線/故障被踢出。

  2. 消費的分區(qū)數(shù)變化:topic被刪了,topic分區(qū)數(shù)增加了。

  3. coordinator節(jié)點出問題了:因為消費組的元數(shù)據(jù)信息都是在coordinator節(jié)點的,因此coordinator節(jié)點出問題也會觸發(fā)rebalance去找一個新的coordinator節(jié)點。怎么找呢?顯然就是走一遍FIND_COORDINATOR請求嘛,然后找到負(fù)載最低的那個節(jié)點問一下,我的新的coordinator在哪兒呀?然后得到答案之后讓消費者客戶端去連新的coordinator節(jié)點。


四、rebalance的宏觀過程

整個rebalance的過程,是一個狀態(tài)機流轉(zhuǎn)的過程,整體過程示意圖如下:圖源:https://www.cnblogs.com/huxi2b/p/6815797.html KAFKA中rebalance是什么
其實上面這個狀態(tài)機流轉(zhuǎn)過程在明白原理的情況下,已經(jīng)非常清晰了,但是如果沒看過源碼的,依舊不知道為什么是這么流轉(zhuǎn)的,什么情況下狀態(tài)是Empty呢,什么狀態(tài)下是Stable呢?什么時候Empty狀態(tài)會轉(zhuǎn)換為PreparingRebalance狀態(tài)呢?
下面我就根據(jù)請求順序來看下整個狀態(tài)的流轉(zhuǎn)過程: KAFKA中rebalance是什么
讓我們來回答上個小節(jié)后面提出的幾個比較細節(jié)的問題:

這些請求都帶有哪些關(guān)鍵數(shù)據(jù)?
在FIND_COORDINATOR請求的時候,會帶上自己的group.id值,這個值是用來計算它的coordinator到底在哪兒的,對應(yīng)的計算方法就是:coordinatorId=groupId.hash % 50 這個算出來是個數(shù)字,代表著具體的分區(qū),哪個topic的分區(qū)呢?顯然是__consumer_offsets了。
在JOIN_GROUP請求的時候,是沒帶什么關(guān)鍵參數(shù)的,但是在響應(yīng)的時候會挑選一個客戶端作為leader,然后在響應(yīng)中告訴它被選為了leader并且把消費組元數(shù)據(jù)信息發(fā)給它,然后讓該客戶端去進行分區(qū)分配。
在SYNC_GROUP請求的時候,leader就會帶上它根據(jù)具體的策略已經(jīng)分配好的分區(qū)分配方案,服務(wù)端收到后就更新到元數(shù)據(jù)里面去,然后其余的consumer客戶端只要一發(fā)送SYNC請求過來就告訴它要消費哪些分區(qū),然后讓它自己去消費就ok了。

到底是哪個階段導(dǎo)致rebalance過程會劣化到幾分鐘?
我圖中特意將JOIN階段標(biāo)位紅色,就是讓這個階段顯得顯眼一些,沒錯就是這個階段會導(dǎo)致rebalance整個過程耗時劣化到幾分鐘。
具體的原因就是JOIN階段會等待原先組內(nèi)存活的成員發(fā)送JOIN_GROUP請求過來,如果原先組內(nèi)的成員因為業(yè)務(wù)處理一直沒有發(fā)送JOIN_GROUP請求過來,服務(wù)端就會一直等待,直到超時。這個超時時間就是max.poll.interval.ms的值,默認(rèn)是5分鐘,因此這種情況下rebalance的耗時就會劣化到5分鐘,導(dǎo)致所有消費者都無法進行正常消費,影響非常大。

為什么要分為這么多階段?
這個主要是設(shè)計上的考慮,整個過程設(shè)計的還是非常優(yōu)雅的,第一次連上的情況下需要三次請求,正常運行的consumer去進行rebalance只需要兩次請求,因為它原先就知道自己的coordinator在哪兒,因此就不需要FIND_COORDINATOR請求了,除非是它的coordinator宕機了。

回答完這些問題,是不是對整個rebalance過程理解加深一些了呢?其實還有很多細節(jié)沒有涉及到,例如consumer客戶端什么時候會進入rebalance狀態(tài)?服務(wù)端是如何等待原先消費組內(nèi)的成員發(fā)送JOIN_GROUP請求的呢?這些問題就只能一步步看源碼了。

FIND_COORDINATOR請求的源碼我就不打?qū)懥耍芎唵未蠹铱梢宰约悍幌?,就是帶了個group.id上去,上面都提到了。


六、JOIN階段源碼分析

從這段函數(shù)我們知道,如果加入一個新的消費組,服務(wù)端收到第一個JOIN請求的時候會創(chuàng)建group,這個group的初始狀態(tài)為Empty

      // 如果group都還不存在,就有了memberId,則認(rèn)為是非法請求,直接拒絕。
      groupManager.getGroup(groupId) match {
        case None =>
          // 這里group都還不存在的情況下,memberId自然是空的
          if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
          } else {
            // 初始狀態(tài)是EMPTY
            val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
            // 執(zhí)行具體的加組操作
            doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
          }

        case Some(group) =>
          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
      }

讓我們進入doJoinGroup函數(shù),看下里面的核心邏輯:

          case Empty | Stable =>
            // 初始狀態(tài)是EMPTY,添加member并且執(zhí)行rebalance
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // if the member id is unknown, register the member to the group
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
            } else {
			// ...
              } else {
			  //...
              }
  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                    sessionTimeoutMs: Int,
                                    clientId: String,
                                    clientHost: String,
                                    protocolType: String,
                                    protocols: List[(String, Array[Byte])],
                                    group: GroupMetadata,
                                    callback: JoinCallback) = {
    // 根據(jù)clientID初始化memberID
    val memberId = clientId + "-">
  def add(member: MemberMetadata) {
    if (members.isEmpty)
      this.protocolType = Some(member.protocolType)

    assert(groupId == member.groupId)
    assert(this.protocolType.orNull == member.protocolType)
    assert(supportsProtocols(member.protocols))
    // coordinator選舉leader很簡單,就第一個發(fā)送join_group請求的那個member
    if (leaderId.isEmpty)
      leaderId = Some(member.memberId)
    members.put(member.memberId, member)
  }

上面的代碼翻譯一下很簡單,就是新來了一個member,封裝一下,添加到這個group中,需要說一下的就是當(dāng)組狀態(tài)是Empty的情況下,誰先連上誰就是leader。緊接著就準(zhǔn)備rebalance:

  private def maybePrepareRebalance(group: GroupMetadata) {
    group.inLock {
      if (group.canRebalance)
        prepareRebalance(group)
    }
  }
  // 這里是傳入PreparingRebalance狀態(tài),然后獲取到一個SET
  // 翻譯一下:就是只有這個SET(Stable, CompletingRebalance, Empty)里面的狀態(tài),才能開啟rebalance
  def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)

  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
    Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
      CompletingRebalance -> Set(PreparingRebalance),
      Stable -> Set(CompletingRebalance),
      PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
      Empty -> Set(PreparingRebalance))
  private def prepareRebalance(group: GroupMetadata) {
    // if any members are awaiting sync, cancel their request and have them rejoin
    if (group.is(CompletingRebalance))
      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

    val delayedRebalance = if (group.is(Empty))
      new InitialDelayedJoin(this,
        joinPurgatory,
        group,
        groupConfig.groupInitialRebalanceDelayMs,// 默認(rèn)3000ms,即3s
        groupConfig.groupInitialRebalanceDelayMs,
        max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
    else
      new DelayedJoin(this, group, group.rebalanceTimeoutMs)// 這里這個超時時間是客戶端的poll間隔,默認(rèn)5分鐘
    // 狀態(tài)機轉(zhuǎn)換:EMPTY -> PreparingRebalance
    group.transitionTo(PreparingRebalance)
    // rebalance開始標(biāo)志日志
    info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
      s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
    // 加入時間輪
    val groupKey = GroupKey(group.groupId)
    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  }

上面這段代碼有兩個關(guān)鍵點,一個是判斷當(dāng)前能否進入rebalance過程,可以看到只有(Stable, CompletingRebalance, Empty)里面的狀態(tài),才能開啟rebalance,而最開始來到第一個member的時候,組的狀態(tài)是Empty顯然是能進來的,但是近來之后就給轉(zhuǎn)換為了PreparingRebalance狀態(tài),那么后續(xù)的member發(fā)送JOIN請求過來之后就進不來了,就只能設(shè)置個回調(diào)后一直等。
那么要等到什么時候呢?第二段代碼寫的很清楚就是等待延時任務(wù)超時,這個延時任務(wù)創(chuàng)建是根據(jù)當(dāng)前狀態(tài)來判斷的,如果是Empty就創(chuàng)建一個InitialDelayedJoin延時任務(wù),超時時間是3s;如果不是Empty就創(chuàng)建一個DelayedJoin,超時時間默認(rèn)是5min。看,源碼出真知,這就是JOIN階段等待member的代碼實現(xiàn)。
這里需要補充一下,為什么Empty的狀態(tài)下要等待3s呢?這其實是一個優(yōu)化,主要就是優(yōu)化多消費者同時連入的情況。舉個栗子,10個消費者都能在3s內(nèi)啟動然后練上,如果你等著3s時間那么一次rebalance過程就搞定了,如果你不等,那么就意味著來一個就又要開啟一次rebalance,一共要進行10次rebalance,這個耗時就比較長了。具體的細節(jié)可以查看:https://www.cnblogs.com/huxi2b/p/6815797.html
另外就是,為什么狀態(tài)不是Empty的時候就延時5分鐘呢?這個其實上面就回答了,要等待原來消費組內(nèi)在線的消費者發(fā)送JOIN請求,這個也是rebalance過程耗時劣化的主要原因。

接下來我們看看這兩個延時任務(wù),在超時的時候分別都會做些啥,首先是InitialDelayedJoin:

/**
  * Delayed rebalance operation that is added to the purgatory when a group is transitioning from
  * Empty to PreparingRebalance
  *
  * When onComplete is triggered we check if any new members have been added and if there is still time remaining
  * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the
  * rebalance.
  */
private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
                                        purgatory: DelayedOperationPurgatory[DelayedJoin],
                                        group: GroupMetadata,
                                        configuredRebalanceDelay: Int,
                                        delayMs: Int,
                                        remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {

  // 這里寫死是false,是為了在tryComplete的時候不被完成
  override def tryComplete(): Boolean = false

  override def onComplete(): Unit = {
    // 延時任務(wù)處理
    group.inLock  {
      // newMemberAdded是后面有新的member加進來就會是true
      // remainingMs第一次創(chuàng)建該延時任務(wù)的時候就是3s。
      // 所以這個條件在第一次的時候都是成立的
      if (group.newMemberAdded && remainingMs != 0) {
        group.newMemberAdded = false
        val delay = min(configuredRebalanceDelay, remainingMs)
        // 最新計算的remaining恒等于0,其實本質(zhì)上就是3-3=0,
        // 所以哪怕這里是新創(chuàng)建了一個InitialDelayedJoin,這個任務(wù)的超時時間就是下一刻
        // 這么寫的目的其實就是相當(dāng)于去完成這個延時任務(wù)
        val remaining = max(remainingMs - delayMs, 0)
        purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
          purgatory,
          group,
          configuredRebalanceDelay,
          delay,
          remaining
        ), Seq(GroupKey(group.groupId)))
      } else
        // 如果沒有新的member加入,直接調(diào)用父類的函數(shù)
        // 完成JOIN階段
        super.onComplete()
    }
  }
}

大意我都寫在注釋里面了,其實就是等待3s,然后完了之后調(diào)用父類的函數(shù)完成整個JOIN階段,不過不聯(lián)系上下文去看,還是挺費勁的,對了看這個需要對時間輪源碼有了解,正好我前面有寫,大家如果有什么不清楚的可以去看下。
接著看下DelayedJoin超時后會干嘛:

/**
 * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
 *
 * Whenever a join-group request is received, check if all known group members have requested
 * to re-join the group; if yes, complete this operation to proceed rebalance.
 *
 * When the operation has expired, any known members that have not requested to re-join
 * the group are marked as failed, and complete this operation to proceed rebalance with
 * the rest of the group.
 */
private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                 group: GroupMetadata,
                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {

  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)
}

  // 超時之后啥也沒干,哈哈,因為確實不用做啥,置空就好了
  // 核心是onComplete函數(shù)和tryComplete函數(shù)
  def onExpireJoin() {
    // TODO: add metrics for restabilize timeouts
  }
  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
    group.inLock {
      if (group.notYetRejoinedMembers.isEmpty)
        forceComplete()
      else false
    }
  }
  def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
  
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }
  def onCompleteJoin(group: GroupMetadata) {
    group.inLock {
      // remove any members who haven't joined the group yet
      // 如果組內(nèi)成員依舊沒能連上,那么就刪除它,接收當(dāng)前JOIN階段
      group.notYetRejoinedMembers.foreach { failedMember =>
        group.remove(failedMember.memberId)
        // TODO: cut the socket connection to the client
      }

      if (!group.is(Dead)) {
        // 狀態(tài)機流轉(zhuǎn) : preparingRebalancing -> CompletingRebalance
        group.initNextGeneration()
        if (group.is(Empty)) {
          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

          groupManager.storeGroup(group, Map.empty, error => {
            if (error != Errors.NONE) {
              // we failed to write the empty group metadata. If the broker fails before another rebalance,
              // the previous generation written to the log will become active again (and most likely timeout).
              // This should be safe since there are no active members in an empty generation, so we just warn.
              warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
            }
          })
        } else {
          // JOIN階段標(biāo)志結(jié)束日志
          info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

          // trigger the awaiting join group response callback for all the members after rebalancing
          for (member <- group.allMemberMetadata) {
            assert(member.awaitingJoinCallback != null)
            val joinResult = JoinGroupResult(
              // 如果是leader 就返回member列表及其元數(shù)據(jù)信息
              members = if (group.isLeader(member.memberId)) {
                group.currentMemberMetadata
              } else {
                Map.empty
              },
              memberId = member.memberId,
              generationId = group.generationId,
              subProtocol = group.protocolOrNull,
              leaderId = group.leaderOrNull,
              error = Errors.NONE)

            member.awaitingJoinCallback(joinResult)
            member.awaitingJoinCallback = null
            completeAndScheduleNextHeartbeatExpiration(group, member)
          }
        }
      }
    }
  }

上面這一串代碼有幾個要點,首先,這個任務(wù)超時的時候是啥也不干的,為什么呢?這里要了解時間輪的機制,代碼也在上面,當(dāng)一個任務(wù)超時的時候,時間輪強制執(zhí)行對應(yīng)任務(wù)的onComplete函數(shù),然后執(zhí)行onExpiration函數(shù),其實onExpiration函數(shù)對于這個延時任務(wù)來說是沒有意義的,并不需要做什么,打日志都懶得打。
第二點就是這個任務(wù)onComplete什么時候會被調(diào)用呢?難道就只能等待5分鐘超時才能被調(diào)用嗎?那不是每一次rebalance都必須要等待5分鐘?當(dāng)然不可能啦,這里就需要先看下tryComplete函數(shù)的內(nèi)容,發(fā)現(xiàn)這個內(nèi)容會去檢查還沒連上的member,如果發(fā)現(xiàn)到期了,就強制完成。那么我們看下這tryComplete是在哪兒被調(diào)用的?這里需要插入一點之前沒貼全的代碼,在doJoinGroup函數(shù)中的而最后一段:

if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))

這段代碼非常關(guān)鍵,當(dāng)當(dāng)前狀態(tài)是PreparingRebalance的時候,會嘗試去完成當(dāng)前的延時任務(wù),最終調(diào)用的代碼:

  private[server] def maybeTryComplete(): Boolean = {
    var retry = false
    var done = false
    do {
      if (lock.tryLock()) {
        try {
          tryCompletePending.set(false)
          done = tryComplete()
        } finally {
          lock.unlock()
        }
        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
        // `tryCompletePending`. In this case we should retry.
        retry = tryCompletePending.get()
      } else {
        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
        // released the lock and returned by the time the flag is set.
        retry = !tryCompletePending.getAndSet(true)
      }
    } while (!isCompleted && retry)
    done
  }

就是上面的tryComplete函數(shù),最終會調(diào)用到DelayedJoin中的tryComplete函數(shù),什么意思呢?已經(jīng)很明顯了,每來一個JOIN請求的時候,如果處于PreparingRebalance階段,都會去檢查一下group中原來的成員是否已經(jīng)到齊了,到齊了就立刻結(jié)束JOIN階段往后走??吹竭@兒,回頭看下InitialDelayedJoin這個延時任務(wù)的tryComplete為什么就默認(rèn)實現(xiàn)了個false呢?也明白了,就是初始化延時任務(wù)的時候不讓你嘗試完成,我就等3s,不需要你們來觸發(fā)我提前完成。

以上,我們就看完了整個服務(wù)端的JOIN請求處理過程,其實主要核心就是這兩個延時任務(wù),如果不聯(lián)系上下文,不了解時間輪機制,看起來確實費勁。接下來就看下SYNC階段是如何處理的。


七、SYNC階段源碼分析

直接看下面的核心源碼邏輯:

  private def doSyncGroup(group: GroupMetadata,
                          generationId: Int,
                          memberId: String,
                          groupAssignment: Map[String, Array[Byte]],
                          responseCallback: SyncCallback) {
    group.inLock {
      if (!group.has(memberId)) {
        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
      } else if (generationId != group.generationId) {
        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
      } else {
        group.currentState match {
          case Empty | Dead =>
            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)

          case PreparingRebalance =>
            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
          // 只有g(shù)roup處于compeletingRebalance狀態(tài)下才會被處理
          // 其余狀態(tài)都是錯誤的狀態(tài)
          case CompletingRebalance =>
            // 給當(dāng)前member設(shè)置回調(diào),之后就啥也不干,也不返回
            // 等到leader的分區(qū)方案就緒后,才會被返回。
            group.get(memberId).awaitingSyncCallback = responseCallback

            // if this is the leader, then we can attempt to persist state and transition to stable
            // 只有收到leader的SYNC才會被處理,并進行狀態(tài)機流轉(zhuǎn)
            if (group.isLeader(memberId)) {
              info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

              // fill any missing members with an empty assignment
              val missing = group.allMembers -- groupAssignment.keySet
              val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

              groupManager.storeGroup(group, assignment, (error: Errors) => {
                group.inLock {
                  // another member may have joined the group while we were awaiting this callback,
                  // so we must ensure we are still in the CompletingRebalance state and the same generation
                  // when it gets invoked. if we have transitioned to another state, then do nothing
                  if (group.is(CompletingRebalance) && generationId == group.generationId) {
                    if (error != Errors.NONE) {
                      resetAndPropagateAssignmentError(group, error)
                      maybePrepareRebalance(group)
                    } else {
                      setAndPropagateAssignment(group, assignment)
                      // 狀態(tài)機流轉(zhuǎn):CompletingRebalance -> Stable
                      group.transitionTo(Stable)
                    }
                  }
                }
              })
            }
          // 如果已經(jīng)處于stable狀態(tài),說明leader已經(jīng)把分區(qū)分配方案傳上來了
          // 那么直接從group的元數(shù)據(jù)里面返回對應(yīng)的方案就好了
          case Stable =>
            // if the group is stable, we just return the current assignment
            val memberMetadata = group.get(memberId)
            responseCallback(memberMetadata.assignment, Errors.NONE)
            // 開啟心跳檢測
            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
        }
      }
    }
  }

我們可能對上面的代碼案處理會有一個疑問,為什么只有l(wèi)eader的SYNC請求才會被處理呢?要是其他consumer比leader早上來了難道就卡這兒不管了?不像JOIN階段那樣加入個時間輪設(shè)置個最大超時時間?這要是leader一直不發(fā)送SNYC請求,那不就所有成員都這兒干等著,無限等待了?
我們一個個來回答,首先,我們看上面的代碼,每個請求過來第一件事是先設(shè)置回調(diào),然后才去卡住等著,直到leader把分區(qū)分配方案通過SYNC請求帶上來。
第二個問題,如果其他consumer比leader早到了就這么干等著嗎?是的,沒錯,代碼就是這么寫的。
第三個問題,為什么不設(shè)置個最大超時時間啥的?我們可以看下客戶端的代碼,一旦開啟rebalance之后,就只會進行相關(guān)請求的收發(fā),意味著leader在收到JOIN階段的返回后,中間不會有任何業(yè)務(wù)代碼的影響,直接就是分配完分區(qū)然后發(fā)送SYNC請求;這就意味著leader的JOIN響應(yīng)和SYNC請求之間理論上是不存在阻塞的,因此就可以不用設(shè)置超時,就不用加入時間輪了。
第四個問題,leader一直不發(fā)送SYNC請求就干等著?是的,代碼就是這么寫的。不過你想想,哪些情況能讓leader一直不發(fā)送SYNC請求?我能想到的就是GC/leader宕機了,無論是哪種情況都會因為心跳線程出了問題被服務(wù)端檢測到,因此在對應(yīng)的心跳任務(wù)超時后重新開啟下一輪的rebalance。哪怕是GC很長時間之后恢復(fù)了繼續(xù)發(fā)SYNC請求過來,也會因為generation不匹配而得到錯誤返回開啟下一輪rebalance。
最后再看下leader到了之后會具體做啥:

  private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
    assert(group.is(CompletingRebalance))
    // 給每個member的分配方案賦值
    group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
    // 在整個group中傳播這個分配方案
    propagateAssignment(group, Errors.NONE)
  }
  
  private def propagateAssignment(group: GroupMetadata, error: Errors) {
    // 遍歷
    // 如果是follower比leader先到SYNC請求
    // 那么就只會設(shè)置個callback,就啥都不干了,也不會返回
    // 直到leader帶著分配方案來了以后,把狀態(tài)更改為stable之后,才會遍歷
    // 看看有哪些member已經(jīng)發(fā)送了請求過來,設(shè)置了callback,然后一次性給他們返回回去對應(yīng)的分區(qū)方案
    // 所以這個名稱叫做【傳播分配方案】
    // 真是絕妙
    for (member <- group.allMemberMetadata) {
      if (member.awaitingSyncCallback != null) {
        // 通過回調(diào)告訴member對應(yīng)的分配方案
        member.awaitingSyncCallback(member.assignment, error)
        member.awaitingSyncCallback = null

        // reset the session timeout for members after propagating the member's assignment.
        // This is because if any member's session expired while we were still awaiting either
        // the leader sync group or the storage callback, its expiration will be ignored and no
        // future heartbeat expectations will not be scheduled.
        completeAndScheduleNextHeartbeatExpiration(group, member)
      }
    }
  }

看,最開始設(shè)置的回調(diào),在收到leader請求時候,起了作用;會被挨個遍歷后響應(yīng)具體的分區(qū)分配方案,另外就是kafka里面的命名都很準(zhǔn)確。

SYNC階段簡單說起來就是等待leader把分區(qū)分配方案傳上來,如果member先到就設(shè)置個回調(diào)先等著,如果leader先到,就直接把分區(qū)分配方案存到group的元數(shù)據(jù)中,然后狀態(tài)修改為Stable,后續(xù)其他member來的SYNC請求就直接從group的元數(shù)據(jù)取分區(qū)分配方案,然后自己消費去了。


八、線上如何排查rebalance問題?
看完理論,讓我們來看下線上問題怎么排查rebalance問題。 rebalance有哪些問題呢?我們來整理一下:

  • 為什么會rebalance呢?是什么引起的?能定位到是哪個客戶端嘛?

  • rebalance耗時了多久?為什么會劣化? 常見的就上面兩個問題,我們按個來回答。

首先,為什么會rebalance,這個就三種情況,分區(qū)信息變化、客戶端變化、coordinator變化。
一般線上常見的就是客戶端變化,那么客戶端有哪些可能的變化呢?——新增成員,減少成員

新增成員怎么看呢?很簡單嘛,找到coordinator,然后去kafka-request.log里面搜:cat kafka-request.log |grep -i find | grep -i ${group.id} 不過一般FIND_COORDINATOR請求的處理時間都小于10ms,所以只能打開debug日志才能看到。一般這種讓客戶自己看,對應(yīng)的時間點是不是有啟動kafka-consumer就行了,其實也不常見,這種情況。畢竟很少有人頻繁開啟關(guān)閉消費者,就算是有也是不好的業(yè)務(wù)使用方式。

減少成員呢?又分為兩種:心跳超時,poll間隔超過配置 心跳超時的標(biāo)識日志:

  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    group.inLock {
      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
        // 標(biāo)識日志
        info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
        removeMemberAndUpdateGroup(group, member)
      }
    }
  }

很遺憾poll間隔超時,在1.1.0版本的info級別下并沒有可查找的日志,檢測poll時間間隔超時的是對應(yīng)客戶端的心跳線程,在檢測到超過配置后就會主動leaveGroup從而觸發(fā)rebalance,而這個請求在服務(wù)端依舊沒有info級別的請求,因此,要判斷是poll間隔超時引起的rebalance,就只能看下有沒有上面心跳超時的日志,如果沒有可能就是因為這個原因造成的。目前大多數(shù)的rebalance都是因為這個原因造成的,而且這個原因引發(fā)的rebalance同時還可能伴隨著很長的rebalance耗時。
來看下服務(wù)端是如何做poll間隔超時的呢?

} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
    maybeLeaveGroup();
}

public boolean sessionTimeoutExpired(long now) {
    return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}

    public synchronized void maybeLeaveGroup() {
        if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
            // this is a minimal effort attempt to leave the group. we do not
            // attempt any resending if the request fails or times out.
            log.debug("Sending LeaveGroup request to coordinator {}", coordinator);
            LeaveGroupRequest.Builder request =
                    new LeaveGroupRequest.Builder(groupId, generation.memberId);
            client.send(coordinator, request)
                    .compose(new LeaveGroupResponseHandler());
            client.pollNoWakeup();
        }

        resetGeneration();
    }

總結(jié)一下,怎么定位rebalance的問題,就是找標(biāo)志日志,然后排除法,實在不行了就打開debug日志

接著看第二個問題,rebalance一次的時間耗費了多久?為什么會劣化到幾分鐘? 因為整個rebalance過程是線性的過程,就是狀態(tài)按照請求順序流轉(zhuǎn),因此呢找到對應(yīng)的標(biāo)志日志就好啦。 開啟的標(biāo)志日志:

// rebalance開始標(biāo)志日志
info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
 s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

結(jié)束的兩種標(biāo)識日志:這兩種結(jié)束日志都行,因為都差不多代表著rebalance過程完成,原因在上面已經(jīng)講的很清楚了。

 // JOIN階段標(biāo)志結(jié)束日志
 info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
 s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

// SYNC階段結(jié)束日志
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

那么如何統(tǒng)計整個rebalance過程的時間呢? 顯而易見,結(jié)束時間 - 開始時間呀。

知道是怎么什么原因開啟了rebalance之后,該怎么定位業(yè)務(wù)問題呢? 心跳超時:因為心跳線程是守護線程,一般都是因為客戶端的機器負(fù)載太高導(dǎo)致心跳現(xiàn)場無法獲取到CPU導(dǎo)致的。
poll間隔超過配置:顯然嘛,就是poll出來數(shù)據(jù)之后,進行業(yè)務(wù)處理的時候太慢了,建議根據(jù)業(yè)務(wù)優(yōu)化消費邏輯,改成多線程消費或者異步消費。


九、消費者如何感知到rebalance的呢?

這個很簡單,我們想一下,與這個group有關(guān)的元數(shù)據(jù)全部都在coordinator那里,哪些請求會與coordinator交互呢?HEARTBEAT/OFFSET_COMMIT嘛,就這倆,那么其實正常的member都是靠這兩個請求來感知到自己要去進行rebalance的,我們分別來看下。

首先是HEARTBEAT請求,每次都會帶上當(dāng)前消費組的generation值,也就是紀(jì)元值,要是服務(wù)端rebalance已經(jīng)完成了,紀(jì)元值+1,那么此時就會發(fā)現(xiàn)自己沒匹配上,然后緊接著就去設(shè)置自己的RejoinNeeded的標(biāo)識,在下一輪poll 的時候就會去開啟rebalance。
如果說是rebalance還沒完成,那就更簡單了,發(fā)現(xiàn)group的狀態(tài)不是stable,直接就返回對應(yīng)的錯誤,然后設(shè)置標(biāo)識,加入到rebalance過程中。
服務(wù)端源碼:

        case Some(group) =>
          group.inLock {
            group.currentState match {
              case Dead =>
                // if the group is marked as dead, it means some other thread has just removed the group
                // from the coordinator metadata; this is likely that the group has migrated to some other
                // coordinator OR the group is in a transient unstable phase. Let the member retry
                // joining without the specified member id,
                responseCallback(Errors.UNKNOWN_MEMBER_ID)

              case Empty =>
                responseCallback(Errors.UNKNOWN_MEMBER_ID)

              case CompletingRebalance =>
                if (!group.has(memberId))
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                else
                  responseCallback(Errors.REBALANCE_IN_PROGRESS)

              case PreparingRebalance =>
                if (!group.has(memberId)) {
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                } else if (generationId != group.generationId) {
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  val member = group.get(memberId)
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
                }

              case Stable =>
                if (!group.has(memberId)) {
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                  // 紀(jì)元切換
                } else if (generationId != group.generationId) {
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  val member = group.get(memberId)
                  // 完成上次的延時,新建新的延時任務(wù)
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  // 回調(diào)響應(yīng)
                  responseCallback(Errors.NONE)
                }

客戶端源碼:

    private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            sensors.heartbeatLatency.record(response.requestLatencyMs());
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR) {
                log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
                        coordinator());
                markCoordinatorUnknown();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("Attempt to heartbeat failed since group is rebalancing");
                requestRejoin();
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                resetGeneration();
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
                resetGeneration();
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }
	
    protected synchronized void requestRejoin() {
        this.rejoinNeeded = true;
    }

所以我們客戶端看到這種異常,就知道怎么回事了,就是我在rebalance的過程中,或者已經(jīng)完成了,客戶端的紀(jì)元不對。

    REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.",
        new ApiExceptionBuilder() {
            @Override
            public ApiException build(String message) {
                return new RebalanceInProgressException(message);
            }
        }),

    ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
        new ApiExceptionBuilder() {
            @Override
            public ApiException build(String message) {
                return new IllegalGenerationException(message);
            }
        }),

我們再看OFFSET_COMMIT請求,其實和HEARTBEAT請求是基本一致的。
服務(wù)端:

    group.inLock {
      if (group.is(Dead)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
      } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
        // The group is only using Kafka to store offsets.
        // Also, for transactional offset commits we don't need to validate group membership and the generation.
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
      } else if (group.is(CompletingRebalance)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
      } else if (!group.has(memberId)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
      } else if (generationId != group.generationId) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
      } else {
        val member = group.get(memberId)
        completeAndScheduleNextHeartbeatExpiration(group, member)
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
      }
    }
  }

客戶端:

else if (error == Errors.UNKNOWN_MEMBER_ID
                            || error == Errors.ILLEGAL_GENERATION
                            || error == Errors.REBALANCE_IN_PROGRESS) {
                        // need to re-join group
                        resetGeneration();
                        future.raise(new CommitFailedException());
                        return;

    /**
     * Reset the generation and memberId because we have fallen out of the group.
     */
    protected synchronized void resetGeneration() {
        this.generation = Generation.NO_GENERATION;
        this.rejoinNeeded = true;
        this.state = MemberState.UNJOINED;
    }

從源碼我們可以看到,客戶端在感知rebalance主要通過兩個機制,一個是狀態(tài),一個是紀(jì)元;狀態(tài)生效于rebalance過程中,紀(jì)元生效于rebalance的JOIN階段結(jié)束后。
與coordinator交互的這兩個請求都會帶上自己的紀(jì)元信息,在服務(wù)端處理前都會校驗一下狀態(tài)已經(jīng)紀(jì)元信息,一旦不對,就告訴客戶端你需要rebalance了。


十、線上如何減小rebalance的影響?

首先明確下,rebalance會有什么影響?引用JVM的術(shù)語來說,就是STOP THE WORLD。
一旦開啟rebalance過程,在消費者進入JOIN階段后就無法再繼續(xù)消費,就是整個group的成員全部STW,所以對業(yè)務(wù)的影響還是很大的。

“KAFKA中rebalance是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI