溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何保證Kafka不丟失消息

發(fā)布時間:2021-07-12 13:45:09 來源:億速云 閱讀:178 作者:chen 欄目:大數據

本篇內容介紹了“如何保證Kafka不丟失消息”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

 

kafka如何保證不丟消息

ps:這篇文章自我感覺說的很大白話了!希望你們看過了之后能有收獲。

不了解 Kafka 的朋友建議先看一看我的下面這幾篇文章,第一篇一定要看,其他的可以按需學習。

  1. 入門篇!大白話帶你認識 Kafka!
  2. 5分鐘帶你體驗一把 Kafka
  3. Kafka系列第三篇!10 分鐘學會如何在 Spring Boot 程序中使用 Kafka 作為消息隊列?
 

生產者丟失消息的情況

生產者(Producer) 調用send方法發(fā)送消息之后,消息可能因為網絡問題并沒有發(fā)送過去。

所以,我們不能默認在調用send方法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,我們要判斷消息發(fā)送的結果。但是要注意的是  Kafka 生產者(Producer) 使用  send 方法發(fā)送消息實際上是異步的操作,我們可以通過 get()方法獲取調用結果,但是這樣也讓它變?yōu)榱送讲僮?,示例代碼如下:

詳細代碼見我的這篇文章:Kafka系列第三篇!10 分鐘學會如何在 Spring Boot 程序中使用 Kafka 作為消息隊列?

SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
 logger.info("生產者成功發(fā)送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
             sult.getProducerRecord().value().toString());
}
 

但是一般不推薦這么做!可以采用為其添加回調函數的形式,示例代碼如下:

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
       future.addCallback(result -> logger.info("生產者成功發(fā)送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
               ex -> logger.error("生產者發(fā)送消失敗,原因:{}", ex.getMessage()));
 

如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送即可!

另外這里推薦為 Producer 的retries(重試次數)設置一個比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會設置比較大一點。設置完成之后,當出現網絡問題之后能夠自動重試消息發(fā)送,避免消息丟失。另外,建議還要設置重試間隔,因為間隔太小的話重試的效果就不明顯了,網絡波動一次你3次一下子就重試完了

 

消費者丟失消息的情況

我們知道消息在被追加到 Partition(分區(qū))的時候都會分配一個特定的偏移量(offset)。偏移量(offset)表示 Consumer 當前消費到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內的順序性。

如何保證Kafka不丟失消息  
kafka offset

當消費者拉取到了分區(qū)的某個消息之后,消費者會自動提交了 offset。自動提交的話會有一個問題,試想一下,當消費者剛拿到這個消息準備進行真正消費的時候,突然掛掉了,消息實際上并沒有被消費,但是 offset 卻被自動提交了。

解決辦法也比較粗暴,我們手動關閉自動提交 offset,每次在真正消費完消息之后之后再自己手動提交 offset 。 但是,細心的朋友一定會發(fā)現,這樣會帶來消息被重新消費的問題。比如你剛剛消費完消息之后,還沒提交 offset,結果自己掛掉了,那么這個消息理論上就會被消費兩次。

 

Kafka 弄丟了消息

我們知道 Kafka 為分區(qū)(Partition)引入了多副本(Replica)機制。分區(qū)(Partition)中的多個副本之間會有一個叫做 leader 的家伙,其他副本稱為 follower。我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進行同步。生產者和消費者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲的安全性。

試想一種情況:假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個 leader ,但是 leader 的數據還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。

 
設置 acks = all

解決辦法就是我們設置  acks = all。acks 是 Kafka 生產者(Producer)  很重要的一個參數。

acks 的默認值即為1,代表我們的消息被leader副本接收之后就算被成功發(fā)送。當我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。

 
設置 replication.factor >= 3

為了保證 leader 副本能有 follower 副本能同步消息,我們一般會為 topic 設置 replication.factor >= 3。這樣就可以保證每個 分區(qū)(partition) 至少有 3 個副本。雖然造成了數據冗余,但是帶來了數據的安全性。

 
設置 min.insync.replicas > 1

一般情況下我們還需要設置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個副本才算是被成功發(fā)送。min.insync.replicas 的默認值為 1 ,在實際生產中應盡量避免默認值 1。

但是,為了保證整個 Kafka 服務的高可用性,你需要確保 replication.factor > min.insync.replicas 。為什么呢?設想一下加入兩者相等的話,只要是有一個副本掛掉,整個分區(qū)就無法正常工作了。這明顯違反高可用性!一般推薦設置成 replication.factor = min.insync.replicas + 1。

 
設置 unclean.leader.election.enable = false

Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數的默認值由原來的true 改為false

我們最開始也說了我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進行同步。多個 follower 副本之間的消息同步情況不一樣,當我們配置了 unclean.leader.election.enable = false  的話,當 leader 副本發(fā)生故障時就不會從  follower 副本中和 leader 同步程度達不到要求的副本中選擇出  leader ,這樣降低了消息丟失的可能性。

 

“如何保證Kafka不丟失消息”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI