您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關如何使用spring boot整合kafka和延遲啟動消費者的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
spring boot整合kafka的時候一般使用@KafkaListener來設置消費者,但是這種方式在spring啟動的時候就會立即開啟消費者。如果有需要根據配置信息延遲開啟指定的消費者就不能使用這種方式。
import org.springframework.kafka.annotation.TopicPartition;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a method as a Kafka message handler whose consumer is NOT started
 * automatically; it is only registered when the owning object is handed to
 * {@code MyKafkaConsumerFactory.startConsumer(Object)}.
 *
 * <p>The attributes mirror a subset of Spring Kafka's {@code @KafkaListener}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DelayKafkaConsumer {

    /**
     * Endpoint id. When blank, {@code MyKafkaConsumerFactory} generates one of the
     * form {@code "Custom-Consumer" + counter}.
     */
    String id() default "";

    /** Topics to subscribe to; each entry is passed through expression resolution. */
    String[] topics() default {};

    /**
     * Bean name of a {@code KafkaListenerErrorHandler} to attach to the endpoint;
     * blank means no error handler is set.
     */
    String errorHandler() default "";

    /**
     * Consumer group id. When blank, the factory falls back to the endpoint id,
     * but only if {@link #id()} was explicitly provided.
     */
    String groupId() default "";

    /** Explicit topic/partition assignments. */
    TopicPartition[] topicPartitions() default {};

    /**
     * Key under which the annotated bean is exposed in the listener scope while the
     * other attributes are being resolved.
     */
    String beanRef() default "__listener";
}
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.Advised; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.ObjectFactory; import org.springframework.beans.factory.config.*; import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.MethodIntrospector; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.core.convert.converter.Converter; import org.springframework.core.convert.converter.GenericConverter; import org.springframework.format.Formatter; import org.springframework.format.FormatterRegistry; import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.kafka.annotation.KafkaListenerConfigurer; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.config.KafkaListenerEndpoint; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.messaging.converter.GenericMessageConverter; import org.springframework.messaging.handler.annotation.support.*; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import 
org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; @Service public class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware { private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class); private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar; private final AtomicInteger counter = new AtomicInteger(); private BeanFactory beanFactory; private BeanExpressionResolver resolver = new StandardBeanExpressionResolver(); private BeanExpressionContext expressionContext; private final ListenerScope listenerScope = new ListenerScope(); private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory = new KafkaHandlerMethodFactoryAdapter(); @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { this.kafkaListenerEndpointRegistrar = registrar; addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService); } public void startConsumer(KafkaListenerEndpoint endpoint){ kafkaListenerEndpointRegistrar.registerEndpoint(endpoint); } public void startConsumer(Object target){ logger.info("start consumer {} ...",target.getClass()); Class<?> targetClass = AopUtils.getTargetClass(target); Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() { @Override public Set<DelayKafkaConsumer> inspect(Method method) { Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method); return (!listenerMethods.isEmpty() ? 
listenerMethods : null); } }); if (annotatedMethods.size()==0) throw new IllegalArgumentException(target.getClass()+" need have method with @DelayKafkaConsumer"); for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); logger.info("find message listen handler method : {} , object : {}",method.getName(),target.getClass()); for (DelayKafkaConsumer listener : entry.getValue()) { if(listener.topics().length==0) { logger.info("topics value is empty , will skip it , method : {} , target object : {}",method.getName(),target.getClass()); continue; } processKafkaListener(listener,method,target); logger.info("register method {} success , target object : {}",method.getName(),target.getClass()); } } logger.info("{} consumer start complete .",target.getClass()); } protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) { Method methodToUse = checkProxy(method, bean); MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint(); endpoint.setMethod(methodToUse); endpoint.setBeanFactory(this.beanFactory); String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler"); if (StringUtils.hasText(errorHandlerBeanName)) { endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class)); } processListener(endpoint, kafkaListener, bean, methodToUse); } protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean, Object adminTarget) { String beanRef = kafkaListener.beanRef(); if (StringUtils.hasText(beanRef)) { this.listenerScope.addListener(beanRef, bean); } endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(kafkaListener)); endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId())); endpoint.setTopics(resolveTopics(kafkaListener)); 
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener)); kafkaListenerEndpointRegistrar.registerEndpoint(endpoint); if (StringUtils.hasText(beanRef)) { this.listenerScope.removeListener(beanRef); } } private String getEndpointId(DelayKafkaConsumer kafkaListener) { if (StringUtils.hasText(kafkaListener.id())) { return resolve(kafkaListener.id()); } else { return "Custom-Consumer" + this.counter.getAndIncrement(); } } private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) { String groupId = null; if (StringUtils.hasText(kafkaListener.groupId())) { groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId"); } if (groupId == null && StringUtils.hasText(kafkaListener.id())) { groupId = id; } return groupId; } private String[] resolveTopics(DelayKafkaConsumer kafkaListener) { String[] topics = kafkaListener.topics(); List<String> result = new ArrayList<>(); if (topics.length > 0) { for (int i = 0; i < topics.length; i++) { Object topic = resolveExpression(topics[i]); resolveAsString(topic, result); } } return result.toArray(new String[result.size()]); } private void resolveAsString(Object resolvedValue, List<String> result) { if (resolvedValue instanceof String[]) { for (Object object : (String[]) resolvedValue) { resolveAsString(object, result); } } else if (resolvedValue instanceof String) { result.add((String) resolvedValue); } else if (resolvedValue instanceof Iterable) { for (Object object : (Iterable<Object>) resolvedValue) { resolveAsString(object, result); } } else { throw new IllegalArgumentException(String.format( "@DelayKafkaConsumer can't resolve '%s' as a String", resolvedValue)); } } private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) { TopicPartition[] topicPartitions = kafkaListener.topicPartitions(); List<TopicPartitionInitialOffset> result = new ArrayList<>(); if (topicPartitions.length > 0) { for (TopicPartition topicPartition : topicPartitions) { 
result.addAll(resolveTopicPartitionsList(topicPartition)); } } return result.toArray(new TopicPartitionInitialOffset[result.size()]); } private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) { Object topic = resolveExpression(topicPartition.topic()); Assert.state(topic instanceof String, "topic in @TopicPartition must resolve to a String, not " + topic.getClass()); Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty"); String[] partitions = topicPartition.partitions(); PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets(); Assert.state(partitions.length > 0 || partitionOffsets.length > 0, "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'"); List<TopicPartitionInitialOffset> result = new ArrayList<>(); for (int i = 0; i < partitions.length; i++) { resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result); } for (PartitionOffset partitionOffset : partitionOffsets) { Object partitionValue = resolveExpression(partitionOffset.partition()); Integer partition; if (partitionValue instanceof String) { Assert.state(StringUtils.hasText((String) partitionValue), "partition in @PartitionOffset for topic '" + topic + "' cannot be empty"); partition = Integer.valueOf((String) partitionValue); } else if (partitionValue instanceof Integer) { partition = (Integer) partitionValue; } else { throw new IllegalArgumentException(String.format( "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'", topic, partitionOffset.partition(), partitionValue.getClass())); } Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset()); Long initialOffset; if (initialOffsetValue instanceof String) { Assert.state(StringUtils.hasText((String) initialOffsetValue), "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty"); initialOffset = 
Long.valueOf((String) initialOffsetValue); } else if (initialOffsetValue instanceof Long) { initialOffset = (Long) initialOffsetValue; } else { throw new IllegalArgumentException(String.format( "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'", topic, partitionOffset.initialOffset(), initialOffsetValue.getClass())); } Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent()); Boolean relativeToCurrent; if (relativeToCurrentValue instanceof String) { relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue); } else if (relativeToCurrentValue instanceof Boolean) { relativeToCurrent = (Boolean) relativeToCurrentValue; } else { throw new IllegalArgumentException(String.format( "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'", topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass())); } TopicPartitionInitialOffset topicPartitionOffset = new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent); if (!result.contains(topicPartitionOffset)) { result.add(topicPartitionOffset); } else { throw new IllegalArgumentException( String.format("@TopicPartition can't have the same partition configuration twice: [%s]", topicPartitionOffset)); } } return result; } private void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionInitialOffset> result) { if (resolvedValue instanceof String[]) { for (Object object : (String[]) resolvedValue) { resolvePartitionAsInteger(topic, object, result); } } else if (resolvedValue instanceof String) { Assert.state(StringUtils.hasText((String) resolvedValue), "partition in @TopicPartition for topic '" + topic + "' cannot be empty"); result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue))); } else if (resolvedValue instanceof Integer[]) { for (Integer partition : (Integer[]) resolvedValue) { result.add(new 
TopicPartitionInitialOffset(topic, partition)); } } else if (resolvedValue instanceof Integer) { result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue)); } else if (resolvedValue instanceof Iterable) { for (Object object : (Iterable<Object>) resolvedValue) { resolvePartitionAsInteger(topic, object, result); } } else { throw new IllegalArgumentException(String.format( "@DelayKafkaConsumer for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue)); } } private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) { Set<DelayKafkaConsumer> listeners = new HashSet<>(); DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer.class); if (ann != null) { listeners.add(ann); } return listeners; } private Method checkProxy(Method methodArg, Object bean) { Method method = methodArg; if (AopUtils.isJdkDynamicProxy(bean)) { try { method = bean.getClass().getMethod(method.getName(), method.getParameterTypes()); Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces(); for (Class<?> iface : proxiedInterfaces) { try { method = iface.getMethod(method.getName(), method.getParameterTypes()); break; } catch (NoSuchMethodException noMethod) { } } } catch (SecurityException ex) { ReflectionUtils.handleReflectionException(ex); } catch (NoSuchMethodException ex) { throw new IllegalStateException(String.format( "target method '%s' found on bean target class '%s', " + "but not found in any interface(s) for bean JDK proxy. 
Either " + "pull the method up to an interface or switch to subclass (CGLIB) " + "proxies by setting proxy-target-class/proxyTargetClass " + "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex); } } return method; } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; if (beanFactory instanceof ConfigurableListableBeanFactory) { this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver(); this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, this.listenerScope); } } private String resolveExpressionAsString(String value, String attribute) { Object resolved = resolveExpression(value); if (resolved instanceof String) { return (String) resolved; } else { throw new IllegalStateException("The [" + attribute + "] must resolve to a String. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); } } private Object resolveExpression(String value) { String resolvedValue = resolve(value); return this.resolver.evaluate(resolvedValue, this.expressionContext); } /** * Resolve the specified value if possible. 
* @param value the value to resolve * @return the resolved value * @see ConfigurableBeanFactory#resolveEmbeddedValue */ private String resolve(String value) { if (this.beanFactory instanceof ConfigurableBeanFactory) { return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value); } return value; } private void addFormatters(FormatterRegistry registry) { for (Converter<?, ?> converter : getBeansOfType(Converter.class)) { registry.addConverter(converter); } for (GenericConverter converter : getBeansOfType(GenericConverter.class)) { registry.addConverter(converter); } for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter.class)) { registry.addFormatter(formatter); } } private <T> Collection<T> getBeansOfType(Class<T> type) { if (this.beanFactory instanceof ListableBeanFactory) { return ((ListableBeanFactory) this.beanFactory).getBeansOfType(type).values(); }else { return Collections.emptySet(); } } private static class ListenerScope implements Scope { private final Map<String, Object> listeners = new HashMap<>(); ListenerScope() { super(); } public void addListener(String key, Object bean) { this.listeners.put(key, bean); } public void removeListener(String key) { this.listeners.remove(key); } @Override public Object get(String name, ObjectFactory<?> objectFactory) { return this.listeners.get(name); } @Override public Object remove(String name) { return null; } @Override public void registerDestructionCallback(String name, Runnable callback) { } @Override public Object resolveContextualObject(String key) { return this.listeners.get(key); } @Override public String getConversationId() { return null; } } private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory { private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService(); private MessageHandlerMethodFactory messageHandlerMethodFactory; public void 
setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) { this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1; } @Override public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) { return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method); } private MessageHandlerMethodFactory getMessageHandlerMethodFactory() { if (this.messageHandlerMethodFactory == null) { this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory(); } return this.messageHandlerMethodFactory; } private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory(); defaultFactory.setBeanFactory(MyKafkaConsumerFactory.this.beanFactory); ConfigurableBeanFactory cbf = (MyKafkaConsumerFactory.this.beanFactory instanceof ConfigurableBeanFactory ? (ConfigurableBeanFactory) MyKafkaConsumerFactory.this.beanFactory : null); defaultFactory.setConversionService(this.defaultFormattingConversionService); List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>(); // Annotation-based argument resolution argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf)); argumentResolvers.add(new HeadersMethodArgumentResolver()); // Type-based argument resolution final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService); argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter)); argumentResolvers.add(new PayloadArgumentResolver(messageConverter) { @Override protected boolean isEmptyPayload(Object payload) { return payload == null || payload instanceof KafkaNull; } }); defaultFactory.setArgumentResolvers(argumentResolvers); defaultFactory.afterPropertiesSet(); return defaultFactory; } } }
通過startConsumer來啟動一個消費者(多次調用會啟動多個消費者)。target必須至少包含一個有@DelayKafkaConsumer注解的方法。這里類似@KafkaListener。我去掉了一部分功能,保留了比較常用的部分。
這里提供了一個通過注解的方式在spring boot項目中動態控制consumer的方法。還有其他的方法來達到這種效果,不過我覺得這種方法比較方便。
之前博客里面提到本公司為物聯網項目。項目中使用mqtt+kafka進行與設備端的通訊,之前的協議格式為json格式,現在改成字節數組byte[]格式進行通信。
集成springboot后,具體的demo網上很多,接下來有時間會出一份kafka的demo。
報錯信息如下:
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException:Failed to construct kafka consumer
之前json格式通信時候,構建kafka消費工廠的時候,其中ConcurrentMessageListenerContainer的key為String類型,而value現在為byte[]類型,所以構建消費者工廠的時候需要指定正確的value類型。
代碼如下:
/**
 * Creates the listener container factory for consumers with {@code String} keys and
 * raw {@code byte[]} values.
 *
 * <p>The consumer factory comes from {@code consumerByteFactory()} and the concurrency
 * from the {@code concurrency} field, both defined outside this snippet. The poll
 * timeout is fixed at 1500 (presumably milliseconds, per Spring Kafka's
 * {@code ContainerProperties} — confirm against the version in use).
 *
 * @return the configured container factory
 */
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, byte[]> byteContainerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    byteContainerFactory.setConsumerFactory(consumerByteFactory());
    byteContainerFactory.getContainerProperties().setPollTimeout(1500);
    byteContainerFactory.setConcurrency(concurrency);
    return byteContainerFactory;
}
整體kafka生產者+kafka消費者的demo會在接下來的博客中陸續整理。
感謝各位的閱讀!關于“如何使用spring boot整合kafka和延遲啟動消費者”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。