溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

kafka consumer怎么使用

發(fā)布時(shí)間:2021-12-16 16:48:14 來源:億速云 閱讀:245 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“kafka consumer怎么使用”,在日常操作中,相信很多人在kafka consumer怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”kafka consumer怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

consumer作為kafka當(dāng)中一個(gè)重要元素,它的常用操作并不復(fù)雜,說白了無非就是2點(diǎn),1、把數(shù)據(jù)poll出來,2、把位置標(biāo)記上。我們找到kafka的java api doc,找到了官方提供的幾種consumer操作的例子,逐一進(jìn)行分析,看看都有幾種操作類型。

Automatic Offset Committing

自動(dòng) Offset 提交

這個(gè)例子顯示了一個(gè)基于offset自動(dòng)提交的consumer api的簡單應(yīng)用。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

enable.auto.commit 意味著offset將會(huì)得到自動(dòng)提交,而這個(gè)自動(dòng)提交的時(shí)間間隔由 auto.commit.interval.ms 來進(jìn)行控制。

客戶端通過 bootstrap.servers 的配置來連接服務(wù)器,這個(gè)配值當(dāng)中可以是一個(gè)或多個(gè)broker,需要注意的是,這個(gè)配置僅僅用來讓客戶端找到我們的server集群,而不需要把集群當(dāng)中的所有服務(wù)器地址都列上。

在這個(gè)例子當(dāng)中,客戶端作為test group的一員,訂閱了foo和bar2個(gè)topic。

( 這一段直接翻譯很蹩腳,我會(huì)試著根據(jù)自己的理解翻譯出來)首先假設(shè),foo和bar這2個(gè)topic,都分別有3個(gè)partitions,同時(shí)我們將上面的代碼在我們的機(jī)器上起3個(gè)進(jìn)程,也就是說,在test group當(dāng)中,目前有了3個(gè)consumer,一般來講,這3個(gè)consumer會(huì)分別獲得 foo和bar 的各一個(gè)partitions,這是前提。3個(gè)consumer會(huì)周期性的執(zhí)行一個(gè)poll的動(dòng)作(這個(gè)動(dòng)作當(dāng)中隱含的有一個(gè)heartbeat的發(fā)送,來告訴cluster我是活的),這樣3個(gè)consumer會(huì)持續(xù)的保有他們對分配給自己的partition的訪問的權(quán)利,如果某一個(gè)consumer失效了,也就是poll不再執(zhí)行了,cluster會(huì)在一段時(shí)間( session.timeout.ms )之后把partitions分配給其他的consumer。

反序列化的設(shè)置,定義了如何轉(zhuǎn)化bytes,這里我們把key和value都直接轉(zhuǎn)化為string。

Manual Offset Control

手動(dòng)的offset控制

除了周期性的自動(dòng)提交offset之外,用戶也可以在消息被消費(fèi)了之后提交他們的offset。

某些情況下,消息的消費(fèi)是和某些處理邏輯相關(guān)聯(lián)的,我們可以用這樣的方式,手動(dòng)的在處理邏輯結(jié)束之后提交offset。

簡要地說,在這個(gè)例子當(dāng)中,我們希望每次至少消費(fèi)200條消息并將它們插入數(shù)據(jù)庫,之后再提交offset。如果仍然使用前面的自動(dòng)提交方式,就可能出現(xiàn)消息已經(jīng)被消費(fèi),但是插入數(shù)據(jù)庫失敗的情況。這里可以視作一個(gè)簡單的事務(wù)封裝。

但是,有沒有另一種可能性,在插入數(shù)據(jù)庫成功之后,提交offset之前,發(fā)生了錯(cuò)誤,或者說是提交offset本身發(fā)生了錯(cuò)誤,那么就可能出現(xiàn)某些消息被重復(fù)消費(fèi)的情況。

個(gè)人認(rèn)為這段話說的莫名其妙,簡單地說,采用這樣的方式,消息不會(huì)被丟失,但是有可能出現(xiàn)重復(fù)消費(fèi)。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

上面的例子當(dāng)中,我們用commitSync來標(biāo)記所有的消息;在有些情況下,我們可能希望更加精確的控制offset,那么在下面的例子當(dāng)中,我們可以在每一個(gè)partition當(dāng)中分別控制offset的提交。

try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

注意:提交的offset應(yīng)該是next message,所以,提交的時(shí)候需要在當(dāng)前最后一條的基礎(chǔ)上+1.

Manual Partition Assignment

手動(dòng)的分區(qū)分配

前面的例子當(dāng)中,我們訂閱一個(gè)topic,然后讓kafka把該topic當(dāng)中的不同partitions,公平的在一個(gè)consumer group內(nèi)部進(jìn)行分配。那么,在某些情況下,我們希望能夠具體的指定partitions的分配關(guān)系。

  • 如果某個(gè)進(jìn)程在本地管理了和partition相關(guān)的狀態(tài),那么它只需要獲得跟他相關(guān)partition。


  • 如果某個(gè)進(jìn)程自身具備高可用性,那么就不需要kafka來檢測錯(cuò)誤并重新分配partition,因?yàn)橄M(fèi)者進(jìn)程會(huì)在另一臺(tái)設(shè)備上重新啟動(dòng)。

要使用這種模式,可以用assign方法來代替subscribe,具體指定一個(gè)partitions列表。

String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

分配之后,就可以像前面的例子一樣,在循環(huán)當(dāng)中調(diào)用poll來消費(fèi)消息。手動(dòng)的分區(qū)分配不需要組協(xié)調(diào),所以消費(fèi)進(jìn)程失效之后,不會(huì)引發(fā)partition的重新分配,每一個(gè)消費(fèi)者都是獨(dú)立工作的,即使它和其他消費(fèi)者屬于同一個(gè)group。為了避免offset提交的沖突,在這種情況下,通常我們需要保證每一個(gè)consumer使用自己的group id。

需要注意的是,手動(dòng)partition分配和通過subscribe實(shí)現(xiàn)的動(dòng)態(tài)的分區(qū)分配,2種方式是不能混合使用的。

到此,關(guān)于“kafka consumer怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI