您好,登錄后才能下訂單哦!
這篇“Spring Boot中怎么使用@KafkaListener并發(fā)批量接收消息”文章的知識點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Spring Boot中怎么使用@KafkaListener并發(fā)批量接收消息”文章吧。
###第一步,并發(fā)消費(fèi)###
先看代碼,重點(diǎn)是這我們使用的是ConcurrentKafkaListenerContainerFactory并且設(shè)置了factory.setConcurrency(4); (我的topic有4個分區(qū),為了加快消費(fèi)將并發(fā)設(shè)置為4,也就是有4個KafkaMessageListenerContainer)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并發(fā)消費(fèi)。
###第二步,批量消費(fèi)###
然后是批量消費(fèi)。重點(diǎn)是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一個設(shè)啟用批量消費(fèi),一個設(shè)置批量消費(fèi)每次最多消費(fèi)多少條消息記錄。
重點(diǎn)說明一下,我們設(shè)置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是說如果沒有達(dá)到50條消息,我們就一直等待。官方的解釋是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的記錄數(shù)。
從啟動日志中可以看到還有個 max.poll.interval.ms = 300000, 也就說每間隔max.poll.interval.ms我們就調(diào)用一次poll。每次poll最多返回50條記錄。
max.poll.interval.ms官方解釋是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
啟動日志截圖
關(guān)于max.poll.records和max.poll.interval.ms官方解釋截圖:
###第三步,分區(qū)消費(fèi)###
對于只有一個分區(qū)的topic,不需要分區(qū)消費(fèi),因為沒有意義。下面的例子是針對有2個分區(qū)的情況(我的完整代碼中有4個listenPartitionX方法,我的topic設(shè)置了4個分區(qū)),讀者可以根據(jù)自己的情況進(jìn)行調(diào)整。
public class MyListener { private static final String TPOIC = "topic02"; @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) }) public void listenPartition0(List<ConsumerRecord<?, ?>> records) { log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id0 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p0 Received message={}", message); } } } @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) }) public void listenPartition1(List<ConsumerRecord<?, ?>> records) { log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId()); log.info("Id1 records size " + records.size()); for (ConsumerRecord<?, ?> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); log.info("Received: " + record); if (kafkaMessage.isPresent()) { Object message = record.value(); String topic = record.topic(); log.info("p1 Received message={}", message); } } }
以上就是關(guān)于“Spring Boot中怎么使用@KafkaListener并發(fā)批量接收消息”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。