您好,登錄后才能下訂單哦!
這篇文章主要介紹“kafka consumer怎么使用”,在日常操作中,相信很多人在kafka consumer怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”kafka consumer怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
consumer作為kafka當(dāng)中一個(gè)重要元素,它的常用操作并不復(fù)雜,說白了無非就是2點(diǎn),1、把數(shù)據(jù)poll出來,2、把位置標(biāo)記上。我們找到kafka的java api doc,找到了官方提供的幾種consumer操作的例子,逐一進(jìn)行分析,看看都有幾種操作類型。
自動(dòng) Offset 提交
這個(gè)例子顯示了一個(gè)基于offset自動(dòng)提交的consumer api的簡單應(yīng)用。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); }
enable.auto.commit
意味著offset將會(huì)得到自動(dòng)提交,而這個(gè)自動(dòng)提交的時(shí)間間隔由 auto.commit.interval.ms
來進(jìn)行控制。
客戶端通過 bootstrap.servers
的配置來連接服務(wù)器,這個(gè)配值當(dāng)中可以是一個(gè)或多個(gè)broker,需要注意的是,這個(gè)配置僅僅用來讓客戶端找到我們的server集群,而不需要把集群當(dāng)中的所有服務(wù)器地址都列上。
在這個(gè)例子當(dāng)中,客戶端作為test group的一員,訂閱了foo和bar2個(gè)topic。
( 這一段直接翻譯很蹩腳,我會(huì)試著根據(jù)自己的理解翻譯出來)首先假設(shè),foo和bar這2個(gè)topic,都分別有3個(gè)partitions,同時(shí)我們將上面的代碼在我們的機(jī)器上起3個(gè)進(jìn)程,也就是說,在test group當(dāng)中,目前有了3個(gè)consumer,一般來講,這3個(gè)consumer會(huì)分別獲得 foo和bar 的各一個(gè)partitions,這是前提。3個(gè)consumer會(huì)周期性的執(zhí)行一個(gè)poll的動(dòng)作(這個(gè)動(dòng)作當(dāng)中隱含的有一個(gè)heartbeat的發(fā)送,來告訴cluster我是活的),這樣3個(gè)consumer會(huì)持續(xù)的保有他們對分配給自己的partition的訪問的權(quán)利,如果某一個(gè)consumer失效了,也就是poll不再執(zhí)行了,cluster會(huì)在一段時(shí)間( session.timeout.ms
)之后把partitions分配給其他的consumer。
反序列化的設(shè)置,定義了如何轉(zhuǎn)化bytes,這里我們把key和value都直接轉(zhuǎn)化為string。
手動(dòng)的offset控制
除了周期性的自動(dòng)提交offset之外,用戶也可以在消息被消費(fèi)了之后提交他們的offset。
某些情況下,消息的消費(fèi)是和某些處理邏輯相關(guān)聯(lián)的,我們可以用這樣的方式,手動(dòng)的在處理邏輯結(jié)束之后提交offset。
簡要地說,在這個(gè)例子當(dāng)中,我們希望每次至少消費(fèi)200條消息并將它們插入數(shù)據(jù)庫,之后再提交offset。如果仍然使用前面的自動(dòng)提交方式,就可能出現(xiàn)消息已經(jīng)被消費(fèi),但是插入數(shù)據(jù)庫失敗的情況。這里可以視作一個(gè)簡單的事務(wù)封裝。
但是,有沒有另一種可能性,在插入數(shù)據(jù)庫成功之后,提交offset之前,發(fā)生了錯(cuò)誤,或者說是提交offset本身發(fā)生了錯(cuò)誤,那么就可能出現(xiàn)某些消息被重復(fù)消費(fèi)的情況。
個(gè)人認(rèn)為這段話說的莫名其妙,簡單地說,采用這樣的方式,消息不會(huì)被丟失,但是有可能出現(xiàn)重復(fù)消費(fèi)。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }
上面的例子當(dāng)中,我們用commitSync來標(biāo)記所有的消息;在有些情況下,我們可能希望更加精確的控制offset,那么在下面的例子當(dāng)中,我們可以在每一個(gè)partition當(dāng)中分別控制offset的提交。
try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }
注意:提交的offset應(yīng)該是next message,所以,提交的時(shí)候需要在當(dāng)前最后一條的基礎(chǔ)上+1.
手動(dòng)的分區(qū)分配
前面的例子當(dāng)中,我們訂閱一個(gè)topic,然后讓kafka把該topic當(dāng)中的不同partitions,公平的在一個(gè)consumer group內(nèi)部進(jìn)行分配。那么,在某些情況下,我們希望能夠具體的指定partitions的分配關(guān)系。
如果某個(gè)進(jìn)程在本地管理了和partition相關(guān)的狀態(tài),那么它只需要獲得跟他相關(guān)partition。
如果某個(gè)進(jìn)程自身具備高可用性,那么就不需要kafka來檢測錯(cuò)誤并重新分配partition,因?yàn)橄M(fèi)者進(jìn)程會(huì)在另一臺(tái)設(shè)備上重新啟動(dòng)。
要使用這種模式,可以用assign方法來代替subscribe,具體指定一個(gè)partitions列表。
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
分配之后,就可以像前面的例子一樣,在循環(huán)當(dāng)中調(diào)用poll來消費(fèi)消息。手動(dòng)的分區(qū)分配不需要組協(xié)調(diào),所以消費(fèi)進(jìn)程失效之后,不會(huì)引發(fā)partition的重新分配,每一個(gè)消費(fèi)者都是獨(dú)立工作的,即使它和其他消費(fèi)者屬于同一個(gè)group。為了避免offset提交的沖突,在這種情況下,通常我們需要保證每一個(gè)consumer使用自己的group id。
需要注意的是,手動(dòng)partition分配和通過subscribe實(shí)現(xiàn)的動(dòng)態(tài)的分區(qū)分配,2種方式是不能混合使用的。
到此,關(guān)于“kafka consumer怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。