溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

發(fā)布時間:2021-09-29 16:09:47 來源:億速云 閱讀:344 作者:柒染 欄目:編程語言

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng),文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

前言

由于kafka強(qiáng)依賴于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java編寫的,需運(yùn)行在jvm上,所以首先應(yīng)具備java環(huán)境。 (ps:默認(rèn)您的centos系統(tǒng)可聯(lián)網(wǎng),本教程就不教配置ip什么的了) (ps2:沒有wget的先裝一下:yum install wget) (ps3:人啊,就是要條理。東邊放一點(diǎn),西邊放一點(diǎn),過段時間就不知道自己裝在哪里了。本教程所有下載均放在/usr/local目錄下) (ps4:kafka可能有內(nèi)置zookeeper,感覺可以越過zookeeper教程,但是這里也配置出來了。我沒試過)

一、配置jdk

因?yàn)閛racle 公司不允許直接通過wget 下載官網(wǎng)上的jdk包。所以你直接wget以下地址下載下來的是一個只有5k的網(wǎng)頁文件而已,并不是需要的jdk包。(壟斷地位就是任性)。 (請通過java -version判斷是否自帶jdk,我的沒帶)

1、官網(wǎng)下載

下面是jdk8的官方下載地址:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

2、上傳解壓

這里通過xftp上傳到服務(wù)器指定位置:/usr/local

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

運(yùn)行命令使環(huán)境生效

source /etc/profile

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

等待下載完成之后解壓:

tar -zxvf zookeeper-3.4.6.tar.gz

從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

重命名為zookeeper1

mv zookeeper-3.4.6 zookeeper1
cp -r zookeeper1 zookeeper2
cp -r zookeeper1 zookeeper3

2、創(chuàng)建data、logs文件夾

在zookeeper1目錄下創(chuàng)建

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

在data目錄下新建myid文件。內(nèi)容為1

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

3、修改zoo.cfg文件

cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg

進(jìn)行過上面兩步之后,有zoo.cfg文件了,現(xiàn)在修改內(nèi)容為:

從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

dataDir=/usr/local/zookeeper/zookeeper1/data
dataLogDir=/usr/local/zookeeper/zookeeper1/logs
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890

4、搭建zookeeper2

首先,復(fù)制改名。

cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2

然后修改具體的某些配置:

vim zookeeper2/conf/zoo.cfg

將下圖三個地方1改成2

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

vim zookeeper2/data/myid

同時將myid中的值改成2

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

vim zookeeper3/conf/zoo.cfg

修改為3

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

6、測試zookeeper集群

cd /usr/local/zookeeper/zookeeper1/bin/

由于啟動所需代碼比較多,這里簡單寫了一個啟動腳本:

vim start

start的內(nèi)容如下

cd /usr/local/zookeeper/zookeeper1/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg

下面是連接腳本:

vim login

login內(nèi)容如下:

./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183

腳本編寫完成,接下來啟動:

sh start
sh login

啟動集群成功,如下圖:

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

三、搭建kafka集群

1、下載kafka

首先創(chuàng)建kafka目錄:

mkdir /usr/local/kafka

然后在該目錄下載

cd /usr/local/kafka/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

下載成功之后解壓:

tar -zxvf kafka_2.11-1.1.0.tgz

2、修改集群配置

首先進(jìn)入conf目錄下:

cd /usr/local/kafka/kafka_2.11-1.1.0/config

修改server.properties 修改內(nèi)容:

broker.id=0
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092

復(fù)制兩份server.properties

cp server.properties server2.properties
cp server.properties server3.properties

修改server2.properties

vim server2.properties

修改主要內(nèi)容為:

broker.id=1
log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093

如上,修改server3.properties 修改內(nèi)容為:

broker.id=2
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094

3、啟動kafka

這里還是在bin目錄編寫一個腳本:

cd ../bin/
vim start

腳本內(nèi)容為:

./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &

通過jps命令可以查看到,共啟動了3個kafka。

從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

4、創(chuàng)建Topic

cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

kafka打印了幾條日志

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

查看kafka狀態(tài)

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

6、啟動消費(fèi)者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

可以看出,啟動消費(fèi)者之后就會自動消費(fèi)。

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

消費(fèi)者自動捕獲成功!

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

不滿足的話啟動springboot的時候會拋異常的?。?!ps:該走的岔路我都走了o(╥﹏╥)o (我的kafka-clients是1.1.0,spring-kafka是2.2.2,中間那列暫時不用管)

從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

回歸正題,搞了兩個小時,終于搞好了,想哭… 遇到的問題基本就是jar版本不匹配。 上面的步驟我也都會相應(yīng)的去修改,爭取大家按照本教程一遍過?。?!

1、pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.gzky</groupId>
    <artifactId>study</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>study</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

pom文件中,重點(diǎn)是下面這兩個版本。

<parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.1.1.RELEASE</version>
       <relativePath/> <!-- lookup parent from repository -->
   </parent>
<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.0.RELEASE</version>
</dependency>

2、application.yml

spring:
  redis:
    cluster:
      #設(shè)置key的生存時間,當(dāng)key過期時,它會被自動刪除;
      expire-seconds: 120
      #設(shè)置命令的執(zhí)行時間,如果超過這個時間,則報錯;
      command-timeout: 5000
      #設(shè)置redis集群的節(jié)點(diǎn)信息,其中namenode為域名解析,通過解析域名來獲取相應(yīng)的地址;
      nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006
  kafka:
    # 指定kafka 代理地址,可以多個
    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
    producer:
      retries: 0
      # 每次批量發(fā)送消息的數(shù)量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 指定默認(rèn)消費(fèi)者group id
      group-id: test-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

server:
  port: 8085
  servlet:
    #context-path: /redis
    context-path: /kafka

沒有配置Redis的可以把Redis部分刪掉,也就是下圖: 想學(xué)習(xí)配置Redis集群的可以參考:《Redis集群redis-cluster的搭建及集成springboot》

如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)

3、生產(chǎn)者

package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * kafka生產(chǎn)者工具類
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KfkaProducer {

    private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 生產(chǎn)數(shù)據(jù)
     * @param str 具體數(shù)據(jù)
     */
    public void send(String str) {
        logger.info("生產(chǎn)數(shù)據(jù):"> 
 4、消費(fèi)者package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka消費(fèi)者監(jiān)聽消息
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KafkaConsumerListener {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "testTopic")
    public void onMessage(String str){
        //insert(str);//這里為插入數(shù)據(jù)庫代碼
        logger.info("監(jiān)聽到:" + str);
        System.out.println("監(jiān)聽到:" + str);
    }

}  5、對外接口package com.gzky.study.controller;

import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * kafka對外接口
 *
 * @author biws
 * @date 2019/12/17
 **/
@RestController
public class KafkaController {

    @Autowired
    KfkaProducer kfkaProducer;

    /**
     * 生產(chǎn)消息
     * @param str
     * @return
     */
    @RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)
    @ResponseBody
    public boolean sendTopic(@RequestParam String str){
        kfkaProducer.send(str);
        return true;
    }
}  6、postman測試這里首先應(yīng)該在服務(wù)器啟動監(jiān)聽器(kafka根目錄),下面命令必須是具體的服務(wù)器ip,不能是localhost,是我踩過的坑:推薦此處重啟一下集群 關(guān)閉kafka命令:cd /usr/local/kafka/kafka_2.11-1.1.0/bin
./kafka-server-stop.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server2.properties &
./kafka-server-stop.sh ../config/server3.properties & 此處應(yīng)該jps看一下,等待所有的kafka都關(guān)閉(關(guān)不掉的kill掉),再重新啟動kafka:./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties & 等待kafka啟動成功后,啟動消費(fèi)者監(jiān)聽端口:cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic  曾經(jīng)我亂輸?shù)臏y試信息全部被監(jiān)聽過來了!啟動springboot服務(wù) 然后用postman生產(chǎn)消息: 然后享受成果,服務(wù)器端監(jiān)聽成功。 項(xiàng)目中也監(jiān)聽成功!

上述就是小編為大家分享的如何從零開始搭建Kafka+SpringBoot分布式消息系統(tǒng)了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI