溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

java中Kafka如何使用

發(fā)布時(shí)間:2021-11-24 08:15:10 來(lái)源:億速云 閱讀:174 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)java中Kafka如何使用,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

前言

官方文檔:http://kafka.apache.org/

中文文檔:https://kafka.apachecn.org/

Apache Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。

Apache Kafka與傳統(tǒng)消息系統(tǒng)相比,有以下不同:

  • 它被設(shè)計(jì)為一個(gè)分布式系統(tǒng),易于向外擴(kuò)展;

  • 它同時(shí)為發(fā)布和訂閱提供高吞吐量;

  • 它支持多訂閱者,當(dāng)失敗時(shí)能自動(dòng)平衡消費(fèi)者;

  • 它將消息持久化到磁盤(pán),因此可用于批量消費(fèi),例如ETL,以及實(shí)時(shí)應(yīng)用程序。

1 簡(jiǎn)單介紹

java中Kafka如何使用

首先是一些概念:

Kafka作為一個(gè)集群,運(yùn)行在一臺(tái)或者多臺(tái)服務(wù)器上.Kafka 通過(guò) topic 對(duì)存儲(chǔ)的流數(shù)據(jù)進(jìn)行分類(lèi)。每條記錄中包含一個(gè)key,一個(gè)value和一個(gè)timestamp(時(shí)間戳)。

Kafka有四個(gè)核心的API:

The Producer API 允許一個(gè)應(yīng)用程序發(fā)布一串流式的數(shù)據(jù)到一個(gè)或者多個(gè)Kafka topic。

The Consumer API 允許一個(gè)應(yīng)用程序訂閱一個(gè)或多個(gè) topic ,并且對(duì)發(fā)布給他們的流式數(shù)據(jù)進(jìn)行處理。

The Streams API 允許一個(gè)應(yīng)用程序作為一個(gè)流處理器,消費(fèi)一個(gè)或者多個(gè)topic產(chǎn)生的輸入流,然后生產(chǎn)一個(gè)輸出流到一個(gè)或多個(gè)topic中去,在輸入輸出流中進(jìn)行有效的轉(zhuǎn)換。

The Connector API 允許構(gòu)建并運(yùn)行可重用的生產(chǎn)者或者消費(fèi)者,將Kafka topics連接到已存在的應(yīng)用程序或者數(shù)據(jù)系統(tǒng)。比如,連接到一個(gè)關(guān)系型數(shù)據(jù)庫(kù),捕捉表(table)的所有變更內(nèi)容。

支持的語(yǔ)言(除了Java之外的)

java中Kafka如何使用

常見(jiàn)概念

1 Topics和日志

讓我們首先深入了解下Kafka的核心概念:提供一串流式的記錄— topic

Topic 就是數(shù)據(jù)主題,是數(shù)據(jù)記錄發(fā)布的地方,可以用來(lái)區(qū)分業(yè)務(wù)系統(tǒng)。Kafka中的Topics總是多訂閱者模式,一個(gè)topic可以擁有一個(gè)或者多個(gè)消費(fèi)者來(lái)訂閱它的數(shù)據(jù)。

對(duì)于每一個(gè)topic, Kafka集群都會(huì)維持一個(gè)分區(qū)日志,如下所示:

java中Kafka如何使用

每個(gè)分區(qū)都是有序且順序不可變的記錄集,并且不斷地追加到結(jié)構(gòu)化的commit log文件。分區(qū)中的每一個(gè)記錄都會(huì)分配一個(gè)id號(hào)來(lái)表示順序,我們稱之為offset,offset用來(lái)唯一的標(biāo)識(shí)分區(qū)中每一條記錄。

Kafka 集群保留所有發(fā)布的記錄—無(wú)論他們是否已被消費(fèi)—并通過(guò)一個(gè)可配置的參數(shù)——保留期限來(lái)控制. 舉個(gè)例子, 如果保留策略設(shè)置為2天,一條記錄發(fā)布后兩天內(nèi),可以隨時(shí)被消費(fèi),兩天過(guò)后這條記錄會(huì)被拋棄并釋放磁盤(pán)空間。Kafka的性能和數(shù)據(jù)大小無(wú)關(guān),所以長(zhǎng)時(shí)間存儲(chǔ)數(shù)據(jù)沒(méi)有什么問(wèn)題.

java中Kafka如何使用

日志中的 partition(分區(qū))有以下幾個(gè)用途。第一,當(dāng)日志大小超過(guò)了單臺(tái)服務(wù)器的限制,允許日志進(jìn)行擴(kuò)展。每個(gè)單獨(dú)的分區(qū)都必須受限于主機(jī)的文件限制,不過(guò)一個(gè)主題可能有多個(gè)分區(qū),因此可以處理無(wú)限量的數(shù)據(jù)。第二,可以作為并行的單元集—關(guān)于這一點(diǎn),更多細(xì)節(jié)如下

2 分布式

日志的分區(qū)partition (分布)在Kafka集群的服務(wù)器上。每個(gè)服務(wù)器在處理數(shù)據(jù)和請(qǐng)求時(shí),共享這些分區(qū)。每一個(gè)分區(qū)都會(huì)在已配置的服務(wù)器上進(jìn)行備份,確保容錯(cuò)性.

每個(gè)分區(qū)都有一臺(tái) server 作為 “l(fā)eader”,零臺(tái)或者多臺(tái)server作為 follwers 。leader server 處理一切對(duì) partition (分區(qū))的讀寫(xiě)請(qǐng)求,而follwers只需被動(dòng)的同步leader上的數(shù)據(jù)。當(dāng)leader宕機(jī)了,followers 中的一臺(tái)服務(wù)器會(huì)自動(dòng)成為新的 leader。每臺(tái) server 都會(huì)成為某些分區(qū)的 leader 和某些分區(qū)的 follower,因此集群的負(fù)載是平衡的。

3 生產(chǎn)者

生產(chǎn)者可以將數(shù)據(jù)發(fā)布到所選擇的topic中。生產(chǎn)者負(fù)責(zé)將記錄分配到topic的哪一個(gè) partition(分區(qū))中??梢允褂醚h(huán)的方式來(lái)簡(jiǎn)單地實(shí)現(xiàn)負(fù)載均衡,也可以根據(jù)某些語(yǔ)義分區(qū)函數(shù)(例如:記錄中的key)來(lái)完成。下面會(huì)介紹更多關(guān)于分區(qū)的使用。

4 消費(fèi)者

消費(fèi)者使用一個(gè) 消費(fèi)組 名稱來(lái)進(jìn)行標(biāo)識(shí),發(fā)布到topic中的每條記錄被分配給訂閱消費(fèi)組中的一個(gè)消費(fèi)者實(shí)例.消費(fèi)者實(shí)例可以分布在多個(gè)進(jìn)程中或者多個(gè)機(jī)器上。

如果所有的消費(fèi)者實(shí)例在同一消費(fèi)組中,消息記錄會(huì)負(fù)載平衡到每一個(gè)消費(fèi)者實(shí)例.

如果所有的消費(fèi)者實(shí)例在不同的消費(fèi)組中,每條消息記錄會(huì)廣播到所有的消費(fèi)者進(jìn)程.

java中Kafka如何使用

如圖,這個(gè) Kafka 集群有兩臺(tái) server 的,四個(gè)分區(qū)(p0-p3)和兩個(gè)消費(fèi)者組。消費(fèi)組A有兩個(gè)消費(fèi)者,消費(fèi)組B有四個(gè)消費(fèi)者。

通常情況下,每個(gè) topic 都會(huì)有一些消費(fèi)組,一個(gè)消費(fèi)組對(duì)應(yīng)一個(gè)"邏輯訂閱者"。一個(gè)消費(fèi)組由許多消費(fèi)者實(shí)例組成,便于擴(kuò)展和容錯(cuò)。這就是發(fā)布和訂閱的概念,只不過(guò)訂閱者是一組消費(fèi)者而不是單個(gè)的進(jìn)程。

在Kafka中實(shí)現(xiàn)消費(fèi)的方式是將日志中的分區(qū)劃分到每一個(gè)消費(fèi)者實(shí)例上,以便在任何時(shí)間,每個(gè)實(shí)例都是分區(qū)唯一的消費(fèi)者。維護(hù)消費(fèi)組中的消費(fèi)關(guān)系由Kafka協(xié)議動(dòng)態(tài)處理。如果新的實(shí)例加入組,他們將從組中其他成員處接管一些 partition 分區(qū);如果一個(gè)實(shí)例消失,擁有的分區(qū)將被分發(fā)到剩余的實(shí)例。

Kafka 只保證分區(qū)內(nèi)的記錄是有序的,而不保證主題中不同分區(qū)的順序。每個(gè) partition 分區(qū)按照key值排序足以滿足大多數(shù)應(yīng)用程序的需求。但如果你需要總記錄在所有記錄的上面,可使用僅有一個(gè)分區(qū)的主題來(lái)實(shí)現(xiàn),這意味著每個(gè)消費(fèi)者組只有一個(gè)消費(fèi)者進(jìn)程。

保證

high-level Kafka給予以下保證:

生產(chǎn)者發(fā)送到特定topic partition 的消息將按照發(fā)送的順序處理。 也就是說(shuō),如果記錄M1和記錄M2由相同的生產(chǎn)者發(fā)送,并先發(fā)送M1記錄,那么M1的偏移比M2小,并在日志中較早出現(xiàn)一個(gè)消費(fèi)者實(shí)例按照日志中的順序查看記錄.對(duì)于具有N個(gè)副本的主題,我們最多容忍N(yùn)-1個(gè)服務(wù)器故障,從而保證不會(huì)丟失任何提交到日志中的記錄.

關(guān)于保證的更多細(xì)節(jié)可以看文檔的設(shè)計(jì)部分。

2 下載安裝

Kafka依賴于Zookeeper,而Zookeeper又依賴于Java,因此在使用Kafka之前要安裝jdk1.8的環(huán)境和啟動(dòng)zookeeper服務(wù)器。

下載或安裝地址:

JDK1.8://kemok4.com/article/229780.htm:
http://kemok4.com/article/229783.htm:
https://kafka.apachecn.org/downloads.html

好,下面我們開(kāi)始進(jìn)行安裝

[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# tar -zxf kafka_2.11-1.0.0.tgz
[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# mv kafka_2.11-1.0.0 kafka-2.11

3 基本使用

3.1 啟動(dòng)Kafka

首先檢查下自己的jdk 是否安裝:

[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# java -version
java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)

啟動(dòng)Zookeeper:

[root@iZ2ze4m2ri7irkf6h7n8zoZ zookeeper-3.5.9]# ls
bin  conf  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.txt
[root@iZ2ze4m2ri7irkf6h7n8zoZ zookeeper-3.5.9]# cd conf/
[root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# cp zoo_sample.cfg zoo.cfg
[root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# cd ../bin/
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ls
README.txt    zkCli.cmd  zkEnv.cmd  zkServer.cmd            zkServer.sh          zkTxnLogToolkit.sh
zkCleanup.sh  zkCli.sh   zkEnv.sh   zkServer-initialize.sh  zkTxnLogToolkit.cmd
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./zkServer.
zkServer.cmd  zkServer.sh   
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.9/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

啟動(dòng)Kafka:

[root@iZ2ze4m2ri7irkf6h7n8zoZ kafka-2.11]# ls
bin  config  libs  LICENSE  NOTICE  site-docs
[root@iZ2ze4m2ri7irkf6h7n8zoZ kafka-2.11]# cd config/
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# ls
connect-console-sink.properties    connect-file-source.properties  log4j.properties        zookeeper.properties
connect-console-source.properties  connect-log4j.properties        producer.properties
connect-distributed.properties     connect-standalone.properties   server.properties
connect-file-sink.properties       consumer.properties             tools-log4j.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cd ../bin/
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server.properties 
[2021-11-20 10:21:10,326] INFO KafkaConfig values: 
......
[2021-11-20 10:21:12,423] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-20 10:21:12,423] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2021-11-20 10:21:12,424] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

3.2 簡(jiǎn)單測(cè)試使用

新建和查看topic

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ymx
Created topic "ymx".
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
ymx

生產(chǎn)者發(fā)送消息:

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic ymx
>Hello Kafka!
>Hello Ymx!
>Hello Kafka and Ymx!
>

消費(fèi)者消費(fèi)消息:

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ymx --from-beginning
Hello Kafka!
Hello Ymx!
Hello Kafka and Ymx!

3.3 搭建多代理集群

3.3.1 開(kāi)始搭建

首先要copy下配置文件

[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cp server.properties server-01.properties 
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cp server.properties server-02.properties 
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# vim server-01.properties 
####  內(nèi)容開(kāi)始  ####
broker.id=1                  #  21行左右,broker的唯一標(biāo)識(shí)(同一個(gè)集群中)
listeners=PLAINTEXT://:9093  #  31行左右,放開(kāi),代表kafka的端口號(hào)
log.dirs=/tmp/kafka-logs-01  #  60行左右,用逗號(hào)分隔的目錄列表,在其中存儲(chǔ)日志文件  
####  內(nèi)容結(jié)束  ####
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# vim server-02.properties 
####  內(nèi)容開(kāi)始  ####
broker.id=2                  #  21行左右,broker的唯一標(biāo)識(shí)(同一個(gè)集群中)
listeners=PLAINTEXT://:9094  #  31行左右,放開(kāi),代表kafka的端口號(hào)
log.dirs=/tmp/kafka-logs-02  #  60行左右,用逗號(hào)分隔的目錄列表,在其中存儲(chǔ)日志文件  
####  內(nèi)容結(jié)束  ####

根據(jù)配置文件啟動(dòng)Kafka(同一主機(jī)下)

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties

報(bào)錯(cuò)信息:

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties 
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/kafka-2.11/bin/hs_err_pid4036.log

原因:物理機(jī)或虛擬機(jī)內(nèi)存不足,不足以保證Kafka啟動(dòng)或運(yùn)行時(shí)需要的內(nèi)容容量

解決方式

增加物理機(jī)或虛擬機(jī)的內(nèi)存

減少Kafka啟動(dòng)所需內(nèi)容的配置,將要修改的文件為kafka-server-start.sh

export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M" #29行左右
3.3.2 使用

解決好之后我們開(kāi)始啟動(dòng):

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties 
[2021-11-20 10:58:33,138] INFO KafkaConfig values:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-02.properties 
[2021-11-20 10:59:04,187] INFO KafkaConfig values:

ps:看下我們的阿里云服務(wù)器的狀況

java中Kafka如何使用

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mr-yan
Created topic "mr-yan".
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan    PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: mr-yan   Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
  • PartitionCount:主題分區(qū)數(shù)。

  • ReplicationFactor:用來(lái)設(shè)置主題的副本數(shù)。

  • leader:是負(fù)責(zé)給定分區(qū)所有讀寫(xiě)操作的節(jié)點(diǎn)。每個(gè)節(jié)點(diǎn)都是隨機(jī)選擇的部分分區(qū)的領(lǐng)導(dǎo)者。

  • replicas:是復(fù)制分區(qū)日志的節(jié)點(diǎn)列表,不管這些節(jié)點(diǎn)是leader還是僅僅活著。

  • isr:是一組“同步”replicas,是replicas列表的子集,它活著并被指到leader。

進(jìn)行集群環(huán)境下的使用:

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan
>Hello Kafkas!  
>Hello Mr.Yan   
>
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan
Hello Kafkas!
Hello Mr.Yan
3.3.3 驗(yàn)證容錯(cuò)性

首先我們停掉一個(gè)Kafka的Broker:

[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# ps -ef|grep server-01.properties
root     19859 28247  1 10:58 pts/3    ../config/server-01.properties
root     23934 16569  0 11:12 pts/11   00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# kill -9 28247
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# ps -ef|grep server-01.properties
root     32604 16569  0 11:13 pts/11   00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# cd /usr/local/kafka-2.11/bin/
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan    PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: mr-yan   Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2

查看生產(chǎn)者和消費(fèi)者的變化,并再次使用,發(fā)現(xiàn)仍可以進(jìn)行使用

[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan
>Hello Kafkas!  
>Hello Mr.Yan   
>[2021-11-20 11:12:28,881] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
>Hello Kafkas too!
>Hello Mr.Yan too!
>
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan
Hello Kafkas!
Hello Mr.Yan
[2021-11-20 11:12:28,812] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-11-20 11:12:29,165] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Hello Kafkas too!
Hello Mr.Yan too!

4 小總結(jié)

主題,分區(qū),副本的概念

Kafka是根據(jù)主題(topic)進(jìn)行消息的傳遞,但是又有分區(qū)和副本的概念,下面來(lái)分別解釋下:

分區(qū):kafka對(duì)每一條消息的key做一個(gè)hashcode運(yùn)算,然后將得到的數(shù)值對(duì)分區(qū)數(shù)量進(jìn)行模運(yùn)算就得到了這條消息所在分區(qū)的數(shù)字。副本:同一分區(qū)的幾個(gè)副本之間保存的是相同的數(shù)據(jù),副本之間的關(guān)系是“一主多從”,其中的主(leader)則負(fù)責(zé)對(duì)外提供讀寫(xiě)操作的服務(wù),而從(follower)則負(fù)責(zé)與主節(jié)點(diǎn)同步數(shù)據(jù),當(dāng)主節(jié)點(diǎn)宕機(jī),從節(jié)點(diǎn)之間能重新選舉leader進(jìn)行對(duì)外服務(wù)。

kafka會(huì)保證同一個(gè)分區(qū)內(nèi)的消息有序,但是不保證主題內(nèi)的消息有序。

java中Kafka如何使用

關(guān)于“java中Kafka如何使用”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI