您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)java中Kafka如何使用,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
官方文檔:http://kafka.apache.org/
中文文檔:https://kafka.apachecn.org/
Apache Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。
Apache Kafka與傳統(tǒng)消息系統(tǒng)相比,有以下不同:
它被設(shè)計(jì)為一個(gè)分布式系統(tǒng),易于向外擴(kuò)展;
它同時(shí)為發(fā)布和訂閱提供高吞吐量;
它支持多訂閱者,當(dāng)失敗時(shí)能自動(dòng)平衡消費(fèi)者;
它將消息持久化到磁盤(pán),因此可用于批量消費(fèi),例如ETL,以及實(shí)時(shí)應(yīng)用程序。
首先是一些概念:
Kafka作為一個(gè)集群,運(yùn)行在一臺(tái)或者多臺(tái)服務(wù)器上.Kafka 通過(guò) topic 對(duì)存儲(chǔ)的流數(shù)據(jù)進(jìn)行分類(lèi)。每條記錄中包含一個(gè)key,一個(gè)value和一個(gè)timestamp(時(shí)間戳)。
Kafka有四個(gè)核心的API:
The Producer API 允許一個(gè)應(yīng)用程序發(fā)布一串流式的數(shù)據(jù)到一個(gè)或者多個(gè)Kafka topic。
The Consumer API 允許一個(gè)應(yīng)用程序訂閱一個(gè)或多個(gè) topic ,并且對(duì)發(fā)布給他們的流式數(shù)據(jù)進(jìn)行處理。
The Streams API 允許一個(gè)應(yīng)用程序作為一個(gè)流處理器,消費(fèi)一個(gè)或者多個(gè)topic產(chǎn)生的輸入流,然后生產(chǎn)一個(gè)輸出流到一個(gè)或多個(gè)topic中去,在輸入輸出流中進(jìn)行有效的轉(zhuǎn)換。
The Connector API 允許構(gòu)建并運(yùn)行可重用的生產(chǎn)者或者消費(fèi)者,將Kafka topics連接到已存在的應(yīng)用程序或者數(shù)據(jù)系統(tǒng)。比如,連接到一個(gè)關(guān)系型數(shù)據(jù)庫(kù),捕捉表(table)的所有變更內(nèi)容。
支持的語(yǔ)言(除了Java之外的):
常見(jiàn)概念:
1 Topics和日志
讓我們首先深入了解下Kafka的核心概念:提供一串流式的記錄— topic
Topic 就是數(shù)據(jù)主題,是數(shù)據(jù)記錄發(fā)布的地方,可以用來(lái)區(qū)分業(yè)務(wù)系統(tǒng)。Kafka中的Topics總是多訂閱者模式,一個(gè)topic可以擁有一個(gè)或者多個(gè)消費(fèi)者來(lái)訂閱它的數(shù)據(jù)。
對(duì)于每一個(gè)topic, Kafka集群都會(huì)維持一個(gè)分區(qū)日志,如下所示:
每個(gè)分區(qū)都是有序且順序不可變的記錄集,并且不斷地追加到結(jié)構(gòu)化的commit log文件。分區(qū)中的每一個(gè)記錄都會(huì)分配一個(gè)id號(hào)來(lái)表示順序,我們稱之為offset,offset用來(lái)唯一的標(biāo)識(shí)分區(qū)中每一條記錄。
Kafka 集群保留所有發(fā)布的記錄—無(wú)論他們是否已被消費(fèi)—并通過(guò)一個(gè)可配置的參數(shù)——保留期限來(lái)控制. 舉個(gè)例子, 如果保留策略設(shè)置為2天,一條記錄發(fā)布后兩天內(nèi),可以隨時(shí)被消費(fèi),兩天過(guò)后這條記錄會(huì)被拋棄并釋放磁盤(pán)空間。Kafka的性能和數(shù)據(jù)大小無(wú)關(guān)
,所以長(zhǎng)時(shí)間存儲(chǔ)數(shù)據(jù)沒(méi)有什么問(wèn)題.
日志中的 partition(分區(qū))有以下幾個(gè)用途。第一,當(dāng)日志大小超過(guò)了單臺(tái)服務(wù)器的限制,允許日志進(jìn)行擴(kuò)展。每個(gè)單獨(dú)的分區(qū)都必須受限于主機(jī)的文件限制,不過(guò)一個(gè)主題可能有多個(gè)分區(qū),因此可以處理無(wú)限量的數(shù)據(jù)。第二,可以作為并行的單元集—關(guān)于這一點(diǎn),更多細(xì)節(jié)如下
2 分布式
日志的分區(qū)partition (分布)在Kafka集群的服務(wù)器上。每個(gè)服務(wù)器在處理數(shù)據(jù)和請(qǐng)求時(shí),共享這些分區(qū)。每一個(gè)分區(qū)都會(huì)在已配置的服務(wù)器上進(jìn)行備份,確保容錯(cuò)性.
每個(gè)分區(qū)都有一臺(tái) server 作為 “l(fā)eader”,零臺(tái)或者多臺(tái)server作為 follwers 。leader server 處理一切對(duì) partition (分區(qū))的讀寫(xiě)請(qǐng)求,而follwers只需被動(dòng)的同步leader上的數(shù)據(jù)。當(dāng)leader宕機(jī)了,followers 中的一臺(tái)服務(wù)器會(huì)自動(dòng)成為新的 leader。每臺(tái) server 都會(huì)成為某些分區(qū)的 leader 和某些分區(qū)的 follower,因此集群的負(fù)載是平衡的。
3 生產(chǎn)者
生產(chǎn)者可以將數(shù)據(jù)發(fā)布到所選擇的topic中。生產(chǎn)者負(fù)責(zé)將記錄分配到topic的哪一個(gè) partition(分區(qū))中??梢允褂醚h(huán)的方式來(lái)簡(jiǎn)單地實(shí)現(xiàn)負(fù)載均衡,也可以根據(jù)某些語(yǔ)義分區(qū)函數(shù)(例如:記錄中的key)來(lái)完成。下面會(huì)介紹更多關(guān)于分區(qū)的使用。
4 消費(fèi)者
消費(fèi)者使用一個(gè) 消費(fèi)組 名稱來(lái)進(jìn)行標(biāo)識(shí),發(fā)布到topic中的每條記錄被分配給訂閱消費(fèi)組中的一個(gè)消費(fèi)者實(shí)例.消費(fèi)者實(shí)例可以分布在多個(gè)進(jìn)程中或者多個(gè)機(jī)器上。
如果所有的消費(fèi)者實(shí)例在同一消費(fèi)組中,消息記錄會(huì)負(fù)載平衡到每一個(gè)消費(fèi)者實(shí)例.
如果所有的消費(fèi)者實(shí)例在不同的消費(fèi)組中,每條消息記錄會(huì)廣播到所有的消費(fèi)者進(jìn)程.
如圖,這個(gè) Kafka 集群有兩臺(tái) server 的,四個(gè)分區(qū)(p0-p3)和兩個(gè)消費(fèi)者組。消費(fèi)組A有兩個(gè)消費(fèi)者,消費(fèi)組B有四個(gè)消費(fèi)者。
通常情況下,每個(gè) topic 都會(huì)有一些消費(fèi)組,一個(gè)消費(fèi)組對(duì)應(yīng)一個(gè)"邏輯訂閱者"。一個(gè)消費(fèi)組由許多消費(fèi)者實(shí)例組成,便于擴(kuò)展和容錯(cuò)。這就是發(fā)布和訂閱的概念,只不過(guò)訂閱者是一組消費(fèi)者而不是單個(gè)的進(jìn)程。
在Kafka中實(shí)現(xiàn)消費(fèi)的方式是將日志中的分區(qū)劃分到每一個(gè)消費(fèi)者實(shí)例上,以便在任何時(shí)間,每個(gè)實(shí)例都是分區(qū)唯一的消費(fèi)者。維護(hù)消費(fèi)組中的消費(fèi)關(guān)系由Kafka協(xié)議動(dòng)態(tài)處理。如果新的實(shí)例加入組,他們將從組中其他成員處接管一些 partition 分區(qū);如果一個(gè)實(shí)例消失,擁有的分區(qū)將被分發(fā)到剩余的實(shí)例。
Kafka 只保證分區(qū)內(nèi)的記錄是有序的,而不保證主題中不同分區(qū)的順序。每個(gè) partition 分區(qū)按照key值排序足以滿足大多數(shù)應(yīng)用程序的需求。但如果你需要總記錄在所有記錄的上面,可使用僅有一個(gè)分區(qū)的主題來(lái)實(shí)現(xiàn),這意味著每個(gè)消費(fèi)者組只有一個(gè)消費(fèi)者進(jìn)程。
保證
high-level Kafka給予以下保證:
生產(chǎn)者發(fā)送到特定topic partition 的消息將按照發(fā)送的順序處理。 也就是說(shuō),如果記錄M1和記錄M2由相同的生產(chǎn)者發(fā)送,并先發(fā)送M1記錄,那么M1的偏移比M2小,并在日志中較早出現(xiàn)一個(gè)消費(fèi)者實(shí)例按照日志中的順序查看記錄.對(duì)于具有N個(gè)副本的主題,我們最多容忍N(yùn)-1個(gè)服務(wù)器故障,從而保證不會(huì)丟失任何提交到日志中的記錄.
關(guān)于保證的更多細(xì)節(jié)可以看文檔的設(shè)計(jì)部分。
Kafka依賴于Zookeeper,而Zookeeper又依賴于Java,因此在使用Kafka之前要安裝jdk1.8的環(huán)境和啟動(dòng)zookeeper服務(wù)器。
下載或安裝地址:
JDK1.8://kemok4.com/article/229780.htm:
http://kemok4.com/article/229783.htm:
https://kafka.apachecn.org/downloads.html
好,下面我們開(kāi)始進(jìn)行安裝
[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# tar -zxf kafka_2.11-1.0.0.tgz [root@iZ2ze4m2ri7irkf6h7n8zoZ local]# mv kafka_2.11-1.0.0 kafka-2.11
首先檢查下自己的jdk 是否安裝:
[root@iZ2ze4m2ri7irkf6h7n8zoZ local]# java -version java version "1.8.0_144" Java(TM) SE Runtime Environment (build 1.8.0_144-b01) Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
啟動(dòng)Zookeeper:
[root@iZ2ze4m2ri7irkf6h7n8zoZ zookeeper-3.5.9]# ls bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt [root@iZ2ze4m2ri7irkf6h7n8zoZ zookeeper-3.5.9]# cd conf/ [root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# ls configuration.xsl log4j.properties zoo_sample.cfg [root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# cp zoo_sample.cfg zoo.cfg [root@iZ2ze4m2ri7irkf6h7n8zoZ conf]# cd ../bin/ [root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ls README.txt zkCli.cmd zkEnv.cmd zkServer.cmd zkServer.sh zkTxnLogToolkit.sh zkCleanup.sh zkCli.sh zkEnv.sh zkServer-initialize.sh zkTxnLogToolkit.cmd [root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./zkServer. zkServer.cmd zkServer.sh [root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-3.5.9/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
啟動(dòng)Kafka:
[root@iZ2ze4m2ri7irkf6h7n8zoZ kafka-2.11]# ls bin config libs LICENSE NOTICE site-docs [root@iZ2ze4m2ri7irkf6h7n8zoZ kafka-2.11]# cd config/ [root@iZ2ze4m2ri7irkf6h7n8zoZ config]# ls connect-console-sink.properties connect-file-source.properties log4j.properties zookeeper.properties connect-console-source.properties connect-log4j.properties producer.properties connect-distributed.properties connect-standalone.properties server.properties connect-file-sink.properties consumer.properties tools-log4j.properties [root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cd ../bin/ [root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server.properties [2021-11-20 10:21:10,326] INFO KafkaConfig values: ...... [2021-11-20 10:21:12,423] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser) [2021-11-20 10:21:12,423] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser) [2021-11-20 10:21:12,424] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
新建和查看topic
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ymx Created topic "ymx". [root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 ymx
生產(chǎn)者發(fā)送消息:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic ymx >Hello Kafka! >Hello Ymx! >Hello Kafka and Ymx! >
消費(fèi)者消費(fèi)消息:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ymx --from-beginning Hello Kafka! Hello Ymx! Hello Kafka and Ymx!
首先要copy下配置文件
[root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cp server.properties server-01.properties [root@iZ2ze4m2ri7irkf6h7n8zoZ config]# cp server.properties server-02.properties [root@iZ2ze4m2ri7irkf6h7n8zoZ config]# vim server-01.properties #### 內(nèi)容開(kāi)始 #### broker.id=1 # 21行左右,broker的唯一標(biāo)識(shí)(同一個(gè)集群中) listeners=PLAINTEXT://:9093 # 31行左右,放開(kāi),代表kafka的端口號(hào) log.dirs=/tmp/kafka-logs-01 # 60行左右,用逗號(hào)分隔的目錄列表,在其中存儲(chǔ)日志文件 #### 內(nèi)容結(jié)束 #### [root@iZ2ze4m2ri7irkf6h7n8zoZ config]# vim server-02.properties #### 內(nèi)容開(kāi)始 #### broker.id=2 # 21行左右,broker的唯一標(biāo)識(shí)(同一個(gè)集群中) listeners=PLAINTEXT://:9094 # 31行左右,放開(kāi),代表kafka的端口號(hào) log.dirs=/tmp/kafka-logs-02 # 60行左右,用逗號(hào)分隔的目錄列表,在其中存儲(chǔ)日志文件 #### 內(nèi)容結(jié)束 ####
根據(jù)配置文件啟動(dòng)Kafka(同一主機(jī)下)
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
報(bào)錯(cuò)信息:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory. # An error report file with more information is saved as: # /usr/local/kafka-2.11/bin/hs_err_pid4036.log
原因
:物理機(jī)或虛擬機(jī)內(nèi)存不足,不足以保證Kafka啟動(dòng)或運(yùn)行時(shí)需要的內(nèi)容容量
解決方式:
增加物理機(jī)或虛擬機(jī)的內(nèi)存
減少Kafka啟動(dòng)所需內(nèi)容的配置,將要修改的文件為kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M" #29行左右
解決好之后我們開(kāi)始啟動(dòng):
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties [2021-11-20 10:58:33,138] INFO KafkaConfig values:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-server-start.sh ../config/server-02.properties [2021-11-20 10:59:04,187] INFO KafkaConfig values:
ps:看下我們的阿里云服務(wù)器的狀況
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mr-yan Created topic "mr-yan". [root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan Topic:mr-yan PartitionCount:1 ReplicationFactor:3 Configs: Topic: mr-yan Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
PartitionCount:主題分區(qū)數(shù)。
ReplicationFactor:用來(lái)設(shè)置主題的副本數(shù)。
leader:是負(fù)責(zé)給定分區(qū)所有讀寫(xiě)操作的節(jié)點(diǎn)。每個(gè)節(jié)點(diǎn)都是隨機(jī)選擇的部分分區(qū)的領(lǐng)導(dǎo)者。
replicas:是復(fù)制分區(qū)日志的節(jié)點(diǎn)列表,不管這些節(jié)點(diǎn)是leader還是僅僅活著。
isr:是一組“同步”replicas,是replicas列表的子集,它活著并被指到leader。
進(jìn)行集群環(huán)境下的使用:
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan >Hello Kafkas! >Hello Mr.Yan >
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan Hello Kafkas! Hello Mr.Yan
首先我們停掉一個(gè)Kafka的Broker:
[root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# ps -ef|grep server-01.properties root 19859 28247 1 10:58 pts/3 ../config/server-01.properties root 23934 16569 0 11:12 pts/11 00:00:00 grep --color=auto server-01.properties [root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# kill -9 28247 [root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# ps -ef|grep server-01.properties root 32604 16569 0 11:13 pts/11 00:00:00 grep --color=auto server-01.properties [root@iZ2ze4m2ri7irkf6h7n8zoZ ~]# cd /usr/local/kafka-2.11/bin/ [root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan Topic:mr-yan PartitionCount:1 ReplicationFactor:3 Configs: Topic: mr-yan Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2
查看生產(chǎn)者和消費(fèi)者的變化,并再次使用,發(fā)現(xiàn)仍可以進(jìn)行使用
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan >Hello Kafkas! >Hello Mr.Yan >[2021-11-20 11:12:28,881] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) >Hello Kafkas too! >Hello Mr.Yan too! >
[root@iZ2ze4m2ri7irkf6h7n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan Hello Kafkas! Hello Mr.Yan [2021-11-20 11:12:28,812] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2021-11-20 11:12:29,165] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) Hello Kafkas too! Hello Mr.Yan too!
主題,分區(qū),副本的概念
Kafka是根據(jù)主題(topic)進(jìn)行消息的傳遞,但是又有分區(qū)和副本的概念,下面來(lái)分別解釋下:
分區(qū):kafka對(duì)每一條消息的key做一個(gè)hashcode運(yùn)算,然后將得到的數(shù)值對(duì)分區(qū)數(shù)量進(jìn)行模運(yùn)算就得到了這條消息所在分區(qū)的數(shù)字。副本:同一分區(qū)的幾個(gè)副本之間保存的是相同的數(shù)據(jù),副本之間的關(guān)系是“一主多從”,其中的主(leader)則負(fù)責(zé)對(duì)外提供讀寫(xiě)操作的服務(wù),而從(follower)則負(fù)責(zé)與主節(jié)點(diǎn)同步數(shù)據(jù),當(dāng)主節(jié)點(diǎn)宕機(jī),從節(jié)點(diǎn)之間能重新選舉leader進(jìn)行對(duì)外服務(wù)。
kafka會(huì)保證同一個(gè)分區(qū)內(nèi)的消息有序,但是不保證主題內(nèi)的消息有序。
關(guān)于“java中Kafka如何使用”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。