您好,登錄后才能下訂單哦!
[TOC]
? Kafka是一個分布式消息隊列,采用scala語言開發(fā)。Kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper集群保存一些meta信息,來保證系統(tǒng)可用性。
(1)點對點模式(類似接受文件,一對一,消費者主動拉取數(shù)據(jù),消息收到后消息清除)點對點模型通常是一個基于拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發(fā)送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監(jiān)聽者也是如此。
(2)發(fā)布/訂閱模式(類似公眾號,一對多,數(shù)據(jù)生產(chǎn)后,推送給所有訂閱者)
發(fā)布訂閱模型則是一個基于推送的消息傳送模型。發(fā)布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監(jiān)聽主題時才接收消息,而持久訂閱者則監(jiān)聽主題的所有消息,即使當前訂閱者不可用,處于離線狀態(tài)。
1)解耦:
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2)冗余:
消息隊列把數(shù)據(jù)進行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
3)擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰。
5)可恢復性:
系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復后被處理。
6)順序保證:
在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。(Kafka保證一個Partition內(nèi)的消息的有序性,無法保證整體有序,觸發(fā)一個topic只有一個partition)
7)緩沖:
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。
8)異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
? 消息由生產(chǎn)者發(fā)布到Kafka集群后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。
? 基于推送模型(push)的消息系統(tǒng),由消息代理記錄消費者的消費狀態(tài)。消息代理在將消息推送到消費者后,標記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發(fā)送出去后,當消費進程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經(jīng)把這條消息標記為已消費了,但實際上這條消息并沒有被實際處理)。如果要保證消息被處理,消息代理發(fā)送完消息后,要設(shè)置狀態(tài)為“已發(fā)送”,只有收到消費者的確認請求后才更新為“已消費”,這就需要消息代理中記錄所有的消費狀態(tài),這種做法顯然是不可取的。
? Kafka采用拉取模型,由消費者自己記錄消費狀態(tài),每個消費者互相獨立地順序讀取每個分區(qū)的消息。如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限通過最高水位(watermark)控制(也就是只能取到當前topic的最后一條消息),生產(chǎn)者最新寫入的消息如果還沒有達到備份數(shù)量(也就是要保證副本數(shù)寫入完成,從而保證消息不丟失,由此才讓該消息給消費者消費),對消費者是不可見的。這種由消費者控制偏移量的優(yōu)點是:消費者可以按照任意的順序消費消息。比如,消費者可以重置到舊的偏移量,重新處理之前已經(jīng)消費過的消息;或者直接跳到最近的位置,從當前的時刻開始消費。
? 圖1.1 kafka消費模型
? 在一些消息系統(tǒng)中,消息代理會在消息被消費之后立即刪除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要冗余地存儲同一消息;或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態(tài),這種設(shè)計很大程度上限制了消息系統(tǒng)的整體吞吐量和處理延遲。Kafka的做法是生產(chǎn)者發(fā)布的所有消息會一致保存在Kafka集群中,不管消息有沒有被消費。用戶可以通過設(shè)置保留時間來清理過期的數(shù)據(jù),比如,設(shè)置保留策略為兩天。那么,在消息發(fā)布之后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉。
1)Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
2)Consumer :消息消費者,向kafka broker取 消息的客戶端
3)Topic :可以理解為一個隊列。是消息的一個分組
4) Consumer Group (CG):kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題(subscribed topics)的所有分區(qū)(partition)。當然,每個分區(qū)只能由同一個消費組內(nèi)的一個consumer來消費。但是不同消費者組消費同一個topic是可以的,而且互不影響。消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區(qū)
5)Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
6)Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka
一般來說,假設(shè) "test" 這個topic有兩個分區(qū),那么該topic的存儲目錄有兩個,命名為:test-0,test-1 ,然后對應(yīng)分區(qū)的目錄保存對應(yīng)的數(shù)據(jù)
首先kafka依賴zookeeper存儲元信息、且需要jdk來運行程序。所以需要事先部署好這兩個。請看之前的文章。
準備好三臺虛擬機:
bigdata121 | bigdata122 | bigdata123 |
---|---|---|
zookeeper1 | zookeeper2 | zookeeper3 |
kafka1 | kafka | kafka3 |
軟件版本:
jdk | 1.8 |
---|---|
zookeeper | 3.4.10 |
kafka | 2.1.1 |
centos | 7.2.1511 |
bigdata121:
1、解壓:
tar zxf kafka_2.11-2.1.1.tgz -C /opt/modules/
2、創(chuàng)建日志目錄:
mkdir /opt/modules/kafka_2.11-2.1.1/logs
3、修改kafka server配置文件:
vim /opt/modules/kafka_2.11-2.1.1/config/server.properties
#### 修改一些關(guān)鍵性配置
#broker的全局唯一編號,不能重復
broker.id=0
#是否允許刪除topic,測試環(huán)境方便測試設(shè)置為true,生產(chǎn)環(huán)境建議設(shè)置為false
delete.topic.enable=true
#kafka運行日志存放的路徑
log.dirs=/opt/modules/kafka_2.11-2.1.1/logs
#配置連接Zookeeper集群地址,并且/path/to 是指定在zookeeper中存儲的根節(jié)點路徑,比如 /root
zookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181/path/to
4、配置環(huán)境變量
vim /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/opt/modules/kafka_2.11-2.1.1
export PATH=$PATH:${KAFKA_HOME}/bin
5、啟用環(huán)境變量
source /etc/profile.d/kafka.sh
配置好后,將kafka的整個目錄rsync到其他兩臺主機的 /opt/modules 下,并修改
/opt/modules/kafka_2.11-2.1.1/config/server.properties 這個配置文件
broker.id=1、broker.id=2
反正就是每個broker的id必須唯一
分別在三臺機器上啟動kafka集群節(jié)點:
kafka-server-start.sh -daemon config/server.properties
-daemon 表示以后臺進程方式啟動kafka服務(wù)
config/server.properties server的配置文件路徑
停止當前節(jié)點:
kafka-server-stop.sh
1)查看當前服務(wù)器中的所有topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --list
2)創(chuàng)建topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --create --replication-factor 3 --partitions 1 --topic first
選項說明:
--topic 定義topic名
--replication-factor 定義副本數(shù)
--partitions 定義分區(qū)數(shù)
3)刪除topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --delete --topic first
需要server.properties中設(shè)置delete.topic.enable=true否則只是標記刪除或者直接重啟。
4)發(fā)送消息
[root@bigdata11 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello world
5)消費消息
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node3:9092 --from-beginning --topic first
--from-beginning:會把first主題中以往所有的數(shù)據(jù)都讀取出來。根據(jù)業(yè)務(wù)場景選擇是否增加該配置。
6)查看某個Topic的詳情
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --describe --topic first
? producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障kafka吞吐率)
? Kafka集群有多個消息代理服務(wù)器(broker-server)組成,發(fā)布到Kafka集群的每條消息都有一個類別,用主題(topic)來表示。通常,不同應(yīng)用產(chǎn)生不同類型的數(shù)據(jù),可以設(shè)置不同的主題。一個主題一般會有多個消息的訂閱者,當生產(chǎn)者發(fā)布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。
? Kafka集群為每個主題維護了分布式的分區(qū)(partition)日志文件,物理意義上可以把主題(topic)看作進行了分區(qū)的日志文件(partition log)。主題的每個分區(qū)都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區(qū)中的每條消息都會按照時間順序分配到一個單調(diào)遞增的順序編號,叫做偏移量(offset),這個偏移量能夠唯一地定位當前分區(qū)中的每一條消息。
? 消息發(fā)送時都被發(fā)送到一個topic,其本質(zhì)就是一個目錄,而topic是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:下圖中的topic有3個分區(qū),每個分區(qū)的偏移量都從0開始,不同分區(qū)之間的偏移量都是獨立的,不會相互影響。
? 圖3.1 kafka寫入方式
? 圖3.2 kafka分區(qū)讀取
? 我們可以看到,每個Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
發(fā)布到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務(wù)器端的指定分區(qū)后,都會分配到一個自增的偏移量。原始的消息內(nèi)容和分配的偏移量以及其他一些元數(shù)據(jù)信息最后都會存儲到分區(qū)日志文件中。消息的鍵也可以不用設(shè)置,這種情況下消息會均衡地分布到不同的分區(qū)。
(1)方便在集群中擴展,每個Partition可以通過調(diào)整以適應(yīng)它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
(2)可以提高并發(fā),因為可以以Partition為單位讀寫了。
傳統(tǒng)消息系統(tǒng)在服務(wù)端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務(wù)端會以消費存儲的順序依次發(fā)送給消費者。但由于消息是異步發(fā)送給消費者的,消息到達消費者的順序可能是無序的,這就意味著在并行消費時,傳統(tǒng)消息系統(tǒng)無法很好地保證消息被順序處理。雖然我們可以設(shè)置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執(zhí)行。
Kafka比傳統(tǒng)消息系統(tǒng)有更強的順序性保證,它使用主題的分區(qū)作為消息處理的并行單元。Kafka以分區(qū)作為最小的粒度,將每個分區(qū)分配給消費者組中不同的而且是唯一的消費者,并確保一個分區(qū)只屬于一個消費者,即這個消費者就是這個分區(qū)的唯一讀取線程。那么,只要分區(qū)的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區(qū),不同的消費者處理不同的分區(qū),所以Kafka不僅保證了消息的有序性,也做到了消費者的負載均衡。
(1)指定了patition,則直接使用;
(2)未指定patition但指定key,通過對key進行hash出一個patition
(3)patition和key都未指定,使用輪詢選出一個patition。
下面看看這個默認的partition實現(xiàn)類的源碼:
DefaultPartitioner類
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//未指定key,輪詢獲取分區(qū)號
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
//這里就是當指定了key時,對key進行hash來獲取分區(qū)號
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
? 同一個partition可能會有多個replication(對應(yīng) server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費(數(shù)據(jù)直接丟失了),同時producer也不能再將數(shù)據(jù)存于其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互(讀寫操作都只會和leader交互),其它replication作為follower從leader 中復制數(shù)據(jù),不會執(zhí)行其他操作。當leader掛了時,會在follower中選出新的leader。
1) producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
比如完整的路徑如:/brokers/topics/TOPIC_NAME/partitions/NUM_OF_PARTITION/state
2)producer將消息發(fā)送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息,寫入本地log后向leader發(fā)送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer發(fā)送ACK
物理上把topic分成一個或多個patition(對應(yīng) server.properties 中的num.partitions=3配置,這是默認分區(qū)個數(shù),創(chuàng)建topic時可以手動指定分區(qū)個數(shù)),每個patition物理上對應(yīng)一個文件夾(該文件夾存儲該patition的所有消息和索引文件),如下:
分區(qū)目錄命名方式為 topicName-partiontionNum 的形式
首先,我們創(chuàng)建了first這個topic,有三個partition,0、1、2
[root@bigdata11 logs]$ ll
drwxrwxr-x. 2 root root 4096 8月 6 14:37 first-0
drwxrwxr-x. 2 root root 4096 8月 6 14:35 first-1
drwxrwxr-x. 2 root root 4096 8月 6 14:37 first-2
[root@bigdata11 logs]$ cd first-0
[root@bigdata11 first-0]$ ll
-rw-rw-r--. 1 root root 10485760 8月 6 14:33 00000000000000000000.index 這是索引
-rw-rw-r--. 1 root root 219 8月 6 15:07 00000000000000000000.log 這是分區(qū)日志,也就是存儲消息的地方
-rw-rw-r--. 1 root root 10485756 8月 6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 root root 8 8月 6 14:37 leader-epoch-checkpoint
前面說到,無論消息是否被消費,kafka都會保留所有消息,消費者可以根據(jù)需要隨時從需要offset消費數(shù)據(jù)。有兩種策略可以刪除舊數(shù)據(jù):
1)基于時間:log.retention.hours=168,也就是默認刪除7天前的數(shù)據(jù)
2)基于大?。簂og.retention.bytes=1073741824,超過1GB刪除
需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1)(因為是通過索引直接定位讀取,所以和大小無關(guān)),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)。
3.2.3 zookeeper存儲結(jié)構(gòu)
zookeeper存儲了整個kafka集群的一些元信息,比如有哪些broker,哪些topic等。下面看看結(jié)構(gòu):
? 圖3.3 zookeeper存儲結(jié)構(gòu)
其中某些目錄的作用如下:
/brokers/topics/TOPIC_NAME/partitions/PARTITION_NUM/state:
指定topic的指定分區(qū)的元信息,里面存儲了該分區(qū)leader所在broker的id,以及所有副本存儲在哪些broker中。
/brokers/ids/xxxx:
有哪些broker,以及對應(yīng)的id
/consumer:
注冊的consumer的信息,例如消費者組id、消費的topic、消費的offset、消費者組中的哪個消費者消費哪個partition等
要注意的是,只有consumer會在zookeeper注冊,producer不會在zookeeper注冊
kafka支持高級api和低級api進行操作。
1)高級API優(yōu)點
高級API 寫起來簡單
不需要自行去管理offset,系統(tǒng)通過zookeeper自行管理。
不需要管理分區(qū),副本等情況,系統(tǒng)自動管理。
消費者斷線會自動根據(jù)上一次記錄在zookeeper中的offset去接著獲取數(shù)據(jù)(默認設(shè)置1分鐘更新一下zookeeper中存的offset)
可以使用group來區(qū)分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響)
2)高級API缺點
不能自行控制offset(對于某些特殊需求來說)
不能細化控制如分區(qū)、副本、zk等
1)低級 API 優(yōu)點
能夠讓開發(fā)者自己控制offset,想從哪里讀取就從哪里讀取。
自行控制連接分區(qū),對分區(qū)自定義進行負載均衡
對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內(nèi)存中)
2)低級API缺點
太過復雜,需要自行控制offset,連接哪個分區(qū),找到分區(qū)leader 等。
? 消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區(qū)在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。某個消費者讀取某個分區(qū),也可以叫做某個消費者是某個分區(qū)的擁有者。
? 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區(qū)。
? consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
? push(推)模式很難適應(yīng)消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費能力以適當?shù)乃俾氏M消息。
? 對于Kafka而言,pull模式更合適,它可簡化broker的設(shè)計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
? pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直等待數(shù)據(jù)到達。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費者請求在等待數(shù)據(jù)到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大?。?/p>
idea創(chuàng)建maven工程,添加kafka依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.1.1</version>
</dependency>
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class OldProducer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
//指定broker地址列表
properties.put("metadata.broker.list", "bigdata11:9092");
//指定producer需要broker發(fā)送ack確認收到消息
properties.put("request.required.acks", "1");
//指定序列化類
properties.put("serializer.class", "kafka.serializer.StringEncoder");
//使用上面的配置項創(chuàng)建kafka producer
Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));
//發(fā)送消息
KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
producer.send(message );
}
}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NewProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服務(wù)端的主機名和端口號
props.put("bootstrap.servers", "bigdata12:9092");
// 等待所有副本節(jié)點的應(yīng)答
props.put("acks", "all");
// 消息發(fā)送最大嘗試次數(shù)
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 請求延時
props.put("linger.ms", 1);
// 發(fā)送緩存區(qū)內(nèi)存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
}
producer.close();
}
}
package com.king.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CallBackProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服務(wù)端的主機名和端口號
props.put("bootstrap.servers", "bigdata12:9092");
// 等待所有副本節(jié)點的應(yīng)答
props.put("acks", "all");
// 消息發(fā)送最大嘗試次數(shù)
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 增加服務(wù)端請求延時
props.put("linger.ms", 1);
// 發(fā)送緩存區(qū)內(nèi)存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
//重寫里面的回到方法
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println(metadata.partition() + "---" + metadata.offset());
}
}
});
}
kafkaProducer.close();
}
}
舊api:
import java.util.Map;
import kafka.producer.Partitioner;
public class CustomPartitioner implements Partitioner {
public CustomPartitioner() {
super();
}
@Override
public int partition(Object key, int numPartitions) {
// 控制分區(qū)
return 0;
}
}
新api:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 控制分區(qū)
return 0;
}
@Override
public void close() {
}
}
實現(xiàn)好自定義的分區(qū)類之后,需要在創(chuàng)建producer的配置項添加指定自定義分區(qū)類的配置:
properties.put("partitioner.class", "自定義的分區(qū)類名,需要全類名");
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class CustomConsumer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("zookeeper.connect", "bigdata11:2181");
properties.put("group.id", "g1");
properties.put("zookeeper.session.timeout.ms", "500");
properties.put("zookeeper.sync.time.ms", "250");
properties.put("auto.commit.interval.ms", "1000");
// 創(chuàng)建消費者連接器
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
//需要自己維護offset
HashMap<String, Integer> topicCount = new HashMap<>();
topicCount.put("first", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println(new String(it.next().message()));
}
}
}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定義kakfa 服務(wù)的地址,不需要將所有broker指定上
props.put("bootstrap.servers", "bigdata11:9092");
// 制定consumer group
props.put("group.id", "test");
// 是否自動確認offset
props.put("enable.auto.commit", "true");
// 自動確認offset的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定義consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消費者訂閱的topic, 可同時訂閱多個
consumer.subscribe(Arrays.asList("first", "second","third"));
while (true) {
// 讀取數(shù)據(jù),讀取超時時間為100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
? Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實現(xiàn)clients端的定制化控制邏輯。對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
(1)configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調(diào)用。
(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標分區(qū)的計算
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息被應(yīng)答或消息發(fā)送失敗時調(diào)用,并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率
(4)close:
關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作
如前所述,interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。
需求:
實現(xiàn)一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)。
程序:
(1)實現(xiàn)時間攔截器:
package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 創(chuàng)建一個新的record,把時間戳寫入消息體的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
(2)統(tǒng)計發(fā)送消息成功和發(fā)送失敗消息數(shù),并在producer關(guān)閉時打印這兩個計數(shù)器
package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor<String, String>{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 統(tǒng)計成功和失敗的次數(shù)
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存結(jié)果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
(3)producer主程序
package com.king.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 設(shè)置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "bigdata11:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 構(gòu)建攔截鏈
List<String> interceptors = new ArrayList<>();
interceptors.add("com.king.kafka.interceptor.TimeInterceptor"); interceptors.add("com.king.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
String topic = "first";
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 發(fā)送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
producer.send(record);
}
// 4 一定要關(guān)閉producer,這樣才會調(diào)用interceptor的close方法
producer.close();
}
}
(4)測試
(1)在kafka上啟動消費者,然后運行客戶端java程序。
[root@bigdata11 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9
(2)觀察java平臺控制臺輸出數(shù)據(jù)如下:
Successful sent: 10
Failed sent: 0
? Kafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。有如下特點:
1)功能強大
? 高擴展性,彈性,容錯
2)輕量級
? 無需專門的集群
? 一個庫,而不是框架
3)完全集成
? 100%的Kafka 0.10.0版本兼容
? 易于集成到現(xiàn)有的應(yīng)用程序
4)實時性
? 毫秒級延遲
? 并非微批處理
? 窗口允許亂序數(shù)據(jù)
? 允許遲到數(shù)據(jù)
? 當前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應(yīng)用開發(fā)的用戶而言使用門檻低。另外,目前主流的Hadoop發(fā)行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
? 既然Apache Spark與Apache Storm擁用如此多的優(yōu)勢,那為何還需要Kafka Stream呢?主要有如下原因。
? 第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫。框架要求開發(fā)者按照特定的方式去開發(fā)邏輯部分,供框架調(diào)用。開發(fā)者很難了解框架的具體運行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發(fā)者調(diào)用,整個應(yīng)用的運行方式主要由開發(fā)者控制,方便使用和調(diào)試。
? 第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應(yīng)用程序中,它對應(yīng)用的打包和部署基本沒有任何要求。
? 第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統(tǒng)的標準數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時使用Kafka Stream的成本非常低。
? 第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應(yīng)用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內(nèi)存。但是Kafka作為類庫不占用系統(tǒng)資源。
? 第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。
? 第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態(tài)調(diào)整并行度。
(1)需求
實時處理單詞帶有”>>>”前綴的內(nèi)容。例如輸入”test>>>ximenqing”,最終處理成“ximenqing”
(2)代碼程序:
業(yè)務(wù)處理類:
package com.king.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
//實現(xiàn) Processor 接口,用于實現(xiàn)具體業(yè)務(wù)邏輯
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
//這里是具體業(yè)務(wù)邏輯
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”則只保留該標記后面的內(nèi)容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 輸出到下一個topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
主類入口:
package com.king.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 定義輸入的topic
String from = "first";
// 定義輸出的topic
String to = "second";
// 設(shè)置參數(shù)
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata11:9092");
StreamsConfig config = new StreamsConfig(settings);
// 構(gòu)建拓撲
TopologyBuilder builder = new TopologyBuilder();
//創(chuàng)建一個builder,指定source ,processor ,sink。并給它們起別名。
//這里的parentName實際上是指定上一層是什么的名字
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
// 具體分析處理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
//構(gòu)建處理任務(wù),包括配置以及任務(wù)詳情
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
(3)測試
運行程序,然后在命令行下分別啟動producer和consumer,看情況:
在bigdata13上啟動生產(chǎn)者
[root@bigdata13 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello>>>world
>h>>>itstar
>hahaha
(6)在bigdata12上啟動消費者
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic second
world
itstar
hahaha
可以看到消費處理的數(shù)據(jù)是符合預期的。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。