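How to Send and Consume Kafka Messages by Integrating Spring Boot

Published: 2021-06-21 18:19:17 Source: 億速云 Views: 447 Author: Leah Category: Big Data

Many people who have not done it before get stuck when integrating Kafka with Spring Boot to send and consume messages. This article walks through a working producer application and a working consumer application, and points out the problems you are most likely to hit along the way.

Setting up the Kafka and ZooKeeper clusters was covered in earlier posts (linked below). If Kafka reports that it cannot connect, delete the logs under the Kafka directory, restart the Kafka cluster, and then start the Spring Boot services.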

zookeeper https://my.oschina.net/u/3730149/blog/3071737

kafka https://my.oschina.net/u/3730149/blog/3071754
  • Producer

Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.gzh.kafka.producer</groupId>
	<artifactId>producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>kafka-producer-master</name>
	<description>demo project for kafka producer</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
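		<!-- spring-kafka 2.1.x requires Spring Framework 5 (Spring Boot 2.0); 1.3.x is the line that pairs with Spring Boot 1.5.x -->
		<spring-kafka.version>1.3.4.RELEASE</spring-kafka.version>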
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>${spring-kafka.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<version>${spring-kafka.version}</version>
			<scope>test</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger2</artifactId>
			<version>2.8.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger-ui</artifactId>
			<version>2.8.0</version>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

application.properties

server.port=8000
spring.application.name=kafka-producer
#kafka configuration
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#topic
kafka.app.topic.foo=test20180430
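
The two serializer properties tell the producer how to turn message keys and values into bytes. As a rough, library-free illustration, the sketch below mimics what a string serializer and its matching deserializer do, assuming UTF-8 encoding (Kafka's default for `StringSerializer`). The class `StringCodecSketch` is hypothetical and is not part of Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a string serializer/deserializer pair.
// It is not Kafka's implementation, only the same idea in plain Java.
final class StringCodecSketch {

    // Producer side: String -> bytes (null stays null).
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes -> String (null stays null).
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("Hello World---" + System.currentTimeMillis());
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(deserialize(wire));
    }
}
```

The producer and the consumer have to agree on the format: the consumer configuration later in this article uses `StringDeserializer` for exactly that reason.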

Sending Spring Kafka messages with Spring Boot

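Spring Kafka provides the KafkaTemplate class, which wraps a Producer and offers high-level operations for sending data to Kafka topics. Messages can be sent asynchronously or synchronously; the asynchronous methods return a Future. Spring Boot auto-configures and initializes the KafkaTemplate from the properties set in application.properties.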
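
To make the asynchronous/synchronous distinction concrete, here is a minimal sketch in plain Java. It does not use Spring Kafka: `CompletableFuture` stands in for the future returned by the template, and `sendAsync`, `sendAndWait`, and the `"ack:"` result are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Library-free sketch: CompletableFuture plays the role of the future
// returned by an asynchronous send (an assumption for illustration).
final class FutureSendSketch {

    // Hypothetical asynchronous send: completes on another thread with a fake acknowledgement.
    static CompletableFuture<String> sendAsync(String message) {
        return CompletableFuture.supplyAsync(() -> "ack:" + message);
    }

    // Synchronous variant: block on the same future, with a timeout so the caller cannot hang forever.
    static String sendAndWait(String message) {
        try {
            return sendAsync(message).get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the send", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    public static void main(String[] args) {
        // Asynchronous style: register a callback and carry on.
        CompletableFuture<Void> done =
                sendAsync("hello").thenAccept(ack -> System.out.println("callback got " + ack));
        // Synchronous style: wait for the result before continuing.
        System.out.println("blocking got " + sendAndWait("world"));
        done.join();
    }
}
```

With the real template the same choice applies: either attach callbacks to the returned future, or call `get` with a timeout on it when the caller must know the outcome before continuing.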

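To make sending easy to test, the example uses Spring's scheduled tasks: the @EnableScheduling annotation on the class turns scheduling on, and the @Scheduled annotation specifies when messages are sent.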

	package com.gzh.kafka.producer.component;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import org.springframework.beans.factory.annotation.Autowired;
	import org.springframework.beans.factory.annotation.Value;
	import org.springframework.kafka.core.KafkaTemplate;
	import org.springframework.kafka.support.SendResult;
	import org.springframework.scheduling.annotation.EnableScheduling;
	import org.springframework.scheduling.annotation.Scheduled;
	import org.springframework.stereotype.Component;
	import org.springframework.util.concurrent.ListenableFuture;

	@Component
	@EnableScheduling
	public class KafkaMessageProducer {

		private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class);

		@Autowired
		private KafkaTemplate<String, String> kafkaTemplate;

		@Value("${kafka.app.topic.foo}")
		private String topic;

		// Fires every 5 seconds, starting at second 0 of each minute.
		@Scheduled(cron = "0/5 * * * * ?")
		public void send() {
			String message = "Hello World---" + System.currentTimeMillis();
			LOG.info("topic={}, message={}", topic, message);
			// send() is asynchronous; the callbacks report the eventual outcome.
			ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
			future.addCallback(success -> LOG.info("KafkaMessageProducer: message sent successfully"),
					fail -> LOG.error("KafkaMessageProducer: failed to send message", fail));
		}
	}
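
The seconds field of the cron expression on the scheduled method uses the `start/step` form. The sketch below shows how a field such as `0/5` expands into the seconds of each minute at which the task fires. It is a simplified illustration that handles only this one form, not Spring's cron parser; `CronSecondsSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the "start/step" form of a cron seconds field.
// Other cron syntax (lists, ranges, "*") is deliberately not handled.
final class CronSecondsSketch {

    // Returns the seconds within a minute at which "start/step" fires.
    static List<Integer> expand(String field) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(expand("0/5"));
    }
}
```

A leading zero in the start value parses to the same number, so `00/5` and `0/5` expand to the same twelve firing times per minute.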

Create the producer application class

	package com.gzh.kafka.producer;

	import org.springframework.boot.SpringApplication;
	import org.springframework.boot.autoconfigure.SpringBootApplication;
	import org.springframework.boot.context.properties.EnableConfigurationProperties;

	@SpringBootApplication
	@EnableConfigurationProperties
	public class KafkaProducerApplication{

		public static void main(String[] args) {
			SpringApplication.run(KafkaProducerApplication.class, args);
		}
	}

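At this point the Spring Boot and Spring Kafka producer application is complete. Start the ZooKeeper and Kafka servers, then start the producer application and check its console log: an entry reporting that the message was sent successfully means the integration works.

Messages can also be sent in response to a request from a front-end web page: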

	package com.gzh.kafka.producer.service;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import org.springframework.beans.factory.annotation.Autowired;
	import org.springframework.beans.factory.annotation.Value;
	import org.springframework.kafka.core.KafkaTemplate;
	import org.springframework.kafka.support.SendResult;
	import org.springframework.stereotype.Service;
	import org.springframework.util.concurrent.ListenableFuture;

	@Service
	public class KafkaMessageSendService {

		private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class);

		@Autowired
		private KafkaTemplate<String, String> kafkaTemplate;

		@Value("${kafka.app.topic.foo}")
		private String topic;

		public void send(String message){
			LOG.info("topic={}, message={}", topic, message);
			// send() is asynchronous; the callbacks report the eventual outcome.
			ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
			future.addCallback(success -> LOG.info("KafkaMessageSendService: message sent successfully"),
					fail -> LOG.error("KafkaMessageSendService: failed to send message", fail));
		}
	}

Controller class that handles the web request

	package com.gzh.kafka.producer.controller;

	import org.springframework.beans.factory.annotation.Autowired;
	import org.springframework.http.MediaType;
	import org.springframework.web.bind.annotation.RequestMapping;
	import org.springframework.web.bind.annotation.RequestMethod;
	import org.springframework.web.bind.annotation.RequestParam;
	import org.springframework.web.bind.annotation.RestController;

	import com.gzh.kafka.producer.service.KafkaMessageSendService;

	@RestController
	@RequestMapping(value="send",produces=MediaType.APPLICATION_JSON_UTF8_VALUE)
	public class KafkaMessageSendController {

		@Autowired
		private KafkaMessageSendService kafkaMessageSendService;

		@RequestMapping(value="/sendMessage",method=RequestMethod.POST)
		public String send(@RequestParam(required=true) String message){
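			try {
				// send() only hands the message to the producer. Broker-side failures surface
				// later in the callback, so this catch sees only errors thrown synchronously.
				kafkaMessageSendService.send(message);
			} catch (Exception e) {
				return "send failed.";
			}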
			return message;
		}
	}

Test the controller endpoint through Swagger
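
If Swagger UI is not available, the same endpoint can be exercised from a small plain-Java client. The sketch below is only an illustration: it assumes the producer runs locally on port 8000 (the `server.port` configured earlier) and that the controller is reachable at `/send/sendMessage`, as its mappings suggest. `SendMessageRequestSketch` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Hypothetical client for the controller; host and port are assumptions.
final class SendMessageRequestSketch {

    // Builds the request URL, URL-encoding the message as a query parameter.
    static String buildUrl(String baseUrl, String message) {
        try {
            return baseUrl + "/send/sendMessage?message=" + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // Issues the POST and returns the HTTP status code.
    static int post(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String url = buildUrl("http://localhost:8000", "hello world");
        System.out.println(url);
        System.out.println("HTTP status: " + post(url)); // requires the producer application to be running
    }
}
```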

  • Consumer

Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.gzh.kafka.consumer</groupId>
	<artifactId>consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>kafka-consumer-master</name>
	<description>demo project for kafka consumer</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<spring-kafka.version>1.3.4.RELEASE</spring-kafka.version>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>${spring-kafka.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<version>${spring-kafka.version}</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

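Note: pay close attention to versions when using Spring Kafka, otherwise you will run into all sorts of odd errors. The Spring website publishes a compatibility table for Spring Kafka and kafka-clients versions (the kafka-clients version should match the version of the Kafka servers).

application.properties configuration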

server.port=8001
spring.application.name=kafka-consumer

#kafka configuration
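# automatically commit offsets after messages are consumed, so that consumption resumes from there next time
spring.kafka.consumer.enable-auto-commit=true
# consumer group id
spring.kafka.consumer.group-id=guan
# Kafka server addresses
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# where to start when the group has no committed offset: latest = only new messages, earliest = from the beginning
spring.kafka.consumer.auto-offset-reset=latest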

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#topic
kafka.app.topic.foo=test20180430
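
The `auto-offset-reset` setting only matters when the consumer group has no committed offset for a partition. The sketch below models that decision in plain Java under simplifying assumptions (a negative number means "no committed offset", and out-of-range offsets are ignored). It is not Kafka's implementation, and `OffsetResetSketch` is a hypothetical name.

```java
// Simplified model of how a consumer picks its starting position.
final class OffsetResetSketch {

    // committedOffset < 0 is used here to mean "the group has no committed offset".
    static long startingOffset(long committedOffset, String autoOffsetReset, long logStart, long logEnd) {
        if (committedOffset >= 0) {
            return committedOffset; // a committed offset always wins; the reset policy is not consulted
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart; // replay everything still retained in the partition
            case "latest":
                return logEnd;   // skip existing messages, receive only new ones
            default:
                throw new IllegalStateException("no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(-1, "latest", 0, 100));
        System.out.println(startingOffset(-1, "earliest", 0, 100));
        System.out.println(startingOffset(42, "latest", 0, 100));
    }
}
```

With `latest`, a brand-new consumer group therefore sees only messages produced after it starts, which is why the producer should still be sending when you verify the consumer.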

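When a method is annotated with @KafkaListener, Spring Kafka automatically creates a message listener container for it. The annotation names the topic to consume (a consumer group and partition numbers can also be specified, and topics can be matched with a regular expression). Once the consumer starts, it listens to that topic on the Kafka servers and consumes messages as they arrive.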

	package com.gzh.kafka.consumer.service;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	import org.springframework.kafka.annotation.KafkaListener;
	import org.springframework.messaging.MessageHeaders;
	import org.springframework.messaging.handler.annotation.Headers;
	import org.springframework.messaging.handler.annotation.Payload;
	import org.springframework.stereotype.Component;


	@Component
	public class KafkaMessageConsumer {

		private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);

		@KafkaListener(topics={"${kafka.app.topic.foo}"})
		public void receive(@Payload String message, @Headers MessageHeaders headers){
			LOG.info("KafkaMessageConsumer received message: {}", message);
			headers.keySet().forEach(key->LOG.info("{}: {}",key,headers.get(key)));
		}
	}
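
The listener above names its topic explicitly. `@KafkaListener` also has a `topicPattern` attribute that takes a regular expression instead. The plain-Java sketch below shows which topic names such a pattern would select, assuming the whole topic name must match the expression. The topic list and the class `TopicPatternSketch` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustrates regular-expression topic selection with java.util.regex only.
final class TopicPatternSketch {

    // Returns the topics whose full name matches the regular expression.
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topic names; only the first comes from this article.
        List<String> topics = Arrays.asList("test20180430", "test", "orders");
        System.out.println(matchingTopics("test\\d+", topics));
    }
}
```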

Create the consumer application class

	package com.gzh.kafka.consumer;

	import org.springframework.boot.SpringApplication;
	import org.springframework.boot.autoconfigure.SpringBootApplication;
	import org.springframework.boot.context.properties.EnableConfigurationProperties;

	@SpringBootApplication
	@EnableConfigurationProperties
	public class KafkaConsumerApplication {

		public static void main(String[] args) {
			SpringApplication.run(KafkaConsumerApplication.class, args);
		}
	}

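The consumer application is now complete. Next, verify that sending and receiving work end to end: start the ZooKeeper and Kafka servers in that order, then start the producer application (kafka-producer-master), then the consumer application (kafka-consumer-master), and watch the logs of both. The consumer log should show the messages being received.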
