溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用消息隊(duì)列

發(fā)布時(shí)間:2021-10-14 12:01:55 來源:億速云 閱讀:223 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“如何使用消息隊(duì)列”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

1 概述

1.1 基本概念
1.1.1 Broker 代理

已發(fā)布的消息保存在一組服務(wù)器中,稱為Kafka集群。集群中的每個(gè)服務(wù)器都是一個(gè)Broker。

1.1.2 Topic 主題

通過Topic機(jī)制對消息進(jìn)行分類,可以認(rèn)為每個(gè)Topic就是一個(gè)隊(duì)列。

1.1.3 Partition 分區(qū)

每個(gè)Topic可以有多個(gè)分區(qū),主要為了提高并發(fā)而設(shè)計(jì)。相同Topic下不同Partition可以并發(fā)接收消息,同時(shí)也能供消費(fèi)者并發(fā)拉取消息。有多少Partition就有多少并發(fā)量。

在Kafka服務(wù)器上,分區(qū)是以文件目錄的形式存在的。每個(gè)分區(qū)目錄中,Kafka會按配置大小及配置周期將分區(qū)拆分成多個(gè)段文件(LogSegment),每個(gè)段由三部分組成:

- 日志文件:*.log
- 位移索引文件:*.index
- 時(shí)間索引文件:*.timeindex

其中*.log用于存儲消息本身的數(shù)據(jù)內(nèi)容,*.index存儲消息在文件中的位置(包括消息的邏輯offset和物理存儲offset),*.timeindex存儲消息創(chuàng)建時(shí)間和對應(yīng)邏輯地址的映射關(guān)系。

將分區(qū)拆分成多個(gè)段是為了控制存儲文件大小。可以很方便的通過操作系統(tǒng)mmap機(jī)制映射到內(nèi)存中,提高寫入和讀取效率。同時(shí)還有一個(gè)好處就是,當(dāng)系統(tǒng)要清除過期數(shù)據(jù)時(shí),可以直接將過期的段文件刪除。

如果每個(gè)消息都要在index中保存位置信息,index文件自身大小也很容易變的很大。所以Kafka將index設(shè)計(jì)為稀疏索引來減小index文件的大小。

1.1.4 Replication 副本

消息冗余數(shù)量。不能超過集群中Broker的數(shù)量。

1.2 基本操作
1.2.1 Topic相關(guān)
# 創(chuàng)建Topic 
# --topic 主題名稱 避免使用[_]及[.]號
# --replication-factor 副本數(shù)量(不能超過broker節(jié)點(diǎn)數(shù))
# --partitions 分區(qū)數(shù)量(并發(fā))
./bin/kafka-topics.sh --create \
--topic UserDataQueue \
--replication-factor 3 \
--partitions 5 \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# 查看Topic
./bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# 修改Topic
# 刪除Topic
1.2.2 Message相關(guān)
# 發(fā)送消息
# --topic 指定目標(biāo)Topic
./bin/kafka-console-producer.sh \
--topic UserDataQueue \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# 拉取消息
# --from-beginning 從頭開始(獲取現(xiàn)有的全量數(shù)據(jù))
./bin/kafka-console-consumer.sh \
--topic UserDataQueue \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--from-beginning

2 集群配置

Kafka集群依賴于Zookeeper。

2.1 Zookeeper配置及啟動
# 需要修改的參數(shù)

# the directory where the snapshot is stored.
dataDir=/kafka/zkdata
# the port at which the clients will connect
clientPort=2182
# 啟動
./bin/zookeeper-server-start.sh -daemon /kafka/zookeeper.properties
2.2 Kafka配置及啟動
# 需修改參數(shù)

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1  # 同一集群內(nèi)ID必須唯一

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://localhost:9092  # 同一主機(jī)的話,端口號不能相同

# A comma separated list of directories under which to store log files
log.dirs=/kafka/data01  # 日志存儲目錄,需做隔離

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2182  # Zookeeper連接地址,參見2.1 zk配置
# Kafka啟動

# broker-1
./bin/kafka-server-start.sh -daemon /kafka/server01.properties

# broker-2
./bin/kafka-server-start.sh -daemon /kafka/server02.properties

# broker-3
./bin/kafka-server-start.sh -daemon /kafka/server03.properties
2.3 Zookeeper可視化

PrettyZoo 是一個(gè)基于 Apache Curator 和 JavaFX 實(shí)現(xiàn)的 Zookeeper 圖形化管理客戶端。

由下圖可以看到,集群3個(gè)Broker均正常啟動。

如何使用消息隊(duì)列

2.4 Kafka可視化及監(jiān)控
2.4.1 AKHQ

管理Topic,Topic消息,消費(fèi)組等的Kafka可視化系統(tǒng),相關(guān)文檔:https://akhq.io/

如何使用消息隊(duì)列

2.4.2 Kafka Eagle

一個(gè)簡單且高效的監(jiān)控系統(tǒng)。相關(guān)文檔:http://www.kafka-eagle.org/index.html

如何使用消息隊(duì)列

Kafka Eagle 自帶監(jiān)控大屏。

如何使用消息隊(duì)列

3 與Spring Boot集成

Spring Boot版本:2.4.4。

官方示例:https://github.com/spring-projects/spring-kafka/tree/main/samples

3.1 Spring Boot
3.1.1 添加依賴
implementation 'org.springframework.kafka:spring-kafka'
3.1.2 配置文件
spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    producer:
      client-id: kfk-demo
      retries: 3
3.1.3 消息發(fā)送
@RestController
public class IndexController {

    @Autowired
    KafkaTemplate<Object, Object> kafkaTemplate;

    @GetMapping
    public String index() {
        int rdm = new Random().nextInt(1000);
        kafkaTemplate.send("UserDataQueue", new UserData("", rdm));
        return "hello world";
    }

    @GetMapping("index2")
    public String index2() {
		// 發(fā)送字符串方式
        kafkaTemplate.send("UserDataTopic", new Gson().toJson(new UserData("apple", 23)));
        return "ok";
    }
}
3.1.4 消息接收
@Component
@KafkaListener(
        id = "kfk-demo-userdata",
        topics = {"UserDataQueue"},
        groupId = "kfk-demo-group",
        clientIdPrefix = "kfk-demo-client")
public class KfkListener {

    @KafkaHandler
    public void process(@Payload UserData ud,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(String.format("topic: %s, partition: %d, userData: %s", topic, partition, ud));
    }

    @KafkaHandler(isDefault = true)
    public void process(Object obj) {
        System.out.println(obj);
    }
}

// 接收字符串方式
@Slf4j
@Component
@KafkaListener(id = "kfk-demo2", topics = {"UserDataTopic"})
public class KfkUserDataTopicListener {

    @KafkaHandler
    public void process(String userDataStr) {
        UserData userData = new Gson().fromJson(userDataStr, UserData.class);
        log.info("username: {}, age: {}", userData.getUsername(), userData.getAge());
    }
}
3.1.5 Topic自動創(chuàng)建
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic userDataTopic() {
        return new NewTopic("UserDataTopic", 3, (short) 1);
    }
}

“如何使用消息隊(duì)列”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI