溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行kafka各原理的剖析

發(fā)布時(shí)間:2022-01-10 11:06:07 來源:億速云 閱讀:166 作者:柒染 欄目:大數(shù)據(jù)

如何進(jìn)行kafka各原理的剖析,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

kafka各原理的剖析

topic如何創(chuàng)建于刪除的

topic的創(chuàng)建
如何進(jìn)行kafka各原理的剖析

具體流程文字為:

1、 controller 在 ZooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊 watcher,當(dāng) topic 被創(chuàng)建,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
2、 controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表,對于 set_p 中的每一個(gè) partition:
     2.1、 從分配給該 partition 的所有 replica(稱為AR)中任選一個(gè)可用的 broker 作為新的 leader,并將AR設(shè)置為新的 ISR 
     2.2、 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state 
3、 controller 通過 RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest。

注意:此部分 和 partition 的leader選舉過程很類似 都是需要 zk參與 相關(guān)信息都是記錄到zk中
controller在這些過程中啟到非常重要的作用。

topic的刪除
如何進(jìn)行kafka各原理的剖析

文字過程:

1、 controller 在 zooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊 watcher,當(dāng) topic 被刪除,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。 
2、 若 delete.topic.enable=false,結(jié)束;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire,controller 通過回調(diào)向?qū)?yīng)的 broker 發(fā)送 StopReplicaRequest。

前面我們講到的很多的處理故障過程 包括 topic創(chuàng)建刪除 partition leader的轉(zhuǎn)換 broker發(fā)生故障的過程中如何保證高可用 都涉及到了一個(gè)組件 controller,關(guān)于kafka中出現(xiàn)的相關(guān)概念名詞,我會專門的寫一個(gè)博客,這里先簡單的提一下。
Controller:Kafka 集群中的其中一個(gè)服務(wù)器,用來進(jìn)行 Leader Election 以及各種 Failover。

大家有沒有想過一個(gè)問題,就是如果controller出現(xiàn)了故障,怎么辦,如何failover的呢?我們往下看。

首先我們最一個(gè)實(shí)驗(yàn),我們在zk中找到controller在哪個(gè)broker上,并查看controller_epoch的次數(shù)

[zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
controller_epoch   controller
[zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
[]
[zk: localhost:2181(CONNECTED) 15] get /kafkagroup/controller
{"version":1,"brokerid":1002,"timestamp":"1566648802297"}
[zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
23

我們可以看到當(dāng)前的controller在1002上 在此之前發(fā)了23次controller的切換

我們手動到 1002節(jié)點(diǎn)上殺死kafka進(jìn)程
[hadoop@kafka02-55-12$ jps
11665 Jps
10952 Kafka
11068 ZooKeeperMain
10495 QuorumPeerMain
[hadoop@kafka02-55-12$ kill -9 10952
[hadoop@kafka02-55-12$ jps
11068 ZooKeeperMain
11678 Jps
10495 QuorumPeerMain

再看zk上的信息,相關(guān)信息已經(jīng)同步到zk中了

[zk: localhost:2181(CONNECTED) 16] get /kafkagroup/controller
{"version":1,"brokerid":1003,"timestamp":"1566665835022"}
[zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
24
[zk: localhost:2181(CONNECTED) 25] ls /kafkagroup/brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 26] ls /kafkagroup/brokers/ids
[1003, 1001]

在后臺日志中就會看到很多
[hadoop@kafka03-55-13 logs]$ vim state-change.log
[2019-08-25 01:01:07,886] TRACE [Controller id=1003 epoch=24] Received response {error_code=0} for request UPDATE_METADATA wit
h correlation id 7 sent to broker 10.211.55.13:9092 (id: 1003 rack: null) (state.change.logger)
[hadoop@kafka03-55-13 logs]$ pwd
/data/kafka/kafka-server-logs/logs
state改變的信息

[hadoop@kafka03-55-13 logs]$ tailf controller.log
[2019-08-25 01:05:42,295] TRACE [Controller id=1003] Leader imbalance ratio for broker 1002 is 1.0 (kafka.controller.KafkaCont
roller)
[2019-08-25 01:05:42,295] INFO [Controller id=1003] Starting preferred replica leader election for partitions  (kafka.controll
er.KafkaController)

接下來,我們具體的分析一下,他到底內(nèi)部發(fā)生了什么,如何切換的
當(dāng) controller 宕機(jī)時(shí)會觸發(fā) controller failover。每個(gè) broker 都會在 zookeeper 的 "/controller" 節(jié)點(diǎn)注冊 watcher,當(dāng) controller 宕機(jī)時(shí) zookeeper 中的臨時(shí)節(jié)點(diǎn)消失,所有存活的 broker 收到 fire 的通知,每個(gè) broker 都嘗試創(chuàng)建新的 controller path,只有一個(gè)競選成功并當(dāng)選為 controller。

當(dāng)新的 controller 當(dāng)選時(shí),會觸發(fā) KafkaController.onControllerFailover 方法,在該方法中完成如下操作:

1、 讀取并增加 Controller Epoch。
2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher。
3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher。
4、 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。
5、 若 delete.topic.enable=true(默認(rèn)值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher。
6、 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。
7、 初始化 ControllerContext 對象,設(shè)置當(dāng)前所有 topic,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等。
8、 啟動 replicaStateMachine 和 partitionStateMachine。
9、 將 brokerState 狀態(tài)設(shè)置為 RunningAsController。
10、 將每個(gè) partition 的 Leadership 信息發(fā)送給所有“活”著的 broker。
11、 若 auto.leader.rebalance.enable=true(默認(rèn)值是true),則啟動 partition-rebalance 線程。
12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic。

可以看到,都是在zk上進(jìn)行交互,controller的從新選舉會依次通知 zk中相關(guān)的位置 并注冊watcher ,在此過程中 就會發(fā)送 partition的leader的選舉,還會發(fā)生partition-rebalanced 刪除無用的topic等一系列操作(因?yàn)槲覀冞@里是直接考慮的最糟糕的情況就是broker宕機(jī)了一個(gè),然而宕機(jī)的這臺上就是controller)


consumer是如何消費(fèi)消息的

重要概念:每個(gè) Consumer 都劃歸到一個(gè)邏輯 Consumer Group 中,一個(gè) Partition 只能被同一個(gè) Consumer Group 中的一個(gè) Consumer 消費(fèi),但可以被不同的 Consumer Group 消費(fèi)。

若 Topic 的 Partition 數(shù)量為 p,Consumer Group 中訂閱此 Topic 的 Consumer 數(shù)量為 c, 則:
p < c: 會有 c - p 個(gè) consumer閑置,造成浪費(fèi)
p > c: 一個(gè) consumer 對應(yīng)多個(gè) partition
p = c: 一個(gè) consumer 對應(yīng)一個(gè) partition
應(yīng)該合理分配 Consumer 和 Partition 的數(shù)量,避免造成資源傾斜,
本人建議最好 Partiton 數(shù)目是 Consumer 數(shù)目的整數(shù)倍。

在consumer消費(fèi)的過程中如何把partition分配給consumer?
也可以理解為consumer發(fā)生rebalance的過程是如何的?
生產(chǎn)過程中 Broker 要分配 Partition,消費(fèi)過程這里,也要分配 Partition 給消費(fèi)者。

類似 Broker 中選了一個(gè) Controller 出來,消費(fèi)也要從 Broker 中選一個(gè) Coordinator,用于分配 Partition。// Coordinator 和 Controller 都是一個(gè)概念,協(xié)調(diào)者 組織者

當(dāng) Partition 或 Consumer 數(shù)量發(fā)生變化時(shí),比如增加 Consumer,減少 Consumer(主動或被動),增加 Partition,都會進(jìn)行 consumer的Rebalance。//發(fā)生rebalance發(fā)生在consumer端

見圖:
如何進(jìn)行kafka各原理的剖析

文字信息為:
1、Consumer 給 Coordinator 發(fā)送 JoinGroupRequest 請求。這時(shí)其他 Consumer 發(fā) Heartbeat 請求過來時(shí),Coordinator 會告訴他們,要 Rebalance了。其他 Consumer 也發(fā)送 JoinGroupRequest 請求。
2、Coordinator 在 Consumer 中選出一個(gè) Leader,其他作為 Follower,通知給各個(gè) Consumer,對于 Leader,還會把 Follower 的 Metadata 帶給它。
3、Consumer Leader 根據(jù) Consumer Metadata 重新分配 Partition。
4、Consumer 向 Coordinator 發(fā)送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 會包含分配的情況。
5、Coordinator 回包,把分配的情況告訴 Consumer,包括 Leader。

小結(jié):
消費(fèi)組與分區(qū)重平衡
可以看到,當(dāng)新的消費(fèi)者加入消費(fèi)組,它會消費(fèi)一個(gè)或多個(gè)分區(qū),而這些分區(qū)之前是由其他消費(fèi)者負(fù)責(zé)的;另外,當(dāng)消費(fèi)者離開消費(fèi)組(比如重啟、宕機(jī)等)時(shí),它所消費(fèi)的分區(qū)會分配給其他分區(qū)。這種現(xiàn)象稱為重平衡(rebalance)。
重平衡是 Kafka 一個(gè)很重要的性質(zhì),這個(gè)性質(zhì)保證了高可用和水平擴(kuò)展。
不過也需要注意到,在重平衡期間,所有消費(fèi)者都不能消費(fèi)消息,因此會造成整個(gè)消費(fèi)組短暫的不可用。而且,將分區(qū)進(jìn)行重平衡也會導(dǎo)致原來的消費(fèi)者狀態(tài)過期,從而導(dǎo)致消費(fèi)者需要重新更新狀態(tài),這段期間也會降低消費(fèi)性能。

文字過程實(shí)例:
消費(fèi)者通過定期發(fā)送心跳(hearbeat)到一個(gè)作為組協(xié)調(diào)者(group coordinator)的 broker 來保持在消費(fèi)組內(nèi)存活。這個(gè) broker 不是固定的,每個(gè)消費(fèi)組都可能不同。當(dāng)消費(fèi)者拉取消息或者提交時(shí),便會發(fā)送心跳。
如果消費(fèi)者超過一定時(shí)間沒有發(fā)送心跳,那么它的會話(session)就會過期,組協(xié)調(diào)者會認(rèn)為該消費(fèi)者已經(jīng)宕機(jī),然后觸發(fā)重平衡??梢钥吹?,從消費(fèi)者宕機(jī)到會話過期是有一定時(shí)間的,這段時(shí)間內(nèi)該消費(fèi)者的分區(qū)都不能進(jìn)行消息消費(fèi);通常情況下,我們可以進(jìn)行優(yōu)雅關(guān)閉,這樣消費(fèi)者會發(fā)送離開的消息到組協(xié)調(diào)者,這樣組協(xié)調(diào)者可以立即進(jìn)行重平衡而不需要等待會話過期。
在 0.10.1 版本,Kafka 對心跳機(jī)制進(jìn)行了修改,將發(fā)送心跳與拉取消息進(jìn)行分離,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響。另外更高版本的 Kafka 支持配置一個(gè)消費(fèi)者多長時(shí)間不拉取消息但仍然保持存活,這個(gè)配置可以避免活鎖(livelock)?;铈i,是指應(yīng)用沒有故障但是由于某些原因不能進(jìn)一步消費(fèi)。

接下來思考一個(gè)問題,consumer是如何取消息的 Consumer Fetch Message
Consumer 采用"拉模式"消費(fèi)消息,這樣 Consumer 可以自行決定消費(fèi)的行為。
Consumer 調(diào)用 Poll(duration)從服務(wù)器拉取消息。拉取消息的具體行為由下面的配置項(xiàng)決定:

#consumer.properties
#消費(fèi)者最多 poll 多少個(gè) record
max.poll.records=500
#消費(fèi)者 poll 時(shí) partition 返回的最大數(shù)據(jù)量
max.partition.fetch.bytes=1048576
#Consumer 最大 poll 間隔
#超過此值服務(wù)器會認(rèn)為此 consumer failed 
#并將此 consumer 踢出對應(yīng)的 consumer group 
max.poll.interval.ms=300000

小結(jié):
1、在 Partition 中,每個(gè)消息都有一個(gè) Offset。新消息會被寫到 Partition 末尾(最新的一個(gè) Segment 文件末尾), 每個(gè) Partition 上的消息是順序消費(fèi)的,不同的 Partition 之間消息的消費(fèi)順序是不確定的。
2、若一個(gè) Consumer 消費(fèi)多個(gè) Partition, 則各個(gè) Partition 之前消費(fèi)順序是不確定的,但在每個(gè) Partition 上是順序消費(fèi)。
3、若來自不同 Consumer Group 的多個(gè) Consumer 消費(fèi)同一個(gè) Partition,則各個(gè) Consumer 之間的消費(fèi)互不影響,每個(gè) Consumer 都會有自己的 Offset。

舉個(gè)官方小栗子:
如何進(jìn)行kafka各原理的剖析


Offset 如何保存?  
Consumer 消費(fèi) Partition 時(shí),需要保存 Offset 記錄當(dāng)前消費(fèi)位置。

Offset 可以選擇自動提交或調(diào)用 Consumer 的 commitSync() 或 commitAsync() 手動提交,相關(guān)配置為:

#是否自動提交 offset
enable.auto.commit=true
#自動提交間隔。enable.auto.commit=true 時(shí)有效
auto.commit.interval.ms=5000
//enable.auto.commit 的默認(rèn)值是 true;就是默認(rèn)采用自動提交的機(jī)制。
auto.commit.interval.ms 的默認(rèn)值是 5000,單位是毫秒。5 秒

Offset 保存在名叫 __consumeroffsets 的 Topic 中。寫消息的 Key 由 GroupId、Topic、Partition 組成,Value 是 Offset。

一般情況下,每個(gè) Key 的 Offset 都是緩存在內(nèi)存中,查詢的時(shí)候不用遍歷 Partition,如果沒有緩存,第一次就會遍歷 Partition 建立緩存,然后查詢返回

__consumeroffsets 的 Partition 數(shù)量由下面的 Server 配置決定:

offsets.topic.num.partitions=50

默然的consumeroffsets是沒有repale副本的 需要我們通過在一開始的參數(shù)指定,或者通過后期的增加 consumeroffsets 的副本json的方式動態(tài)添加

auto.create.topics.enable=true      
default.replication.factor=2
num.partitions=3

經(jīng)過測試上面的3個(gè)參數(shù)雖然在搭建好kafak的時(shí)候第一次指定了,但是 consumeroffsets副本數(shù)還是1個(gè),這個(gè)的關(guān)鍵在于,配置文件中需要指定
kafka配置文件關(guān)于此參數(shù)解釋如下:
############################# Internal Topic Settings  #############################   內(nèi)部主題設(shè)置
#組元數(shù)據(jù)內(nèi)部主題“
consumer_offsets”和“transaction_state”的復(fù)制因子
#對于除開發(fā)測試之外的任何其他內(nèi)容,建議使用大于1的值以確??捎眯?,例如設(shè)置成3。
The replication factor for the group metadata internal topics "__consumer_offsets" and "
transaction_state"
For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.

offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
//關(guān)于這3個(gè)參數(shù),可以在修改kafka程序中指定的 consumer_offsets 的副本數(shù)
然后@上海-馬吉輝 說只要num.partitions=3,__consumer_offsets副本數(shù)就是3,我測試不是 還是1
所以還是以offsets.topic.replication.factor參數(shù)控制為準(zhǔn)
如果不是第一次啟動kafka  那幾個(gè)配置只有在初次啟動生效的。 apache kafka 下載下來應(yīng)該都默認(rèn)是 1 吧,2.* 也是 1 啊。
可以這樣修改
先停止kafka集群,刪除每個(gè)broker  data目錄下所有
consumeroffsets*
然后刪除zookeeper下rmr /kafkatest/brokers/topics/consumer_offsets    然后重啟kafka
消費(fèi)一下,這個(gè)__consumer_offsets就會創(chuàng)建了
注意:是在第一次消費(fèi)時(shí),才創(chuàng)建這個(gè)topic的,不是broker集群啟動就創(chuàng)建,還有那個(gè)
trancation_state  topic也是第一次使用事務(wù)的時(shí)候才會創(chuàng)建

小結(jié):在生產(chǎn)上,沒人去刪zk里的內(nèi)容,危險(xiǎn)系數(shù)大,還是推薦動態(tài)擴(kuò)副本,只要把json寫對就好
控制 __consumer_offsets 的副本數(shù) 的關(guān)鍵參數(shù)為這3個(gè)。


Offset 保存在哪個(gè)分區(qū)上,即 __consumeroffsets 的分區(qū)機(jī)制,可以表示為
groupId.hashCode() mode groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 是上面配置的分區(qū)數(shù)。因?yàn)橐粋€(gè) Partition 只能被同一個(gè) Consumer Group 的一個(gè) Consumer 消費(fèi),因此可以用 GroupId 表示此 Consumer 消費(fèi) Offeset 所在分區(qū)。

Kafka 中的可靠性保證有如下四點(diǎn):

1、對于一個(gè)分區(qū)來說,它的消息是有序的。如果一個(gè)生產(chǎn)者向一個(gè)分區(qū)先寫入消息A,然后寫入消息B,那么消費(fèi)者會先讀取消息A再讀取消息B。
2、當(dāng)消息寫入所有in-sync狀態(tài)的副本后,消息才會認(rèn)為已提交(committed)。這里的寫入有可能只是寫入到文件系統(tǒng)的緩存,不一定刷新到磁盤。生產(chǎn)者可以等待不同時(shí)機(jī)的確認(rèn),比如等待分區(qū)主副本寫入即返回,后者等待所有in-sync狀態(tài)副本寫入才返回。
3、一旦消息已提交,那么只要有一個(gè)副本存活,數(shù)據(jù)不會丟失。
4、消費(fèi)者只能讀取到已提交的消息。

看到這里,我們對kafka有了全新的認(rèn)識 ,kafka到底為什么這么厲害,下面總結(jié)了11點(diǎn)
1、批量處理
2、客戶端優(yōu)化
3、日志格式優(yōu)化
4、日志編碼
5、消息壓縮
6、建立索引
7、分區(qū)
8、一致性
9、順序?qū)懕P
10、頁緩存 *
11、零拷貝


有關(guān)內(nèi)存映射:
即便是順序?qū)懭胗脖P,硬盤的訪問速度還是不可能追上內(nèi)存。所以Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲來利用內(nèi)存提高I/O效率。 Memory Mapped Files(后面簡稱mmap)也被翻譯成內(nèi)存映射文件,它的工作原理是直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r(shí)候)。 通過mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存,也不必關(guān)心內(nèi)存的大小有虛擬內(nèi)存為我們兜底。 mmap其實(shí)是Linux中的一個(gè)用來實(shí)現(xiàn)內(nèi)存映射的函數(shù),在Java NIO中可用MappedByteBuffer來實(shí)現(xiàn)內(nèi)存映射。

Kafka消息壓縮
生產(chǎn)者發(fā)送壓縮消息,是把多個(gè)消息批量發(fā)送,把多個(gè)消息壓縮成一個(gè)wrapped message來發(fā)送。和普通的消息一樣,在磁盤上的數(shù)據(jù)和從producer發(fā)送來到broker的數(shù)據(jù)格式一模一樣,發(fā)送給consumer的數(shù)據(jù)也是同樣的格式。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI