溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka單線程Consumer及參數(shù)詳解

發(fā)布時間:2020-06-14 19:08:20 來源:網(wǎng)絡 閱讀:209 作者:實時計算 欄目:大數(shù)據(jù)

請使用0.9以后的版本:

示例代碼
 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset","earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2、用這些Properties構建consumer對象(KafkaConsumer還有其他構造,可以把序列化傳進去);

3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個接口來實現(xiàn) 分區(qū)變更時的邏輯。如果設置了enable.auto.commit = true 就不用理會這個邏輯。

4、然后循環(huán)poll消息(這里的1000是超時設定,如果沒有很多數(shù)據(jù),也就等一秒);

5、處理消息(打印了offset key value 這里寫處理邏輯)。

6、關閉KafkaConsumer(可以傳一個timeout值 等待秒數(shù) 默認是30)。

參數(shù)詳解

bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是字節(jié)數(shù)組,還原回對象類型。

默認有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定義:定義serializer格式 創(chuàng)建自定義deserializer類實現(xiàn)Deserializer 接口 重寫邏輯

?

除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測失敗的時間"

是檢測consumer掛掉的時間 為了可以及時的rebalance 默認是10秒 可以設置更小的值避免消息延遲。

max.poll.interval.ms "consumer處理邏輯最大時間"

處理邏輯比較復雜的時候 可以設置這個值 避免造成不必要的 rebalance ,因為兩次poll時間超過了這個參數(shù),kafka認為這個consumer已經跟不上了,會踢出組,而且不能提交offset,就會重復消費。默認是5分鐘。

auto.offset.reset "無位移或者位移越界時kafka的應對策略"

所以如果啟動了一個group從頭消費 成功提交位移后 重啟后還是接著消費 這個參數(shù)無效

所以3個值的解釋是:

earliset 當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最早的位移消費

latest 當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區(qū)下的數(shù)據(jù) none topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常

(注意kafka-0.10.1.X版本之前:?auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、

我們這是說的是新版本:kafka-0.10.1.X版本之后:?auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個特殊的topic名為:__consumer_offsets里面))

enable.auto.commit 是否自動提交位移

true 自動提交 false需要用戶手動提交 有只處理一次需要的 最近設置為false自己控制。

fetch.max.bytes consumer單次獲取最大字節(jié)數(shù)

max.poll.records 單次poll返回的最大消息數(shù)

默認500條 如果消費很輕量 可以適當提高這個值 增加消費速度。

hearbeat.interval.ms consumer其他組員感知rabalance的時間

該值必須小于 session.timeout.ms 如果檢測到 consumer掛掉 也就根本無法感知rabalance了

connections.max.idle.ms 定期關閉連接的時間

默認是9分鐘 可以設置為-1 永不關閉

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

Kafka單線程Consumer及參數(shù)詳解

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI