溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

kafka分布式集群

發(fā)布時(shí)間:2020-07-19 07:00:21 來(lái)源:網(wǎng)絡(luò) 閱讀:1992 作者:蔣將將 欄目:建站服務(wù)器

一、簡(jiǎn)介

1、消息傳輸流程

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個(gè)kafka實(shí)例組成,每個(gè)實(shí)例(server)成為broker。無(wú)論是kafka集群,還是producer和consumer都依賴于zookeeper來(lái)保證系統(tǒng)可用性集群保存一些meta信息。

kafka分布式集群kafka分布式集群

Producer即生產(chǎn)者,向Kafka集群發(fā)送消息,在發(fā)送消息之前,會(huì)對(duì)消息進(jìn)行分類,即Topic,上圖展示了兩個(gè)producer發(fā)送了分類為topic1的消息,另外一個(gè)發(fā)送了topic2的消息。

Topic即主題,通過(guò)對(duì)消息指定主題可以將消息分類,消費(fèi)者可以只關(guān)注自己需要的Topic中的消息

Consumer即消費(fèi)者,消費(fèi)者通過(guò)與kafka集群建立長(zhǎng)連接的方式,不斷地從集群中拉取消息,然后可以對(duì)這些消息進(jìn)行處理。

2、Topics/logs

一個(gè)Topic可以認(rèn)為是一類消息,每個(gè)topic將被分成多個(gè)partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件。任何發(fā)布到此partition的消息都會(huì)被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個(gè)long型數(shù)字,它是唯一標(biāo)記一條消息。它唯一的標(biāo)記一條消息。kafka并沒(méi)有提供其他額外的索引機(jī)制來(lái)存儲(chǔ)offset,因?yàn)樵趉afka中幾乎不允許對(duì)消息進(jìn)行“隨機(jī)讀寫(xiě)”。

kafka分布式集群kafka分布式集群

談到kafka的存儲(chǔ),就不得不提到分區(qū),即partitions,創(chuàng)建一個(gè)topic時(shí),同時(shí)可以指定分區(qū)數(shù)目,分區(qū)數(shù)越多,其吞吐量也越大,但是需要的資源也越多,同時(shí)也會(huì)導(dǎo)致更高的不可用性,kafka在接收到生產(chǎn)者發(fā)送的消息之后,會(huì)根據(jù)均衡策略將消息存儲(chǔ)到不同的分區(qū)中。

kafka服務(wù)器消息存儲(chǔ)策略如圖

kafka分布式集群kafka分布式集群

kafka和JMS(Java Message Service)實(shí)現(xiàn)(activeMQ)不同的是:即使消息被消費(fèi),消息仍然不會(huì)被立即刪除.日志文件將會(huì)根據(jù)broker中的配置要求,保留一定的時(shí)間之后刪除;比如log文件保留2天,那么兩天后,文件會(huì)被清除,無(wú)論其中的消息是否被消費(fèi).kafka通過(guò)這種簡(jiǎn)單的手段,來(lái)釋放磁盤(pán)空間,以及減少消息消費(fèi)之后對(duì)文件內(nèi)容改動(dòng)的磁盤(pán)IO開(kāi)支.

對(duì)于consumer而言,它需要保存消費(fèi)消息的offset,對(duì)于offset的保存和使用,有consumer來(lái)控制;當(dāng)consumer正常消費(fèi)消息時(shí),offset將會(huì)"線性"的向前驅(qū)動(dòng),即消息將依次順序被消費(fèi).事實(shí)上consumer可以使用任意順序消費(fèi)消息,它只需要將offset重置為任意值..(offset將會(huì)保存在zookeeper中,參見(jiàn)下文)

kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實(shí)現(xiàn)非常輕量級(jí),它們可以隨意離開(kāi),而不會(huì)對(duì)集群造成額外的影響.

partitions的設(shè)計(jì)目的有多個(gè).最根本原因是kafka基于文件存儲(chǔ).通過(guò)分區(qū),可以將日志內(nèi)容分散到多個(gè)server上,來(lái)避免文件尺寸達(dá)到單機(jī)磁盤(pán)的上限,每個(gè)partiton都會(huì)被當(dāng)前server(kafka實(shí)例)保存;可以將一個(gè)topic切分多任意多個(gè)partitions,來(lái)消息保存/消費(fèi)的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.(具體原理參見(jiàn)下文).

3、Distribution(分布)

一個(gè)Topic的多個(gè)partitions,被分布在kafka集群中的多個(gè)server上;每個(gè)server(kafka實(shí)例)負(fù)責(zé)partitions中消息的讀寫(xiě)操作;此外kafka還可以配置partitions需要備份的個(gè)數(shù)(replicas),每個(gè)partition將會(huì)被備份到多臺(tái)機(jī)器上,以提高可用性.

基于replicated方案,那么就意味著需要對(duì)多個(gè)備份進(jìn)行調(diào)度;每個(gè)partition都有一個(gè)server為"leader";leader負(fù)責(zé)所有的讀寫(xiě)操作,如果leader失效,那么將會(huì)有其他follower來(lái)接管(成為新的leader);follower只是單調(diào)的和leader跟進(jìn),同步消息即可..由此可見(jiàn)作為leader的server承載了全部的請(qǐng)求壓力,因此從集群的整體考慮,有多少個(gè)partitions就意味著有多少個(gè)"leader",kafka會(huì)將"leader"均衡的分散在每個(gè)實(shí)例上,來(lái)確保整體的性能穩(wěn)定.

Producers

Producer將消息發(fā)布到指定的Topic中,同時(shí)Producer也能決定將此消息歸屬于哪個(gè)partition;比如基于"round-robin"方式或者通過(guò)其他的一些算法等.

Consumers

本質(zhì)上kafka只支持Topic.每個(gè)consumer屬于一個(gè)consumer group;反過(guò)來(lái)說(shuō),每個(gè)group中可以有多個(gè)consumer.發(fā)送到Topic的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi).

如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會(huì)在consumers之間負(fù)載均衡.

如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會(huì)廣播給所有的消費(fèi)者.

在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi);每個(gè)group中consumer消息消費(fèi)互相獨(dú)立;我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者,一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過(guò)一個(gè)consumer可以消費(fèi)多個(gè)partitions中的消息.kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí),消息是順序的.事實(shí)上,從Topic角度來(lái)說(shuō),消息仍不是有序的.

kafka的設(shè)計(jì)原理決定,對(duì)于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無(wú)法得到消息.

Guarantees

1) 發(fā)送到partitions中的消息將會(huì)按照它接收的順序追加到日志中

2) 對(duì)于消費(fèi)者而言,它們消費(fèi)消息的順序和日志中消息順序一致.

3) 如果Topic的"replicationfactor"為N,那么允許N-1個(gè)kafka實(shí)例失效。

與生產(chǎn)者的交互

kafka分布式集群kafka分布式集群

生產(chǎn)者在向kafka集群發(fā)送消息的時(shí)候,可以通過(guò)指定分區(qū)來(lái)發(fā)送到指定的分區(qū)中,也可以通過(guò)指定均衡策略來(lái)將消息發(fā)送到不同的分區(qū)中,如果不指定,就會(huì)采用默認(rèn)的隨機(jī)均衡策略,將消息隨機(jī)的存儲(chǔ)到不同的分區(qū)中

與消費(fèi)者的交互

kafka分布式集群kafka分布式集群

在消費(fèi)者消費(fèi)消息時(shí),kafka使用offset來(lái)記錄當(dāng)前消費(fèi)的位置,在kafka的設(shè)計(jì)中,可以有多個(gè)不同的group來(lái)同時(shí)消費(fèi)同一個(gè)topic下的消息,如圖,我們有兩個(gè)不同的group同時(shí)消費(fèi),他們的的消費(fèi)的記錄位置offset各不項(xiàng)目,不互相干擾。

  對(duì)于一個(gè)group而言,消費(fèi)者的數(shù)量不應(yīng)該多余分區(qū)的數(shù)量,因?yàn)樵谝粋€(gè)group中,每個(gè)分區(qū)至多只能綁定到一個(gè)消費(fèi)者上,即一個(gè)消費(fèi)者可以消費(fèi)多個(gè)分區(qū),一個(gè)分區(qū)只能給一個(gè)消費(fèi)者消費(fèi)

  因此,若一個(gè)group中的消費(fèi)者數(shù)量大于分區(qū)數(shù)量的話,多余的消費(fèi)者將不會(huì)收到任何消息。

 

二、使用場(chǎng)景

1、Messaging

對(duì)于一些常規(guī)的消息系統(tǒng),kafka是個(gè)不錯(cuò)的選擇;partitons/replication和容錯(cuò),可以使kafka具有良好的擴(kuò)展性和性能優(yōu)勢(shì).不過(guò)到目前為止,我們應(yīng)該很清楚認(rèn)識(shí)到,kafka并沒(méi)有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)""消息分組"等企業(yè)級(jí)特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對(duì)可靠(比如,消息重發(fā),消息發(fā)送丟失等)

2、Websit activity tracking

kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁(yè)/用戶操作等信息發(fā)送到kafka中.并實(shí)時(shí)監(jiān)控,或者離線統(tǒng)計(jì)分析等

3、Log Aggregation

kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對(duì)producer端而言,幾乎感覺(jué)不到性能的開(kāi)支.此時(shí)consumer端可以使hadoop等其他系統(tǒng)化的存儲(chǔ)和分析系統(tǒng).

三、設(shè)計(jì)原理

kafka的設(shè)計(jì)初衷是希望作為一個(gè)統(tǒng)一的信息收集平臺(tái),能夠?qū)崟r(shí)的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯(cuò)能力。

1、持久性

2、性能

3、生產(chǎn)者

4、消費(fèi)者

5、消息傳送機(jī)制

6、復(fù)制備份

7、日志

8、分配

四、主要配置

1、Broker配置

kafka分布式集群kafka分布式集群

2、Consumer主要配置

kafka分布式集群kafka分布式集群

3、Producer主要配置

kafka分布式集群kafka分布式集群

五、kafka集群搭建步驟

1、系統(tǒng)環(huán)境

主機(jī)名

系統(tǒng)

zookeeper版本

IP

master

CentOS7.4

3.4.12

192.168.56.129

slave1

CentOS7.4

3.4.12

192.168.56.130

slave2

CentOS7.4

3.4.12

192.168.56.131

2、暫時(shí)關(guān)閉防火墻和selinux

3、軟件下載

下載地址:http://kafka.apache.org/downloads.html

備注:下載最新的二進(jìn)制tgz包

kafka分布式集群kafka分布式集群

kafka分布式集群kafka分布式集群

kafka分布式集群kafka分布式集群

4、搭建zookeeper集群

備注:小伙伴可以參考上一篇文章即可

5、kafka集群

5.1、根據(jù)上面的zookeeper集群服務(wù)器,把kafka上傳到/home下

5.2、解壓

[root@master home]# tar -zxvf kafka_2.12-2.0.0.tgz

[root@master home]# mv kafka_2.12-2.0.0 kafka01

5.3、配置文件

[root@master home]# cd /home/kafka01/config/

備注:server.properties文件里的broker.id,log.dirs,zookeeper.connect必須根據(jù)實(shí)際情況進(jìn)行修改,其他項(xiàng)根據(jù)需要自行斟酌,master配置如下:

broker.id=1  

port=9091

num.network.threads=2

num.io.threads=2

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

 log.dirs=/var/log/kafka/kafka-logs

num.partitions=2

log.flush.interval.messages=10000

log.flush.interval.ms=1000

log.retention.hours=168

#log.retention.bytes=1073741824

log.segment.bytes=536870912

num.replica.fetchers=2

log.cleanup.interval.mins=10

zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181

zookeeper.connection.timeout.ms=1000000

kafka.metrics.polling.interval.secs=5

kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

kafka.csv.metrics.dir=/tmp/kafka_metrics

kafka.csv.metrics.reporter.enabled=false

5.4、啟動(dòng)服務(wù)(master)----前提是三個(gè)節(jié)點(diǎn)的zookeeper已啟動(dòng)

[root@master kafka01]# ./bin/kafka-server-start.sh config/server.properties &

kafka分布式集群kafka分布式集群

補(bǔ)充:

問(wèn)題:&可以使程序在后臺(tái)運(yùn)行,但一旦斷開(kāi)ssh終端,后臺(tái)Java程序也會(huì)終止。

解決辦法:使用shell腳本啟動(dòng)

[root@master kafka01]# cat start.sh

#!/bin/bash

cd /home/kafka01/

./bin/kafka-server-start.sh config/server.properties &

exit

授權(quán),運(yùn)行即可

[root@master kafka01]#chmod +x start.sh

5.5、配置slave1和slave2

slave1配置如下:

broker.id=2

port=9092

log.dirs=/var/log/kafka

zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181

啟動(dòng)即可

kafka分布式集群kafka分布式集群

kafka分布式集群kafka分布式集群


slave2配置如下:

broker.id=3

port=9093

log.dirs=/var/log/kafka

zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181

啟動(dòng)即可

kafka分布式集群kafka分布式集群

6、測(cè)試

Kafka通過(guò)topic對(duì)同一類的數(shù)據(jù)進(jìn)行管理,同一類的數(shù)據(jù)使用同一個(gè)topic可以在處理數(shù)據(jù)時(shí)更加的便捷

6.1、創(chuàng)建一個(gè)Topic

[root@master kafka01]# bin/kafka-topics.sh --create --zookeeper 192.168.56.129:2181 --replication-factor 1 --partitions 1 --topic test

查看

[root@master kafka01]# bin/kafka-topics.sh --list --zookeeper 192.168.56.129:2181

kafka分布式集群kafka分布式集群

6.2、創(chuàng)建一個(gè)消息消費(fèi)者

[root@master kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.129:9091 --topic test --from-beginning

kafka分布式集群kafka分布式集群

消費(fèi)者創(chuàng)建完成之后,因?yàn)檫€沒(méi)有發(fā)送任何數(shù)據(jù),因此這里在執(zhí)行后沒(méi)有打印出任何數(shù)據(jù)

         不過(guò)別著急,不要關(guān)閉這個(gè)終端,打開(kāi)一個(gè)新的終端,接下來(lái)我們創(chuàng)建第一個(gè)消息生產(chǎn)者

6.3、創(chuàng)建一個(gè)消息生產(chǎn)者

在kafka解壓目錄打開(kāi)一個(gè)新的終端,輸入

[root@master kafka01]# bin/kafka-console-producer.sh --broker-list 192.168.56.129:9091 --topic test

kafka分布式集群kafka分布式集群

在發(fā)送完消息之后,可以回到我們的消息消費(fèi)者終端中,可以看到,終端中已經(jīng)打印出了我們剛才發(fā)送的消息

kafka分布式集群kafka分布式集群

zookeeper查看topic

kafka分布式集群kafka分布式集群

到此即可,共同進(jìn)步之路?。。。。?/p>


向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI