您好,登錄后才能下訂單哦!
Kafka重復消費場景及解決方案是什么,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
Kafka消費者以消費者組(Consumer Group)的形式消費一個topic,發(fā)布到topic中的每個記錄將傳遞到每個訂閱消費者者組中的一個消費者實例。Consumer Group 之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。生產環(huán)境中消費者在消費消息的時候若不考慮消費者的相關特性可能會出現(xiàn)重復消費的問題。
在討論重復消費之前,首先來看一下kafka中跟消費者有關的幾個重要配置參數(shù)。
enable.auto.commit默認值true,表示消費者會周期性自動提交消費的offset
auto.commit.interval.ms在enable.auto.commit為true的情況下, 自動提交的間隔,默認值5000ms
max.poll.records 單次消費者拉取的最大數(shù)據(jù)條數(shù),默認值
500 max.poll.interval.ms默認值5分鐘,表示若5分鐘之內消費者沒有消費完上一次poll的消息,那么consumer會主動發(fā)起離開group的請求
在常見的使用場景下,我們的消費者配置比較簡單,特別是集成Spring組件進行消息的消費,通常情況下我們僅需通過一個注解就可以實現(xiàn)消息的消費。例如如下代碼:
這段代碼中我們配置了一個kafka消費注解,制定消費名為"test1"的topic,這個消費者屬于"group1"消費組。開發(fā)者只需要對得到的消息進行處理即可。那么這段 代碼中的消費者在這個過程中是如何拉取消息的呢,消費者消費消息之后又是如何提交對應消息的位移(offset)的呢?
實際上在auto.commit=true時,當上一次poll方法拉取的消息消費完時會進行下一次poll,在經(jīng)過auto.commit.interval.ms間隔后,下一次調用poll時會提交所有已消費消息的offset。
為了驗證consumer自動提交的時機,配置消費者參數(shù)如下:
為了便于獲取消費者消費進度,以下代碼通過kafka提供的相關接口定時每隔5s獲取一次消費者的消費進度信息,并將獲取到的信息打印到控制臺。
對于topic test1,為了便于觀察消費情況,我們僅設置了一個partition。對于消費者組group1的配置參數(shù),消費者會單次拉取消息數(shù)20條,消費每條消息耗費1s,部分記錄日志打印結果如下:
從日志中可以看出,消費組的offset每40s更新一次,因為每次poll會拉取20條消息,每個消息消費1s,在第一次poll之后,下一次poll因為沒有達到auto.commit.interval.ms=30s,所以不會提交offset。第二次poll時,已經(jīng)經(jīng)過40s,因此這次poll會提交之前兩次消費的消息,offset增加40。也就是說只有在經(jīng)過auto.commit.interval.ms間隔后,并且在下一次調用poll時才會提交所有 已消費消息的offset。
考慮到以上消費者消費消息的特點,在配置自動提交enable.auto.commit 默認值true情況下,出現(xiàn)重復消費的場景有以下幾種:
Consumer 在消費過程中,應用進程被強制kill掉或發(fā)生異常退出。
例如在一次poll500條消息后,消費到200條時,進程被強制kill消費導致offset 未提交,或出現(xiàn)異常退出導致消費到offset未提交。下次重啟時,依然會重新拉取這500消息,這樣就造成之前消費到200條消息重復消費了兩次。因此在有消費者線程的應用中,應盡量避免使用kill -9這樣強制殺進程的命令。
消費者消費時間過長
max.poll.interval.ms參數(shù)定義了兩次poll的最大間隔,它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。若消費者消費的消息比較耗時,那么這種情況可能就會出現(xiàn)。
為了復現(xiàn)這種場景,我們對消費者重新進行了配置,消費者參數(shù)如下:
在消費過程中消費者單次會拉取11條消息,每條消息耗時30s,11條消息耗時 5分鐘30秒,由于max.poll.interval.ms默認值5分鐘,所以理論上消費者無法在5分鐘內消費完,consumer會離開組,導致rebalance。
實際運行日志如下:
可以看到在消費完第11條消息后,因為消費時間超出max.poll.interval.ms默認值5分鐘,這時consumer已經(jīng)離開消費組了,開始rebalance,因此提交offset失敗。之后重新rebalance,消費者再次分配partition后,再次poll拉取消息依然從之前消費過的消息處開始消費,這樣就造成重復消費。而且若不解決消費單次消費時間過長的問題,這部分消息可能會一直重復消費。
對于上述重復消費的場景,若不進行相應的處理,那么有可能造成一些線上問題。為了避免因重復消費導致的問題,以下提供了兩種解決重復消費的思路。
第一種思路是提高消費能力,提高單條消息的處理速度,例如對消息處理中比 較耗時的步驟可通過異步的方式進行處理、利用多線程處理等。在縮短單條消息消費時常的同時,根據(jù)實際場景可將max.poll.interval.ms值設置大一點,避免不 必要的rebalance,此外可適當減小max.poll.records的值,默認值是500,可根 據(jù)實際消息速率適當調小。這種思路可解決因消費時間過長導致的重復消費問題, 對代碼改動較小,但無法絕對避免重復消費問題。
第二種思路是引入單獨去重機制,例如生成消息時,在消息中加入唯一標識符如消息id等。在消費端,我們可以保存最近的1000條消息id到redis或mysql表中,配置max.poll.records的值小于1000。在消費消息時先通過前置表去重后再進行消息的處理。
此外,在一些消費場景中,我們可以將消費的接口冪等處理,例如數(shù)據(jù)庫的查 詢操作天然具有冪等性,這時候可不用考慮重復消費的問題。對于例如新增數(shù)據(jù)的操作,可通過設置唯一鍵等方式以達到單次與多次操作對系統(tǒng)的影響相同,從而使接口具有冪等性。
關于 Kafka重復消費場景及解決方案是什么問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業(yè)資訊頻道了解更多相關知識。
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。