溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

kafka的知識點有哪些呢

發(fā)布時間:2021-12-15 15:53:11 來源:億速云 閱讀:152 作者:柒染 欄目:編程語言

這篇文章給大家介紹kafka的知識點有哪些呢,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

一、kafka HA

1.1 replication

如圖.1所示,同一個 partition 可能會有多個 replica(對應(yīng) server.properties 配置中的 default.replication.factor=N)。沒有 replica 的情況下,一旦 broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費,同時 producer 也不能再將數(shù)據(jù)存于其上的 patition。引入replication 之后,同一個 partition 可能會有多個 replica,而這時需要在這些 replica 之間選出一個 leader,producer 和 consumer 只與這個 leader 交互,其它 replica 作為 follower 從 leader 中復(fù)制數(shù)據(jù)。

Kafka 分配 Replica 的算法如下:

1. 將所有 broker(假設(shè)共 n 個 broker)和待分配的 partition 排序 2. 將第 i 個 partition 分配到第(i mod n)個 broker 上 3. 將第 i 個 partition 的第 j 個 replica 分配到第((i + j) mode n)個 broker上

1.2 leader failover

當 partition 對應(yīng)的 leader 宕機時,需要從 follower 中選舉出新 leader。在選舉新leader時,一個基本的原則是,新的 leader 必須擁有舊 leader commit 過的所有消息。

kafka 在 zookeeper 中(/brokers/.../state)動態(tài)維護了一個 ISR(in-sync replicas),由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成員才能選為 leader。對于 f+1 個 replica,一個 partition 可以在容忍 f 個 replica 失效的情況下保證消息不丟失。

當所有 replica 都不工作時,有兩種可行的方案:

1. 等待 ISR 中的任一個 replica 活過來,并選它作為 leader??杀U蠑?shù)據(jù)不丟失,但時間可能相對較長。 2. 選擇第一個活過來的 replica(不一定是 ISR 成員)作為 leader。無法保障數(shù)據(jù)不丟失,但相對不可用時間較短。

kafka 0.8.* 使用第二種方式。

kafka 通過 Controller 來選舉 leader.

1.3 broker failover

kafka broker failover 序列

流程說明: 

1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點注冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch 2. controller 從 /brokers/ids 節(jié)點讀取可用broker 3. controller決定set_p,該集合包含宕機 broker 上的所有 partition 4. 對 set_p 中的每一個 partition  4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點讀取 ISR  4.2 決定新 leader(如4.3節(jié)所描述)  4.3 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點 5. 通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令

1.4 controller failover

 當 controller 宕機時會觸發(fā) controller failover。每個 broker 都會在 zookeeper 的 "/controller" 節(jié)點注冊 watcher,當 controller 宕機時 zookeeper 中的臨時節(jié)點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試創(chuàng)建新的 controller path,只有一個競選成功并當選為 controller。

當新的 controller 當選時,會觸發(fā) KafkaController.onControllerFailover 方法,在該方法中完成如下操作:

1. 讀取并增加 Controller Epoch。 2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher。 3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher。 4. 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。 5. 若 delete.topic.enable=true(默認值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher。 6. 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。 7. 初始化 ControllerContext 對象,設(shè)置當前所有 topic,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等。 8. 啟動 replicaStateMachine 和 partitionStateMachine。 9. 將 brokerState 狀態(tài)設(shè)置為 RunningAsController。 10. 將每個 partition 的 Leadership 信息發(fā)送給所有“活”著的 broker。 11. 若 auto.leader.rebalance.enable=true(默認值是true),則啟動 partition-rebalance 線程。 12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic。

二. consumer 消費消息

2.1 consumer API

kafka 提供了兩套 consumer API:

1. The high-level Consumer API 2. The SimpleConsumer API

 其中 high-level consumer API 提供了一個從 kafka 消費數(shù)據(jù)的高層抽象,而 SimpleConsumer API 則需要開發(fā)人員更多地關(guān)注細節(jié)。

2.1.1 The high-level consumer API

high-level consumer API 提供了 consumer group 的語義,一個消息只能被 group 內(nèi)的一個 consumer 所消費,且 consumer 消費消息時不關(guān)注 offset,最后一個 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多線程的應(yīng)用,應(yīng)當注意:

1. 如果消費線程大于 patition 數(shù)量,則有些線程將收不到消息 2. 如果 patition 數(shù)量大于線程數(shù),則有些線程多收到多個 patition 的消息 3. 如果一個線程消費多個 patition,則無法保證你收到的消息的順序,而一個 patition 內(nèi)的消息是有序的
2.1.2 The SimpleConsumer API

如果你想要對 patition 有更多的控制權(quán),那就應(yīng)該使用 SimpleConsumer API,比如:

1. 多次讀取一個消息 2. 只消費一個 patition 中的部分消息 3. 使用事務(wù)來保證一個消息僅被消費一次

 但是使用此 API 時,partition、offset、broker、leader 等對你不再透明,需要自己去管理。你需要做大量的額外工作:

1. 必須在應(yīng)用程序中跟蹤 offset,從而確定下一條應(yīng)該消費哪條消息 2. 應(yīng)用程序需要通過程序獲知每個 Partition 的 leader 是誰 3. 需要處理 leader 的變更

 使用 SimpleConsumer API 的一般流程如下:

1. 查找到一個“活著”的 broker,并且找出每個 partition 的 leader 2. 找出每個 partition 的 follower 3. 定義好請求,該請求應(yīng)該能描述應(yīng)用程序需要哪些數(shù)據(jù) 4. fetch 數(shù)據(jù) 5. 識別 leader 的變化,并對之作出必要的響應(yīng)

以下針對 high-level Consumer API 進行說明。

2.2 consumer group

如 2.2 節(jié)所說, kafka 的分配單位是 patition。每個 consumer 都屬于一個 group,一個 partition 只能被同一個 group 內(nèi)的一個 consumer 所消費(也就保障了一個消息只能被 group 內(nèi)的一個 consuemr 所消費),但是多個 group 可以同時消費這個 partition。

kafka 的設(shè)計目標之一就是同時實現(xiàn)離線處理和實時處理,根據(jù)這一特性,可以使用 spark/Storm 這些實時處理系統(tǒng)對消息在線處理,同時使用 Hadoop 批處理系統(tǒng)進行離線處理,還可以將數(shù)據(jù)備份到另一個數(shù)據(jù)中心,只需要保證這三者屬于不同的 consumer group。

2.3 消費方式

consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)。

