溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

kafka-consumer-offset位移問題怎么解決

發(fā)布時間:2023-03-07 11:24:40 來源:億速云 閱讀:117 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了kafka-consumer-offset位移問題怎么解決的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇kafka-consumer-offset位移問題怎么解決文章都會有所收獲,下面我們一起來看看吧。

    1 offset的默認(rèn)維護(hù)位置

    kafka-consumer-offset位移問題怎么解決

    _consumer_offsets主題里面采用key和 value的方式存儲數(shù)據(jù)。

    key是 group.id+topic+分區(qū)號,value 就是當(dāng)前offset的值。

    每隔一段時間,kafka 內(nèi)部會對這個topic進(jìn)行compact(壓縮),也就是每個group.id+topic+分區(qū)號就保留最新數(shù)據(jù)。

    Kafka0.9版本之前,consumer黑認(rèn)將offset保存在Zookeeper中。0.9版本開始,consumer默認(rèn)將offset保存在Kafka一個內(nèi)置的topic中,該topic為_consumer_offsets。

    將offset信息存儲在zk中的不足:如果將offset信息存儲在zk中,那么所有的consumer都會訪問zk,會消耗大量的網(wǎng)絡(luò)資源,消費(fèi)速度慢。

    1.1 消費(fèi)offset案例

    思想:_consumer_offsets為Kafka中的 topic,那就可以通過消費(fèi)者進(jìn)行消費(fèi)。

    在配置文件 config/consumer.properties中添加配置exclude.internal.topics = false,默認(rèn)是 true,表示不能消費(fèi)系統(tǒng)主題。為了查看該系統(tǒng)主題數(shù)據(jù),所以該參數(shù)修改為false。修改以后執(zhí)行分發(fā)命令:xsync consumer.properties。

    采用命令行方式,創(chuàng)建一個新的topic。

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2

    啟動生產(chǎn)者往atguigu生產(chǎn)數(shù)據(jù)。

    [atguigu@hadoop102 kafka] $ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092

    啟動消費(fèi)者消費(fèi)atguigu數(shù)據(jù)。

    [atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh bootstrap-server hadoop102:9092--topic atguigu --group test

    注意:指定消費(fèi)者組名稱,更好觀察數(shù)據(jù)存儲位置(key是 group.id+topic+分區(qū)號)。查看消費(fèi)者消費(fèi)主題_consumer_offsets。

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic _consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

    kafka-consumer-offset位移問題怎么解決

    2 自動提交offset

    為了使我們能夠?qū)W⒂谧约旱臉I(yè)務(wù)邏輯,Kafka提供了自動提交offset的功能。自動提交offset的相關(guān)參數(shù):

    • enable.auto.commit:是否開啟自動提交offset功能,默認(rèn)是true

    • auto.commit.interval.ms:自動提交offset的時間間隔,默認(rèn)是5s

    kafka-consumer-offset位移問題怎么解決

    消費(fèi)者配置代碼:

    //配置是否是自動提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    //提交時間間隔,單位是ms
    properties.put(ConsumerConfig.AUTO_COMNIT_INTERVAL_NS_CONFI6,1000);

    3 手動提交offset

    3.1 原理

    雖然自動提交offset十分簡單便利,但由于其是基于時間提交的,開發(fā)人員難以把握offset提交的時機(jī)。因此Kafka還提供了手動提交offset的API。

    手動提交offset的方法有兩種:分別是commitSync(同步提交)commitAsync(異步提交)

    兩者的相同點是,都會將本次提交的一批數(shù)據(jù)最高的偏移量提交;不同點是,同步提交阻塞當(dāng)前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失敗);而異步提交則沒有失敗重試機(jī)制,故有可能提交失敗。

    • commitSync(同步提交):必須等待offset提交完畢,再去消費(fèi)下一批數(shù)據(jù)。

    • commitAsync(異步提交):發(fā)送完提交offset請求后,就開始消費(fèi)下一批數(shù)據(jù)了

    kafka-consumer-offset位移問題怎么解決

    3.2 代碼示例

    3.2.1 同步提交

    //手動提交屬性配置
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
    //消費(fèi)代碼邏輯
    XXX
    XXX
    XXX
    //手動提交代碼(處理完數(shù)據(jù)以后,這里為了方便,只展示關(guān)鍵代碼)
    //手動提交offset
    kafkaConsumer.commitsync();

    3.2.2 異步提交(生產(chǎn)常用)

    //手動提交屬性配置
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
    //消費(fèi)代碼邏輯
    XXX
    XXX
    XXX
    //手動提交代碼(處理完數(shù)據(jù)以后,這里為了方便,只展示關(guān)鍵代碼)
    //手動提交offset
    kafkaConsumer.commitAsync();

    4 指定offset消費(fèi)

    auto.offset.reset = earliest | latest | none 默認(rèn)是latest

    當(dāng)Kafka 中沒有初始偏移量(消費(fèi)者組第一次消費(fèi))或服務(wù)器上不再存在當(dāng)前偏移量時(例如該數(shù)據(jù)已被刪除),該怎么辦?

    • earliest:自動將偏移量重置為最早的偏移量,--from-beginning。

    • latest(默認(rèn)值):自動將偏移量重置為最新偏移量。

    • none:如果未找到消費(fèi)者組的先前偏移量,則向消費(fèi)者拋出異常。

    kafka-consumer-offset位移問題怎么解決

    任意指定offset位移開始消費(fèi)。

    //1創(chuàng)建消費(fèi)者
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 2訂閱主題
    ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
    kafkaConsumer.subscribe(topics);
     
    //指定位置進(jìn)行消費(fèi)
    set<TopicPartition> assignment = kafkaConsumer.assignment();//獲取所有分區(qū)信息
    //保證分區(qū)分配方案已經(jīng)制定完畢,因為由于leader消費(fèi)者制定分配方案會消耗一定時間,有可能此時獲取不到分區(qū)信息,所以加一層分區(qū)空間判斷
    while (assignment.size() == 0){
        //促使獲取的分區(qū)數(shù)量不為0
        kafkaConsumer.poll(Duration.ofSeconds(1));
        assignment = kafkaConsumer.assignment();
    }
     
    //遍歷所有分區(qū),指定消費(fèi)的offset
    for (TopicPartition topicPartition : assignment) {
        kafkaConsumer.seek(topicPartition, 100);
    }
     
    // 3消費(fèi)數(shù)據(jù)
    while (true){

    5 指定時間消費(fèi)

    需求:在生產(chǎn)環(huán)境中,會遇到最近消費(fèi)的幾個小時數(shù)據(jù)異常,想重新按照時間消費(fèi)。

    例如要求按照時間消費(fèi)前一天的數(shù)據(jù),怎么處理?

    //1創(chuàng)建消費(fèi)者
    KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 2訂閱主題
    ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
    kafkaConsumer.subscribe(topics);
     
    //指定位置進(jìn)行消費(fèi)
    set<TopicPartition> assignment = kafkaConsumer.assignment();//獲取所有分區(qū)信息
    //保證分區(qū)分配方案已經(jīng)制定完畢,因為由于leader消費(fèi)者制定分配方案會消耗一定時間,有可能此時獲取不到分區(qū)信息,所以加一層分區(qū)空間判斷
    while (assignment.size() == 0){
        //促使獲取的分區(qū)數(shù)量不為0
        kafkaConsumer.poll(Duration.ofSeconds(1));
        assignment = kafkaConsumer.assignment();
    }
    //希望把時間轉(zhuǎn)換為對應(yīng)的offset
    HashMap<TopicPartition,Long> topicPartitionLongHashMap = new HashMap<>();
    //封裝對應(yīng)集合
    for (TopicPartition topicPartition : assignment) {
        //希望獲取當(dāng)前系統(tǒng)時間一天前的數(shù)據(jù)。
        topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
    }
    Nap<TopicPartition,OffsetAnd imestamp> topioPartitionffsetAndrtimestampMep = karfiaConsumer.offsetsForTines(topicPartitionL ongHashiap);
     
     
    //遍歷所有分區(qū),指定消費(fèi)的offset
    //指定消費(fèi)的offset
    for (TopicPartition topicPartition : assignment) {
        OffsetAndTimestamp offsetAndTimestamp = topicPartition0ffsetAndTimestampHap.get(topicPartition);
        kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
    }
     
    // 3消費(fèi)數(shù)據(jù)
    while (true){

    6 漏消費(fèi)和重復(fù)消費(fèi)分析

    6.1 重復(fù)消費(fèi)

    場景1:重復(fù)消費(fèi)。自動提交offset引起。

    kafka-consumer-offset位移問題怎么解決

    6.2 漏消費(fèi)

    場景1:漏消費(fèi)。設(shè)置offset為手動提交,當(dāng)offset被提交時,數(shù)據(jù)還在內(nèi)存中未落盤,此時剛好消費(fèi)者線程被kill掉,那么offset已經(jīng)提交,但是數(shù)據(jù)未處理,導(dǎo)致這部分內(nèi)存中的數(shù)據(jù)丟失。

    kafka-consumer-offset位移問題怎么解決

    6.3 消費(fèi)者事務(wù)

    如果想完成Consumer端的精準(zhǔn)一次性消費(fèi),那么需要Kafka消費(fèi)端將消費(fèi)過程和提交offset過程做原子綁定。

    此時我們需要將Kafka的offset保存到支持事務(wù)的自定義介質(zhì)(比如MySQL)。這部分知識會在后續(xù)項目部分涉及。

    kafka-consumer-offset位移問題怎么解決

    7 數(shù)據(jù)積壓

    方案1:如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)=分區(qū)數(shù)。(兩者缺一不可)

    kafka-consumer-offset位移問題怎么解決

    方案2:如果是下游的數(shù)據(jù)處理不及時:提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會造成數(shù)據(jù)積壓。

    kafka-consumer-offset位移問題怎么解決

    關(guān)于“kafka-consumer-offset位移問題怎么解決”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“kafka-consumer-offset位移問題怎么解決”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI