溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

?Kafka順序消費線程模型的優(yōu)化方法是什么

發(fā)布時間:2021-10-23 11:02:32 來源:億速云 閱讀:161 作者:iii 欄目:編程語言

本篇內(nèi)容主要講解“ Kafka順序消費線程模型的優(yōu)化方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“ Kafka順序消費線程模型的優(yōu)化方法是什么”吧!

 Kafka 順序消費線程模型的實踐與優(yōu)化

各類消息中間件對順序消息實現(xiàn)的做法是將具有順序性的一類消息發(fā)往相同的主題分區(qū)中,只需要將這類消息設(shè)置相同的 Key 即可,而 Kafka 會在任意時刻保證一個消費組同時只能有一個消費者監(jiān)聽消費,因此可在消費時按分區(qū)進(jìn)行順序消費,保證每個分區(qū)的消息具備局部順序性。由于需要確保分區(qū)消息的順序性,并不能并發(fā)地消費消費,對消費的吞吐量會造成一定的影響。那么,如何在保證消息順序性的前提下,最大限度的提高消費者的消費能力?

本文將會對 Kafka 消費者拉取消息流程進(jìn)行深度分析之后,對 Kafka 消費者順序消費線程模型進(jìn)行一次實踐與優(yōu)化。

Kafka 消費者拉取消息流程分析

在講實現(xiàn) Kafka 順序消費線程模型之前,我們需要先深入分析 Kafka 消費者的消息拉取機制,只有當(dāng)你對 Kafka 消費者拉取消息的整個流程有深入的了解之后,你才能夠很好地理解本次線程模型改造的方案。

我先給大家模擬一下消息拉取的實際現(xiàn)象,這里 max.poll.records = 500。

1、消息沒有堆積時:

?Kafka順序消費線程模型的優(yōu)化方法是什么

可以發(fā)現(xiàn),在消息沒有堆積時,消費者拉取時,如果某個分區(qū)沒有的消息不足 500 條,會從其他分區(qū)湊夠 500 條后再返回。

2、多個分區(qū)都有堆積時:

在消息有堆積時,可以發(fā)現(xiàn)每次返回的都是同一個分區(qū)的消息,但經(jīng)過不斷 debug,消費者在拉取過程中并不是等某個分區(qū)消費完沒有堆積了,再拉取下一個分區(qū)的消息,而是不斷循環(huán)的拉取各個分區(qū)的消息,但是這個循環(huán)并不是說分區(qū) p0 拉取完 500 條,后面一定會拉取分區(qū) p1 的消息,很有可能后面還會拉取 p0 分區(qū)的消息,為了弄明白這種現(xiàn)象,我仔細(xì)閱讀了相關(guān)源碼。

org.apache.kafka.clients.consumer.KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {  try {// poll for new data until the timeout expiresdo {      // 客戶端拉取消息核心邏輯  final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);      if (!records.isEmpty()) {//  在返回數(shù)據(jù)之前, 發(fā)送下次的 fetch 請求, 避免用戶在下次獲取數(shù)據(jù)時線程阻塞if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {          // 調(diào)用 ConsumerNetworkClient#poll 方法將 FetchRequest 發(fā)送出去。  client.pollNoWakeup();
        }return this.interceptors.onConsume(new ConsumerRecords<>(records));
      }
    } while (timer.notExpired());return ConsumerRecords.empty();
  } finally {
    release();
  }
}

我們使用 Kafka consumer 進(jìn)行消費的時候通常會給一個時間,比如:

consumer.poll(Duration.ofMillis(3000));

從以上代碼邏輯可以看出來,用戶給定的這個時間,目的是為了等待消息湊夠 max.poll.records 條消息后再返回,即使消息條數(shù)不夠 max.poll.records 消息,時間到了用戶給定的等待時間后,也會返回。

pollForFetches 方法是客戶端拉取消息核心邏輯,但并不是真正去 broker 中拉取,而是從緩存中去獲取消息。在 pollForFetches 拉取消息后,如果消息不為零,還會調(diào)用 fetcher.sendFetches() 與 client.pollNoWakeup(),調(diào)用這兩個方法究竟有什么用呢?

fetcher.sendFetches() 經(jīng)過源碼閱讀后,得知該方法目的是為了構(gòu)建拉取請求 FetchRequest 并進(jìn)行發(fā)送,但是這里的發(fā)送并不是真正的發(fā)送,而是將 FetchRequest 請求對象存放在 unsend 緩存當(dāng)中,然后會在 ConsumerNetworkClient#poll 方法調(diào)用時才會被真正地執(zhí)行發(fā)送。

fetcher.sendFetches() 在構(gòu)建 FetchRequest 前,會對當(dāng)前可拉取分區(qū)進(jìn)行篩選,而這個也是決定多分區(qū)拉取消息規(guī)律的核心,后面我會講到。

從 KafkaConsumer#poll 方法源碼可以看出來,其實 Kafka 消費者在拉取消息過程中,有兩條線程在工作,其中用戶主線程調(diào)用 pollForFetches 方法從緩存中獲取消息消費,在獲取消息后,會再調(diào)用 ConsumerNetworkClient#poll 方法從 Broker 發(fā)送拉取請求,然后將拉取到的消息緩存到本地,這里為什么在拉取完消息后,會主動調(diào)用 ConsumerNetworkClient#poll 方法呢?我想這里的目的是為了下次 poll 的時候可以立即從緩存中拉取消息。

pollForFetches 方法會調(diào)用 Fetcher#fetchedRecords 方法從緩存中獲取并解析消息:

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
  Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
  int recordsRemaining = maxPollRecords;  try {while (recordsRemaining > 0) {      // 如果當(dāng)前獲取消息的 PartitionRecords 為空,或者已經(jīng)拉取完畢  // 則需要從 completedFetches 重新獲取 completedFetch 并解析成 PartitionRecords  if (nextInLineRecords == null || nextInLineRecords.isFetched) {// 如果上一個分區(qū)緩存中的數(shù)據(jù)已經(jīng)拉取完了,直接中斷本次循環(huán)拉取,并返回空的消息列表// 直至有緩存數(shù)據(jù)為止CompletedFetch completedFetch = completedFetches.peek();if (completedFetch == null) break;try {          // CompletedFetch 即拉取消息的本地緩存數(shù)據(jù)  // 緩存數(shù)據(jù)中 CompletedFetch 解析成 PartitionRecords  nextInLineRecords = parseCompletedFetch(completedFetch);
        } catch (Exception e) {          // ...}
        completedFetches.poll();
      } else {// 從分區(qū)緩存中獲取指定條數(shù)的消息List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);// ...fetched.put(partition, records);
        recordsRemaining -= records.size();
      }
    }
  }
} catch (KafkaException e) {  // ...}return fetched;
}

completedFetches 是拉取到的消息緩存,以上代碼邏輯就是圍繞著如何從 completedFetches 緩存中獲取消息的,從以上代碼邏輯可以看出:

maxPollRecords 為本次拉取的最大消息數(shù)量,該值可通過 max.poll.records 參數(shù)配置,默認(rèn)為 500 條,該方法每次從 completedFetches 中取出一個 CompletedFetch 并解析成可以拉取的 PartitionRecords 對象,即方法中的 nextInLineRecords,請注意,PartitionRecords 中的消息數(shù)量可能大與 500 條,因此可能本次可能一次性從 PartitionRecords 獲取 500 條消息后即返回,如果 PartitionRecords 中消息數(shù)量不足 500 條,會從 completedFetches 緩存中取出下一個要拉取的分區(qū)消息,recordsRemaining 會記錄本次剩余還有多少消息沒拉取,通過循環(huán)不斷地從 completedFetches 緩存中取消息,直至 recordsRemaining 為 0。

以上代碼即可解釋為什么消息有堆積的情況下,每次拉取的消息很大概率是同一個分區(qū)的消息,因為緩存 CompletedFetch 緩存中的消息很大概率會多余每次拉取消息數(shù)量,Kafka 客戶端每次從 Broker 拉取的消息數(shù)據(jù)并不是通過 max.poll.records 決定的,該參數(shù)僅決定用戶每次從本地緩存中獲取多少條數(shù)據(jù),真正決定從 Broker 拉取的消息數(shù)據(jù)量是通過 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等參數(shù)決定的。

我們再想一下,假設(shè)某個分區(qū)的消息一直都處于堆積狀態(tài),Kafka 會每次都拉取這個分區(qū)直至將該分區(qū)消費完畢嗎?(根據(jù)假設(shè),Kafka 消費者每次都會從這個分區(qū)拉取消息,并將消息存到分區(qū)關(guān)聯(lián)的 CompletedFetch 緩存中,根據(jù)以上代碼邏輯,nextInLineRecords 一直處于還沒拉取完的狀態(tài),導(dǎo)致每次拉取都會從該分區(qū)中拉取消息。)

答案顯然不會,不信你打開 Kafka-manager 觀察每個分區(qū)的消費進(jìn)度情況,每個分區(qū)都會有消費者在消費中。

那 Kafka 消費者是如何循環(huán)地拉取它監(jiān)聽的分區(qū)呢?我們接著往下分析。

發(fā)送拉取請求邏輯:

org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches

public synchronized int sendFetches() {  // 解析本次可拉取的分區(qū)
  Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();  for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {final Node fetchTarget = entry.getKey();final FetchSessionHandler.FetchRequestData data = entry.getValue();// 構(gòu)建請求對象final FetchRequest.Builder request = FetchRequest.Builder
      .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
      .isolationLevel(isolationLevel)
      .setMaxBytes(this.maxBytes)
      .metadata(data.metadata())
      .toForget(data.toForget());// 發(fā)送請求,但不是真的發(fā)送,而是將請求保存在 unsent 中client.send(fetchTarget, request)
      .addListener(new RequestFutureListener<ClientResponse>() {@Overridepublic void onSuccess(ClientResponse resp) {
          synchronized (Fetcher.this) {// ... ...// 創(chuàng)建 CompletedFetch, 并緩存到 completedFetches 隊列中completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                                    resp.requestHeader().apiVersion()));
          }

        }
      }                   // ... ...   });
  }  return fetchRequestMap.size();
}

以上代碼邏輯很好理解,在發(fā)送拉取請求前,先檢查哪些分區(qū)可拉取,接著為每個分區(qū)構(gòu)建一個 FetchRequest 對象,F(xiàn)etchRequest 中的 minBytes 和 maxBytes,分別可通過 fetch.min.bytes 和 fetch.max.bytes 參數(shù)設(shè)置。這也是每次從 Broker 中拉取的消息不一定等于 max.poll.records 的原因。

?Kafka順序消費線程模型的優(yōu)化方法是什么

prepareFetchRequests 方法會調(diào)用 Fetcher#fetchablePartitions 篩選可拉取的分區(qū),我們來看下 Kafka 消費者是如何進(jìn)行篩選的:

org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions

private List<TopicPartition> fetchablePartitions() {
  Set<TopicPartition> exclude = new HashSet<>();
  List<TopicPartition> fetchable = subscriptions.fetchablePartitions();  if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
    exclude.add(nextInLineRecords.partition);
  }  for (CompletedFetch completedFetch : completedFetches) {
    exclude.add(completedFetch.partition);
  }
  fetchable.removeAll(exclude);  return fetchable;
}

nextInLineRecords 即我們上面提到的根據(jù)某個分區(qū)緩存 CompletedFetch 解析得到的,如果 nextInLineRecords 中的緩存還沒拉取完,則不從 broker 中拉取消息了,以及如果此時 completedFetches 緩存中存在該分區(qū)的緩存,也不進(jìn)行拉取消息。

我們可以很清楚的得出結(jié)論:

當(dāng)緩存中還存在中還存在某個分區(qū)的消息數(shù)據(jù)時,消費者不會繼續(xù)對該分區(qū)進(jìn)行拉取請求,直到該分區(qū)的本地緩存被消費完,才會繼續(xù)發(fā)送拉取請求。

為了更加清晰的表達(dá)這段邏輯,我舉個例子并將整個流程用圖表達(dá)出來:

假設(shè)某消費者監(jiān)聽三個分區(qū),每個分區(qū)每次從 Broker 中拉取 4 條消息,用戶每次從本地緩存中獲取 2 條消息:

?Kafka順序消費線程模型的優(yōu)化方法是什么

這種消費模型創(chuàng)建多個 KafkaConsumer 對象,每個線程維護(hù)一個 KafkaConsumer,從而實現(xiàn)線程隔離消費,由于每個分區(qū)同一時刻只能有一個消費者消費,所以這種消費模型天然支持順序消費。