push 模式很難適應(yīng)消費速率不同的消費者,因為消息發(fā)送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) consumer 的消費能力以適當?shù)乃俾氏M消息。

對于 Kafka 而言,pull 模式更合適,它可簡化 broker 的設(shè)計,consumer 可自主控制消費消息的速率,同時 consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。

2.4 consumer delivery guarantee

如果將 consumer 設(shè)置為 autocommit,consumer 一旦讀到數(shù)據(jù)立即自動 commit。如果只討論這一讀取消息的過程,那 Kafka 確保了 Exactly once。

但實際使用中應(yīng)用程序并非在 consumer 讀取完數(shù)據(jù)就結(jié)束了,而是要進行進一步處理,而數(shù)據(jù)處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:

1.讀完消息先 commit 再處理消息。 這種模式下,如果 consumer 在 commit 后還沒來得及處理消息就 crash 了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應(yīng)于 At most once 2.讀完消息先處理再 commit。 這種模式下,如果在處理完消息之后 commit 之前 consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經(jīng)被處理過了。這就對應(yīng)于 At least once。 3.如果一定要做到 Exactly once,就需要協(xié)調(diào) offset 和實際操作的輸出。 精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統(tǒng)可能不支持兩階段提交。比如,consumer 拿到數(shù)據(jù)后可能把數(shù)據(jù)放到 HDFS,如果把最新的 offset 和數(shù)據(jù)本身一起寫到 HDFS,那就可以保證數(shù)據(jù)的輸出和 offset 的更新要么都完成,要么都不完成,間接實現(xiàn) Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,無法存于HDFS,而SimpleConsuemr API的 offset 是由自己去維護的,可以將之存于 HDFS 中)

總之,Kafka 默認保證 At least once,并且允許通過設(shè)置 producer 異步提交來實現(xiàn) At most once。而 Exactly once 要求與外部存儲系統(tǒng)協(xié)作,幸運的是 kafka 提供的 offset 可以非常直接非常容易得使用這種方式。


2.5 consumer rebalance

當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發(fā) rebalance。consumer rebalance算法如下:

1. 將目標 topic 下的所有 partirtion 排序,存于PT 2. 對某 consumer group 下所有 consumer 排序,存于 CG,第 i 個consumer 記為 Ci 3. N=size(PT)/size(CG),向上取整 4. 解除 Ci 對原來分配的 partition 的消費權(quán)(i從0開始) 5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci

在 0.8.*版本,每個 consumer 都只負責調(diào)整自己所消費的 partition,為了保證整個consumer group 的一致性,當一個 consumer 觸發(fā)了 rebalance 時,該 consumer group 內(nèi)的其它所有其它 consumer 也應(yīng)該同時觸發(fā) rebalance。這會導(dǎo)致以下幾個問題:

1.Herd effect   任何 broker 或者 consumer 的增減都會觸發(fā)所有的 consumer 的 rebalance 2.Split Brain   每個 consumer 分別單獨通過 zookeeper 判斷哪些 broker 和 consumer 宕機了,那么不同 consumer 在同一時刻從 zookeeper 看到的 view 就可能不一樣,這是由 zookeeper 的特性決定的,這就會造成不正確的 reblance 嘗試。 3. 調(diào)整結(jié)果不可控   所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,這可能會導(dǎo)致 kafka 工作在一個不正確的狀態(tài)。

基于以上問題,kafka 設(shè)計者考慮在0.9.*版本開始使用中心 coordinator 來控制 consumer rebalance,然后又從簡便性和驗證要求兩方面考慮,計劃在 consumer 客戶端實現(xiàn)分配方案。

三、注意事項

3.1 producer 無法發(fā)送消息的問題

最開始在本機搭建了kafka偽集群,本地 producer 客戶端成功發(fā)布消息至 broker。隨后在服務(wù)器上搭建了 kafka 集群,在本機連接該集群,producer 卻無法發(fā)布消息到 broker(奇怪也沒有拋錯)。最開始懷疑是 iptables 沒開放,于是開放端口,結(jié)果還不行(又開始是代碼問題、版本問題等等,倒騰了很久)。最后沒辦法,一項一項查看 server.properties 配置,發(fā)現(xiàn)以下兩個配置:

# The address the socket server listens on. It will get the value returned from  # java.net.InetAddress.getCanonicalHostName() if not configured. #   FORMAT: #     listeners = security_protocol://host_name:port #   EXAMPLE: #     listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092

 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 # it uses the value for "listeners" if configured. Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092

關(guān)于kafka的知識點有哪些呢就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI