您好,登錄后才能下訂單哦!
kafka集群是把狀態(tài)保存在zookeeper中的,首先要搭建zookeeper集群。
wget http://xxxxx.oss-cn-xxxx.aliyuncs.com/xxxx/jdk-8u171-linux-x64.rpm
yum localinstall jdk-8u171-linux-x64.rpm -y
wget http://xxx-xx.oss-cn-xxx.aliyuncs.com/xxx/kafka_2.12-1.1.0.tgz
官網下載鏈接:http://kafka.apache.org/downloads
解壓kafka
tar -zxvf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0 kafka
直接使用kafka自帶的zookeeper建立zk集群
cd /data/kafka
vim conf/zookeeper.properties
#tickTime:
這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發(fā)送一個心跳。
#initLimit:
這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗??偟臅r間長度就是 5*2000=10 秒
#syncLimit:
這個配置項標識 Leader 與Follower 之間發(fā)送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒
#dataDir:
快照日志的存儲路徑
#dataLogDir:需手動創(chuàng)建
事物日志的存儲路徑,如果不配置這個那么事物日志會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日志、快照日志太多
#clientPort:
這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監(jiān)聽這個端口,接受客戶端的訪問請求。
進入dataDir目錄,將三臺服務器上的myid文件分別寫入1、2、3。
myid是zk集群用來發(fā)現彼此的標識,必須創(chuàng)建,且不能相同。
echo "1" > /data/kafka/zk/myid
echo "2" > /data/kafka/zk/myid
echo "3" > /data/kafka/zk/myid
zookeeper不會主動的清除舊的快照和日志文件,需要定期清理。
#!/bin/bash
#snapshot file dir
dataDir=/data/kafka/zk/version-2
#tran log dir
dataLogDir=/data/kafka/log/zk/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上這個腳本定義了刪除對應兩個目錄中的文件,保留最新的66個文件,可以將他寫到crontab中,設置為每天凌晨2點執(zhí)行一次就可以了。
進入kafka目錄,執(zhí)行zookeeper命令
cd /data/kafka
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &
沒有報錯,而且jps查看有zk進程就說明啟動成功了。
vim conf/server.properties
部分參數含義:
先每臺設置host,listeners里要設置,否則后面消費消息會報錯。
broker.id 每臺都不能相同
num.network.threads 設置為cpu核數
num.partitions 分區(qū)數設置視情況而定,上面有講分區(qū)數設置
default.replication.factor kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續(xù)提供服務
nohup ./bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
執(zhí)行jps檢查
./bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 2 --partitions 1 --topic test1
--replication-factor 2 #復制兩份
--partitions 1 #創(chuàng)建1個分區(qū)
--topic #主題為test1
#模擬客戶端去發(fā)送消息,生產者
./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test1
#模擬客戶端去接受消息,消費者
./bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --from-beginning --topic test1
#然后在生產者處輸入任意內容,在消費端查看內容。
./bin/kafka-topics.sh --list --zookeeper xxxx:2181
#顯示創(chuàng)建的所有topic
./bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic test1
#Topic:ssports PartitionCount:1 ReplicationFactor:2 Configs:
# Topic: test1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
#分區(qū)為為1 復制因子為2 他的 test1的分區(qū)為0
#Replicas: 0,1 復制的為0,1
修改配置文件server.properties添加如下配置:
delete.topic.enable=true
配置完重啟kafka、zookeeper。
如果不想修改配置文件可刪除topc及相關數據目錄
#刪除kafka topic
./bin/kafka-topics.sh --delete --zookeeper xxxx:2181,xxxx:2181 --topic test1
#刪除kafka相關數據目錄
rm -rf /data/kafka/log/kafka/test*
#刪除zookeeper相關路徑
rm -rf /data/kafka/zk/test*
rm -rf /data/kafka/log/zk/test*
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。