您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Kafka的設計原理介紹”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
什么是消息隊列?簡單來說,消息隊列是存放消息的容器??蛻舳丝梢詫⑾l(fā)送到消息服務器,也可以從消息服務器獲取消息。
今天,我將圍繞如下幾個問題進行分享:
為什么需要消息系統(tǒng)?
Kafka 架構(gòu)原理?
Kafka 如何存儲消息?
Producer 如何發(fā)送消息?
Consumer 如何消費消息?
Offset 如何保存?
消息系統(tǒng)可能遇到哪些問題?
為什么需要消息系統(tǒng)?
削峰
數(shù)據(jù)庫的處理能力是有限的,在峰值期,過多的請求落到后臺,一旦超過系統(tǒng)的處理能力,可能會使系統(tǒng)掛掉。
如上圖所示,系統(tǒng)的處理能力是 2k/s,MQ 處理能力是 8k/s,峰值請求 5k/s,MQ 的處理能力遠遠大于數(shù)據(jù)庫,在高峰期,請求可以先積壓在 MQ 中,系統(tǒng)可以根據(jù)自身的處理能力以 2k/s 的速度消費這些請求。
這樣等高峰期一過,請求可能只有 100/s,系統(tǒng)可以很快的消費掉積壓在 MQ 中的請求。
注意,上面的請求指的是寫請求,查詢請求一般通過緩存解決。
解耦
如下場景,S 系統(tǒng)與 A、B、C 系統(tǒng)緊密耦合。由于需求變動,A 系統(tǒng)修改了相關(guān)代碼,S 系統(tǒng)也需要調(diào)整 A 相關(guān)的代碼。
過幾天,C 系統(tǒng)需要刪除,S 緊跟著刪除 C 相關(guān)代碼;又過了幾天,需要新增 D 系統(tǒng),S 系統(tǒng)又要添加與 D 相關(guān)的代碼;再過幾天,程序猿瘋了...
這樣各個系統(tǒng)緊密耦合,不利于維護,也不利于擴展?,F(xiàn)在引入 MQ,A 系統(tǒng)變動,A 自己修改自己的代碼即可;C 系統(tǒng)刪除,直接取消訂閱;D 系統(tǒng)新增,訂閱相關(guān)消息即可。
這樣通過引入消息中間件,使各個系統(tǒng)都與 MQ 交互,從而避免它們之間的錯綜復雜的調(diào)用關(guān)系。
Kafka 架構(gòu)原理?
Kafka 相關(guān)概念:
Broker:Kafka 集群中包含的服務器。
Producer:消息生產(chǎn)者。
Consumer:消息消費者。
Consumer Group:每個 Consumer 都屬于一個 Consumer Group,每條消息只能被 Consumer Group 中的一個 Consumer 消費,但可以被多個 Consumer Group 消費。
Topic:消息的類別。每條消息都屬于某個 Topic,不同的 Topic 之間是相互獨立的,即 Kafka 是面向 Topic 的。
Partition:每個 Topic 分為多個 Partition,Partition 是 Kafka 分配的單位。Kafka 物理上的概念,相當于一個目錄,目錄下的日志文件構(gòu)成這個 Partition。
Replica:Partition 的副本,保障 Partition 的高可用。
Leader:Replica 中的一個角色, Producer 和 Consumer 只跟 Leader 交互。
Follower:Replica 中的一個角色,從 Leader 中復制數(shù)據(jù)。
Controller:Kafka 集群中的其中一個服務器,用來進行 Leader Election 以及各種 Failover。
Zookeeper:Kafka 通過 Zookeeper 來存儲集群的 Meta 信息。
Topic and Logs
Message 是按照 Topic 來組織的,每個 Topic 可以分成多個 Partition(對應 server.properties/num.partitions)。
Partition 是一個順序的追加日志,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障 Kafka 吞吐率)。
其結(jié)構(gòu)如下:server.properties/num.partitions 表示文件 server.properties 中的 num.partitions 配置項,下同。
Partition 中的每條記錄(Message)包含三個屬性:Offset,messageSize 和 Data。
其中 Offset 表示消息偏移量;messageSize 表示消息的大小;Data 表示消息的具體內(nèi)容。
Partition 是以文件的形式存儲在文件系統(tǒng)中,位置由 server.properties/log.dirs 指定,其命名規(guī)則為
比如,Topic 為"page_visits"的消息,分為 5 個 Partition,其目錄結(jié)構(gòu)為:
Partition 可能位于不同的 Broker 上,Partition 是分段的,每個段是一個 Segment 文件。
Segment的常用配置有:
#server.properties #segment文件的大小,默認為 1G log.segment.bytes=1024*1024*1024 #滾動生成新的segment文件的最大時長 log.roll.hours=24*7 #segment文件保留的最大時長,超時將被刪除 log.retention.hours=24*7
Partition 目錄下包括了數(shù)據(jù)文件和索引文件,下圖是某個 Partition 的目錄結(jié)構(gòu):
Index 采用稀疏存儲的方式,它不會為每一條 Message 都建立索引,而是每隔一定的字節(jié)數(shù)建立一條索引,避免索引文件占用過多的空間。
缺點是沒有建立索引的 Offset 不能一次定位到 Message 的位置,需要做一次順序掃描,但是掃描的范圍很小。
索引包含兩個部分(均為 4 個字節(jié)的數(shù)字),分別為相對 Offset 和 Position。
相對 Offset 表示 Segment 文件中的 Offset,Position 表示 Message 在數(shù)據(jù)文件中的位置。
總結(jié):Kafka 的 Message 存儲采用了分區(qū)(Partition),磁盤順序讀寫,分段(LogSegment)和稀疏索引這幾個手段來達到高效性。
Partition and Replica
一個 Topic 物理上分為多個 Partition,位于不同的 Broker 上。如果沒有 Replica,一旦 Broker 宕機,其上所有的 Patition 將不可用。
每個 Partition 可以有多個Replica(對應server.properties/default.replication.factor),分配到不同的 Broker 上。
其中有一個 Leader 負責讀寫,處理來自 Producer 和 Consumer 的請求;其他作為 Follower 從 Leader Pull 消息,保持與 Leader 的同步。
如何分配 Partition 和 Replica 到 Broker 上?步驟如下:
將所有 Broker(假設共 n 個 Broker)和待分配的 Partition 排序。
將第 i 個 Partition 分配到第(i mod n)個 Broker 上。
將第 i 個 Partition 的第 j 個 Replica 分配到第((i + j) mode n)個 Broker 上。
根據(jù)上面的分配規(guī)則,若 Replica 的數(shù)量大于 Broker 的數(shù)量,必定會有兩個相同的 Replica 分配到同一個 Broker 上,產(chǎn)生冗余。因此 Replica 的數(shù)量應該小于或等于 Broker 的數(shù)量。
Leader 選舉
Kafka 在 Zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)動態(tài)維護了一個 ISR(in-sync replicas)。
ISR 里面的所有 Replica 都"跟上"了 Leader,Controller 將會從 ISR 里選一個做 Leader。
具體流程如下:
Controller 在 Zookeeper 的 /brokers/ids/[brokerId] 節(jié)點注冊 Watcher,當 Broker 宕機時 Zookeeper 會 Fire Watch。
Controller 從 /brokers/ids 節(jié)點讀取可用 Broker。
Controller 決定 set_p,該集合包含宕機 Broker 上的所有 Partition。
對 set_p 中的每一個 Partition,從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點讀取 ISR,決定新 Leader,將新 Leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 State 節(jié)點。
通過 RPC 向相關(guān) Broker 發(fā)送 leaderAndISRRequest 命令。
當 ISR 為空時,會選一個 Replica(不一定是 ISR 成員)作為 Leader;當所有的 Replica 都歇菜了,會等任意一個 Replica 復活,將其作為 Leader。
ISR(同步列表)中的 Follower 都"跟上"了Leader,"跟上"并不表示完全一致,它由 server.properties/replica.lag.time.max.ms 配置。
表示 Leader 等待 Follower 同步消息的最大時間,如果超時,Leader 將 Follower 移除 ISR。配置項 replica.lag.max.messages 已經(jīng)移除。
Replica 同步
Kafka 通過"拉模式"同步消息,即 Follower 從 Leader 批量拉取數(shù)據(jù)來同步。
具體的可靠性,是由生產(chǎn)者(根據(jù)配置項 producer.properties/acks)來決定的。
In Kafka 0.9,request.required.acks=-1 which configration of producer is replaced by acks=all, but this old config is remained in docs.
在 0.9 版本,生產(chǎn)者配置項 request.required.acks=-1 被 acks=all 取代,但是老的配置項還保留在文檔中。
PS:最新的文檔 2.2.x request.required.acks 已經(jīng)不存在了。
在 Acks=-1 的時候,如果 ISR 少于 min.insync.replicas 指定的數(shù)目,將會拋出 NotEnoughReplicas 或 NotEnoughReplicasAfterAppend 異常。
Producer 如何發(fā)送消息?
Producer 首先將消息封裝進一個 ProducerRecord 實例中。
消息路由:
發(fā)送消息時如果指定了 Partition,則直接使用。
如果指定了 Key,則對 Key 進行哈希,選出一個 Partition。這個 Hash(即分區(qū)機制)由 producer.properties/partitioner.class 指定的類實現(xiàn),這個路由類需要實現(xiàn) Partitioner 接口。
如果都未指定,通過 Round-Robin 來選 Partition。
消息并不會立即發(fā)送,而是先進行序列化后,發(fā)送給 Partitioner,也就是上面提到的 Hash 函數(shù),由 Partitioner 確定目標分區(qū)后,發(fā)送到一塊內(nèi)存緩沖區(qū)中(發(fā)送隊列)。
Producer 的另一個工作線程(即 Sender 線程),則負責實時地從該緩沖區(qū)中提取出準備好的消息封裝到一個批次內(nèi),統(tǒng)一發(fā)送到對應的 Broker 中。
其過程大致是這樣的:
圖片來自 123archu
Consumer 如何消費消息?
每個 Consumer 都劃歸到一個邏輯 Consumer Group 中,一個 Partition 只能被同一個 Consumer Group 中的一個 Consumer 消費,但可以被不同的 Consumer Group 消費。
若 Topic 的 Partition 數(shù)量為 p,Consumer Group 中訂閱此 Topic 的 Consumer 數(shù)量為 c, 則:
p < c: 會有 c - p 個 consumer閑置,造成浪費 p > c: 一個 consumer 對應多個 partition p = c: 一個 consumer 對應一個 partition
應該合理分配 Consumer 和 Partition 的數(shù)量,避免造成資源傾斜,最好 Partiton 數(shù)目是 Consumer 數(shù)目的整數(shù)倍。
①如何將 Partition 分配給 Consumer
生產(chǎn)過程中 Broker 要分配 Partition,消費過程這里,也要分配 Partition 給消費者。
類似 Broker 中選了一個 Controller 出來,消費也要從 Broker 中選一個 Coordinator,用于分配 Partition。
當 Partition 或 Consumer 數(shù)量發(fā)生變化時,比如增加 Consumer,減少 Consumer(主動或被動),增加 Partition,都會進行 Rebalance。
其過程如下:
Consumer 給 Coordinator 發(fā)送 JoinGroupRequest 請求。這時其他 Consumer 發(fā) Heartbeat 請求過來時,Coordinator 會告訴他們,要 Rebalance了。其他 Consumer 也發(fā)送 JoinGroupRequest 請求。
Coordinator 在 Consumer 中選出一個 Leader,其他作為 Follower,通知給各個 Consumer,對于 Leader,還會把 Follower 的 Metadata 帶給它。
Consumer Leader 根據(jù) Consumer Metadata 重新分配 Partition。
Consumer 向 Coordinator 發(fā)送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 會包含分配的情況。Coordinator 回包,把分配的情況告訴 Consumer,包括 Leader。
②Consumer Fetch Message
Consumer 采用"拉模式"消費消息,這樣 Consumer 可以自行決定消費的行為。
Consumer 調(diào)用 Poll(duration)從服務器拉取消息。拉取消息的具體行為由下面的配置項決定:
#consumer.properties #消費者最多 poll 多少個 record max.poll.records=500 #消費者 poll 時 partition 返回的最大數(shù)據(jù)量 max.partition.fetch.bytes=1048576 #Consumer 最大 poll 間隔 #超過此值服務器會認為此 consumer failed #并將此 consumer 踢出對應的 consumer group max.poll.interval.ms=300000
在 Partition 中,每個消息都有一個 Offset。新消息會被寫到 Partition 末尾(最新的一個 Segment 文件末尾), 每個 Partition 上的消息是順序消費的,不同的 Partition 之間消息的消費順序是不確定的。
若一個 Consumer 消費多個 Partition, 則各個 Partition 之前消費順序是不確定的,但在每個 Partition 上是順序消費。
若來自不同 Consumer Group 的多個 Consumer 消費同一個 Partition,則各個 Consumer 之間的消費互不影響,每個 Consumer 都會有自己的 Offset。
Consumer A 和 Consumer B 屬于不同的 Consumer Group。Cosumer A 讀取到 Offset=9, Consumer B 讀取到 Offset=11,這個值表示下次讀取的位置。
也就是說 Consumer A 已經(jīng)讀取了 Offset 為 0~8 的消息,Consumer B 已經(jīng)讀取了 Offset 為 0~10 的消息。
下次從 Offset=9 開始讀取的 Consumer 并不一定還是 Consumer A 因為可能發(fā)生 Rebalance。
Offset 如何保存?
Consumer 消費 Partition 時,需要保存 Offset 記錄當前消費位置。
Offset 可以選擇自動提交或調(diào)用 Consumer 的 commitSync() 或 commitAsync() 手動提交,相關(guān)配置為:
#是否自動提交 offset enable.auto.commit=true #自動提交間隔。enable.auto.commit=true 時有效 auto.commit.interval.ms=5000
Offset 保存在名叫 __consumeroffsets 的 Topic 中。寫消息的 Key 由 GroupId、Topic、Partition 組成,Value 是 Offset。
一般情況下,每個 Key 的 Offset 都是緩存在內(nèi)存中,查詢的時候不用遍歷 Partition,如果沒有緩存,第一次就會遍歷 Partition 建立緩存,然后查詢返回。
__consumeroffsets 的 Partition 數(shù)量由下面的 Server 配置決定:
offsets.topic.num.partitions=50
Offset 保存在哪個分區(qū)上,即 __consumeroffsets 的分區(qū)機制,可以表示為:
groupId.hashCode() mode groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 是上面配置的分區(qū)數(shù)。因為一個 Partition 只能被同一個 Consumer Group 的一個 Consumer 消費,因此可以用 GroupId 表示此 Consumer 消費 Offeset 所在分區(qū)。
消息系統(tǒng)可能遇到哪些問題?
Kafka 支持 3 種消息投遞語義:
at most once:最多一次,消息可能會丟失,但不會重復
獲取數(shù)據(jù) -> commit offset -> 業(yè)務處理
at least once:最少一次,消息不會丟失,可能會重復
獲取數(shù)據(jù) -> 業(yè)務處理 -> commit offset。
exactly once:只且一次,消息不丟失不重復,只且消費一次(0.11 中實現(xiàn),僅限于下游也是 Kafka)
①如何保證消息不被重復消費?(消息的冪等性)
對于更新操作,天然具有冪等性。對于新增操作,可以給每條消息一個唯一的 id,處理前判斷是否被處理過。這個 id 可以存儲在 Redis 中,如果是寫數(shù)據(jù)庫可以用主鍵約束。
②如何保證消息的可靠性傳輸?(消息丟失的問題)
根據(jù) Kafka 架構(gòu),有三個地方可能丟失消息:Consumer,Producer 和 Server。
消費端弄丟了數(shù)據(jù):當 server.properties/enable.auto.commit 設置為 True 的時候,Kafka 會先 Commit Offset 再處理消息,如果這時候出現(xiàn)異常,這條消息就丟失了。
因此可以關(guān)閉自動提交 Offset,在處理完成后手動提交 Offset,這樣可以保證消息不丟失;但是如果提交 Offset 失敗,可能導致重復消費的問題, 這時保證冪等性即可。
Kafka 弄丟了消息:如果某個 Broker 不小心掛了,此時若 Replica 只有一個,Broker 上的消息就丟失了。
若 Replica>1,給 Leader 重新選一個 Follower 作為新的 Leader,如果 Follower 還有些消息沒有同步,這部分消息便丟失了。
可以進行如下配置,避免上面的問題:
給 Topic 設置 replication.factor 參數(shù):這個值必須大于 1,要求每個 Partition 必須有至少 2 個副本。
在 Kafka 服務端設置 min.insync.replicas 參數(shù):這個值必須大于 1,這個是要求一個 Leader 至少感知到有至少一個 Follower 還跟自己保持聯(lián)系,沒掉隊,這樣才能確保 Leader 掛了還有一個 Follower 吧。
在 Producer 端設置 acks=all:這個是要求每條數(shù)據(jù),必須是寫入所有 Replica 之后,才能認為是寫成功了。
在 Producer 端設置 retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
Producer弄丟了消息:在 Producer 端設置 acks=all,保證所有的 ISR 都同步了消息才認為寫入成功。
③如何保證消息的順序性?
Kafka 中 Partition 上的消息是順序的,可以將需要順序消費的消息發(fā)送到同一個 Partition 上,用單個 Consumer 消費。
“Kafka的設計原理介紹”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。