但是缺點是無法提升單個分區(qū)的消費能力,如果一個主題分區(qū)數(shù)量很多,只能通過增加 KafkaConsumer 實例提高消費能力,這樣一來線程數(shù)量過多,導(dǎo)致項目 Socket 連接開銷巨大,項目中一般不用該線程模型去消費。

2、單 KafkaConsumer 實例 + 多 worker 線程

?Kafka順序消費線程模型的優(yōu)化方法是什么

首先在初始化的時候,會對消費線程池進(jìn)行初始化,具體是根據(jù) threadsNumMax 的數(shù)量創(chuàng)建若干個單個線程的線程池,單個線程的線程池就是為了保證每個分區(qū)取模后拿到線程池是串行消費的,但這里創(chuàng)建 threadsNumMax 個線程池是不合理的,后面我會說到。

com.zto.consumer.KafkaConsumerProxy#submitRecords

?Kafka順序消費線程模型的優(yōu)化方法是什么

以上就是目前 ZMS 順序消費的線程模型,用圖表示以上代碼邏輯:

?Kafka順序消費線程模型的優(yōu)化方法是什么

在消息流量大的時候,順序消息消費時卻退化成單線程消費了。

如何提高 Kafka 順序消費的并發(fā)度?

經(jīng)過對 ZMS 的消費線程模型以及對 Kafka 消費者拉取消息流程的深入了解之后,我想到了如下幾個方面對 ZMS 的消費線程模型進(jìn)行優(yōu)化:

1、細(xì)化消息順序粒度

之前的做法是將每個分區(qū)單獨一條線程消費,無法再繼續(xù)在分區(qū)之上增加消費能力,我們知道業(yè)務(wù)方發(fā)送順序消息時,會將同一類型具有順序性的消息給一個相同的 Key,以保證這類消息發(fā)送到同一個分區(qū)進(jìn)行消費,從而達(dá)到消息順序消費的目的,而同一個分區(qū)會接收多種類型(即不同 Key)的消息,每次拉取的消息具有很大可能是不同類型的,那么我們就可以將同一個分區(qū)的消息,分配一個獨立的線程池,再利用消息 Key 進(jìn)行取模放入對應(yīng)的線程中消費,達(dá)到并發(fā)消費的目的,且不打亂消息的順序性。

2、細(xì)化位移提交粒度

由于 ZMS 目前是手動提交位移,目前每次拉取消息必須先消費完才能進(jìn)行位移提交,既然已經(jīng)對分區(qū)消息進(jìn)行指定的線程池消費了,由于分區(qū)之間的位移先后提交不影響,那么我們可以將位移提交交給每個分區(qū)進(jìn)行管理,這樣拉取主線程不必等到是否消費完才進(jìn)行下一輪的消息拉取。

3、異步拉取與限流

異步拉取有個問題,就是如果節(jié)點消費跟不上,而拉取消息過多地保存在本地,很可能會造成內(nèi)存溢出,因此我們需要對消息拉取進(jìn)行限流,當(dāng)本地消息緩存量達(dá)到一定量時,阻止消息拉取。

上面在分析 Kafka 消費者拉取消息流程時,我們知道消費者在發(fā)送拉取請求時,首先會判斷本地緩存中是否存在該分區(qū)的緩存,如果存在,則不發(fā)送拉取請求,但由于 ZMS 需要改造成異步拉取的形式,由于 Comsumer#poll 不再等待消息消費完再進(jìn)行下一輪拉取,因此 Kafka 的本地緩存中幾乎不會存在數(shù)據(jù)了,導(dǎo)致 Kafka 每次都會發(fā)送拉取請求,相當(dāng)于將 Kafka 的本地緩存放到 ZMS 中,因此我們需要 ZMS 層面上對消息拉取進(jìn)行限流,Kafka 消費者有兩個方法可以設(shè)置訂閱的分區(qū)是否可以發(fā)送拉取請求:

// 暫停分區(qū)消費(即暫停該分區(qū)發(fā)送拉取消息請求)org.apache.kafka.clients.consumer.KafkaConsumer#pause// 恢復(fù)分區(qū)消費(即恢復(fù)該分區(qū)發(fā)送拉取消息請求)org.apache.kafka.clients.consumer.KafkaConsumer#resume

以上兩個方法,其實就是改變了消費者的訂閱分區(qū)的狀態(tài)值 paused,當(dāng) paused = true 時,暫停分區(qū)消費,當(dāng) paused = false 時,恢復(fù)分區(qū)消費,這個參數(shù)是在哪里使用到呢?上面在分析 Kafka 消費者拉取消息流程時我們有提到發(fā)送拉取請求之前,會對可拉取的分區(qū)進(jìn)行篩選,其中一個條件即分區(qū) paused = false:

org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable

private boolean isFetchable() {  return !paused && hasValidPosition();
}

由于 KafkaConsumer 是非線程安全的,如果我們在異步線程 KafkaConsumer 相關(guān)的類,會報如下錯誤:

KafkaConsumer is not safe for multi-threaded access

只需要確保 KafkaConsumer 相關(guān)方法在 KafkaConsumer#poll 方法線程中調(diào)用即可,具體做法可以設(shè)置一個線程安全上下文容器,異步線程操作 KafkaConsumer 相關(guān)方法是,只需要將具體的分區(qū)放到上下文容器即可,后續(xù)統(tǒng)一由 poll 線程執(zhí)行。

因此我們只需要利用好這個特性,就可以實現(xiàn)拉取限流,消費者主線程的 Comsumer#poll 方法依然是異步不斷地從緩存中獲取消息,同時不會造成兩次 poll 之間的時間過大導(dǎo)致消費者被踢出消費組。

以上優(yōu)化改造的核心是在不打亂消息順序的前提下利用消息 Key 盡可能地并發(fā)消費,但如果遇到分區(qū)中的消息都是相同 Key,并且在有一定的積壓下每次拉取都是同一個分區(qū)的消息時,以上模型可能沒有理想情況下的那么好。這時是否可以將 fetch.max.bytes 與 max.partition.fetch.bytes 參數(shù)設(shè)置小一點,讓每個分區(qū)的本地緩存都不足 500 條,這樣每次 poll 的消息列表都可以包含多個分區(qū)的消息了,但這樣又會導(dǎo)致 RPC 請求增多,這就需要針對業(yè)務(wù)消息大小,對這些參數(shù)進(jìn)行調(diào)優(yōu)。

以上線程模型,需要增加一個參數(shù) orderlyConsumePartitionParallelism,用于設(shè)置分區(qū)消費并行度,假設(shè)某個消費組被分配 5 個分區(qū)進(jìn)行消費,則每個分區(qū)默認(rèn)啟動一條線程消費,一共 5 * 1 = 5 條消費線程,當(dāng) orderlyConsumePartitionParallelism = 3,則每個分區(qū)啟動 3 條線程消費,一共 5 * 3 = 15 條消費線程。orderlyConsumePartitionParallelism = 1 時,則說明該分區(qū)所有消息都處在順序(串行)消費;當(dāng) orderlyConsumePartitionParallelism > 1 時,則根據(jù)分區(qū)消息的 Key 進(jìn)行取模分配線程消費,保證不了整個分區(qū)順序消費,但保證相同 Key 的消息順序消費。

注意,當(dāng) orderlyConsumePartitionParallelism > 1 時,分區(qū)消費線程的有效使用率取決于該分區(qū)消息的 Key:

1、如果該分區(qū)所有消息的 Key 都相同,則消費的 Key 取模都分配都同一條線程當(dāng)中,并行度退化成 orderlyConsumePartitionParallelism = 1;

2、如果該分區(qū)相同 Key 的消息過于集中,會導(dǎo)致每次拉取都是相同 key 的一批消息,同樣并行度退化成 orderlyConsumePartitionParallelism = 1。

綜合對比:

優(yōu)化前,ZMS 可保證整個分區(qū)消息的順序性,優(yōu)化后可根據(jù)消息 Key 在分區(qū)的基礎(chǔ)上不打亂相同 Key 消息的順序性前提下進(jìn)行并發(fā)消費,有效地提升了單分區(qū)的消費吞吐量;優(yōu)化前,有很大的概率會退化成同一時刻單線程消費,優(yōu)化后盡可能至少保證每個分區(qū)一條線程消費,情況好的時候每個分區(qū)可多條線程消費。

通過以上場景分析,該優(yōu)化方案不是提高順序消費吞吐量的銀彈,它有很大的局限性,用戶在業(yè)務(wù)的實現(xiàn)上不能重度依賴順序消費去實現(xiàn),以免影響業(yè)務(wù)性能上的需求。

到此,相信大家對“ Kafka順序消費線程模型的優(yōu)化方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI