溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Kafka丟失數(shù)據(jù)問題優(yōu)化分析

發(fā)布時間:2021-11-22 10:09:35 來源:億速云 閱讀:150 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“Kafka丟失數(shù)據(jù)問題優(yōu)化分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“Kafka丟失數(shù)據(jù)問題優(yōu)化分析”吧!

數(shù)據(jù)丟失是一件非常嚴重的事情事,針對數(shù)據(jù)丟失的問題我們需要有明確的思路來確定問題所在,針對這段時間的總結(jié),我個人面對kafka 數(shù)據(jù)丟失問題的解決思路如下:

1、是否真正的存在數(shù)據(jù)丟失問題,比如有很多時候可能是其他同事操作了測試環(huán)境,所以首先確保數(shù)據(jù)沒有第三方干擾。

2、理清你的業(yè)務(wù)流程,數(shù)據(jù)流向,數(shù)據(jù)到底是在什么地方丟失的數(shù)據(jù),在kafka 之前的環(huán)節(jié)或者kafka之后的流程丟失?比如kafka的數(shù)據(jù)是由flume提供的,也許是flume丟失了數(shù)據(jù),kafka 自然就沒有這一部分數(shù)據(jù)。

3、如何發(fā)現(xiàn)有數(shù)據(jù)丟失,又是如何驗證的。從業(yè)務(wù)角度考慮,例如:教育行業(yè),每年高考后數(shù)據(jù)量巨大,但是卻反常的比高考前還少,或者源端數(shù)據(jù)量和目的端數(shù)據(jù)量不符

4、 定位數(shù)據(jù)是否在kafka之前就已經(jīng)丟失還事消費端丟失數(shù)據(jù)的

kafka支持數(shù)據(jù)的重新回放功能(換個消費group),清空目的端所有數(shù)據(jù),重新消費。
如果是在消費端丟失數(shù)據(jù),那么多次消費結(jié)果完全一模一樣的幾率很低。
如果是在寫入端丟失數(shù)據(jù),那么每次結(jié)果應(yīng)該完全一樣(在寫入端沒有問題的前提下)。
 

5、kafka環(huán)節(jié)丟失數(shù)據(jù),常見的kafka環(huán)節(jié)丟失數(shù)據(jù)的原因有:

如果auto.commit.enable=true,當consumer fetch了一些數(shù)據(jù)但還沒有完全處理掉的時候,剛好到commit interval出發(fā)了提交offset操作,接著consumer crash掉了。這時已經(jīng)fetch的數(shù)據(jù)還沒有處理完成但已經(jīng)被commit掉,因此沒有機會再次被處理,數(shù)據(jù)丟失。

網(wǎng)絡(luò)負載很高或者磁盤很忙寫入失敗的情況下,沒有自動重試重發(fā)消息。沒有做限速處理,超出了網(wǎng)絡(luò)帶寬限速。kafka一定要配置上消息重試的機制,并且重試的時間間隔一定要長一些,默認1秒鐘并不符合生產(chǎn)環(huán)境(網(wǎng)絡(luò)中斷時間有可能超過1秒)。

如果磁盤壞了,會丟失已經(jīng)落盤的數(shù)據(jù)
 

單批數(shù)據(jù)的長度超過限制會丟失數(shù)據(jù),報kafka.common.MessageSizeTooLargeException異常解決:

Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.

Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).

Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.

Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
 

6、partition leader在未完成副本數(shù)follows的備份時就宕機的情況,即使選舉出了新的leader但是已經(jīng)push的數(shù)據(jù)因為未備份就丟失了!kafka是多副本的,當你配置了同步復(fù)制之后。多個副本的數(shù)據(jù)都在PageCache里面,出現(xiàn)多個副本同時掛掉的概率比1個副本掛掉的概率就很小了。(官方推薦是通過副本來保證數(shù)據(jù)的完整性的)

7、kafka的數(shù)據(jù)一開始就是存儲在PageCache上的,定期flush到磁盤上的,也就是說,不是每個消息都被存儲在磁盤了,如果出現(xiàn)斷電或者機器故障等,PageCache上的數(shù)據(jù)就丟失了??梢酝ㄟ^log.flush.interval.messages和log.flush.interval.ms來配置flush間隔,interval大丟的數(shù)據(jù)多些,小會影響性能但在0.8版本,可以通過replica機制保證數(shù)據(jù)不丟,代價就是需要更多資源,尤其是磁盤資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題 是否使用replica取決于在可靠性和資源代價之間的balance。

同時kafka也提供了相關(guān)的配置參數(shù),來讓你在性能與可靠性之間權(quán)衡(一般默認):

當達到下面的消息數(shù)量時,會將數(shù)據(jù)flush到日志文件中。默認10000

log.flush.interval.messages=10000
 

當達到下面的時間(ms)時,執(zhí)行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。默認3000ms

log.flush.interval.ms=1000
 

檢查是否需要將日志flush的時間間隔

log.flush.scheduler.interval.ms = 3000  
  
Kafka的優(yōu)化建議

producer端

  • 設(shè)計上保證數(shù)據(jù)的可靠安全性,依據(jù)分區(qū)數(shù)做好數(shù)據(jù)備份,設(shè)立副本數(shù)等。push數(shù)據(jù)的方式:同步異步推送數(shù)據(jù):權(quán)衡安全性和速度性的要求,選擇相應(yīng)的同步推送還是異步推送方式,當發(fā)現(xiàn)數(shù)據(jù)有問題時,可以改為同步來查找問題。

  • flush是kafka的內(nèi)部機制,kafka優(yōu)先在內(nèi)存中完成數(shù)據(jù)的交換,然后將數(shù)據(jù)持久化到磁盤.kafka首先會把數(shù)據(jù)緩存(緩存到內(nèi)存中)起來再批量flush??梢酝ㄟ^log.flush.interval.messages和log.flush.interval.ms來配置flush間隔

  • 可以通過replica機制保證數(shù)據(jù)不丟。代價就是需要更多資源,尤其是磁盤資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題。是否使用replica(副本)取決于在可靠性和資源代價之間的balance(平衡)

  • broker到 Consumer kafka的consumer提供兩種接口。

high-level版本已經(jīng)封裝了對partition和offset的管理,默認是會定期自動commit offset,這樣可能會丟數(shù)據(jù)的
low-level版本自己管理spout線程和partition之間的對應(yīng)關(guān)系和每個partition上的已消費的offset(定期寫到zk)
并且只有當這個offset被ack后,即成功處理后,才會被更新到zk,所以基本是可以保證數(shù)據(jù)不丟的即使spout線程crash(崩潰),重啟后還是可以從zk中讀到對應(yīng)的offset
 
  • 異步要考慮到partition leader在未完成副本數(shù)follows的備份時就宕機的情況,即使選舉出了新的leader但是已經(jīng)push的數(shù)據(jù)因為未備份就丟失了!

不能讓內(nèi)存的緩沖池太滿,如果滿了內(nèi)存溢出,也就是說數(shù)據(jù)寫入過快,kafka的緩沖池數(shù)據(jù)落盤速度太慢,這時肯定會造成數(shù)據(jù)丟失。
盡量保證生產(chǎn)者端數(shù)據(jù)一直處于線程阻塞狀態(tài),這樣一邊寫內(nèi)存一邊落盤。
異步寫入的話還可以設(shè)置類似flume回滾類型的batch數(shù),即按照累計的消息數(shù)量,累計的時間間隔,累計的數(shù)據(jù)大小設(shè)置batch大小。
 
  • 設(shè)置合適的方式,增大batch 大小來減小網(wǎng)絡(luò)IO和磁盤IO的請求,這是對于kafka效率的思考。

不過異步寫入丟失數(shù)據(jù)的情況還是難以控制
還是得穩(wěn)定整體集群架構(gòu)的運行,特別是zookeeper,當然正對異步數(shù)據(jù)丟失的情況盡量保證broker端的穩(wěn)定運作吧
 

kafka不像hadoop更致力于處理大量級數(shù)據(jù),kafka的消息隊列更擅長于處理小數(shù)據(jù)。針對具體業(yè)務(wù)而言,若是源源不斷的push大量的數(shù)據(jù)(eg:網(wǎng)絡(luò)爬蟲),可以考慮消息壓縮。但是這也一定程度上對CPU造成了壓力,還是得結(jié)合業(yè)務(wù)數(shù)據(jù)進行測試選擇 

broker端

topic設(shè)置多分區(qū),分區(qū)自適應(yīng)所在機器,為了讓各分區(qū)均勻分布在所在的broker中,分區(qū)數(shù)要大于broker數(shù)。分區(qū)是kafka進行并行讀寫的單位,是提升kafka速度的關(guān)鍵。

  1. broker能接收消息的最大字節(jié)數(shù)的設(shè)置一定要比消費端能消費的最大字節(jié)數(shù)要小,否則broker就會因為消費端無法使用這個消息而掛起。

  2. broker可賦值的消息的最大字節(jié)數(shù)設(shè)置一定要比能接受的最大字節(jié)數(shù)大,否則broker就會因為數(shù)據(jù)量的問題無法復(fù)制副本,導(dǎo)致數(shù)據(jù)丟失。 

comsumer端

關(guān)閉自動更新offset,等到數(shù)據(jù)被處理后再手動跟新offset。

在消費前做驗證前拿取的數(shù)據(jù)是否是接著上回消費的數(shù)據(jù),不正確則return先行處理排錯。

一般來說zookeeper只要穩(wěn)定的情況下記錄的offset是沒有問題,除非是多個consumer group 同時消費一個分區(qū)的數(shù)據(jù),其中一個先提交了,另一個就丟失了。 

問題

kafka的數(shù)據(jù)一開始就是存儲在PageCache上的,定期flush到磁盤上的,也就是說,不是每個消息都被存儲在磁盤了,如果出現(xiàn)斷電或者機器故障等,PageCache上的數(shù)據(jù)就丟失了。這個是總結(jié)出的到目前為止沒有發(fā)生丟失數(shù)據(jù)的情況

//producer用于壓縮數(shù)據(jù)的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好
    props.put("compression.type", "gzip");
    //增加延遲
    props.put("linger.ms", "50");
    //這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數(shù)據(jù)。這是最強的保證。,
    props.put("acks", "all");
    //無限重試,直到你意識到出現(xiàn)了問題,設(shè)置大于0的值將使客戶端重新發(fā)送任何數(shù)據(jù),一旦這些數(shù)據(jù)發(fā)送失敗。注意,這些重試與客戶端接收到發(fā)送錯誤時的重試沒有什么不同。允許重試將潛在的改變數(shù)據(jù)的順序,如果這兩個消息記錄都是發(fā)送到同一個partition,則第一個消息失敗第二個發(fā)送成功,則第二條消息會比第一條消息出現(xiàn)要早。
    props.put("retries ", MAX_VALUE);
    props.put("reconnect.backoff.ms ", 20000);
    props.put("retry.backoff.ms", 20000);

    //關(guān)閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免數(shù)據(jù)丟失
    props.put("unclean.leader.election.enable", false);
    //關(guān)閉自動提交offset
    props.put("enable.auto.commit", false);
    限制客戶端在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求。注意:設(shè)置此參數(shù)是為了避免消息亂序
    props.put("max.in.flight.requests.per.connection", 1);
   
Kafka重復(fù)消費原因

強行kill線程,導(dǎo)致消費后的數(shù)據(jù),offset沒有提交,partition就斷開連接。比如,通常會遇到消費的數(shù)據(jù),處理很耗時,導(dǎo)致超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那么就會re-blance重平衡,此時有一定幾率offset沒提交,會導(dǎo)致重平衡后重復(fù)消費。

如果在close之前調(diào)用了consumer.unsubscribe()則有可能部分offset沒提交,下次重啟會重復(fù)消費。

kafka數(shù)據(jù)重復(fù) kafka設(shè)計的時候是設(shè)計了(at-least once)至少一次的邏輯,這樣就決定了數(shù)據(jù)可能是重復(fù)的,kafka采用基于時間的SLA(服務(wù)水平保證),消息保存一定時間(通常為7天)后會被刪除。

kafka的數(shù)據(jù)重復(fù)一般情況下應(yīng)該在消費者端,這時log.cleanup.policy = delete使用定期刪除機制。

到此,相信大家對“Kafka丟失數(shù)據(jù)問題優(yōu)化分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI