您好,登錄后才能下訂單哦!
這篇文章主要講解了“Kafka的基本原理是什么”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Kafka的基本原理是什么”吧!
1、什么是Kafka?
Kafka是一個使用Scala編寫的消息系統(tǒng),原本開發(fā)自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ)。現(xiàn)在它已被多家不同類型的公司作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)使用。
Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。
Kafka使用zookeeper作為其分布式協(xié)調(diào)框架,很好的將消息生產(chǎn)、消息存儲、消息消費的過程結(jié)合在一起。同時借助zookeeper,kafka能夠生產(chǎn)者、消費者和broker在內(nèi)的所以組件在無狀態(tài)的情況下,建立起生產(chǎn)者和消費者的訂閱關(guān)系,并實現(xiàn)生產(chǎn)者與消費者的負載均衡。
2、kafka的特性
(1)以時間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間復(fù)雜度的訪問性能。
(2)高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。
(3)支持Kafka Server間的消息分區(qū),及分布式消費,同時保證每個Partition內(nèi)的消息順序存儲和傳輸。
(4)同時支持離線數(shù)據(jù)處理(Offline)和實時數(shù)據(jù)處理(Online)。
(5)Scale out:支持在線水平擴展。無需停機即可擴展機器。
(6)支持定期刪除數(shù)據(jù)機制??梢园凑諘r間段來刪除,也可以按照文檔大小來刪除。
(7)Consumer采用pull的方式消費數(shù)據(jù),消費狀態(tài)由Consumer控制,減輕Broker負擔。
3、Kafka架構(gòu)
(1)Broker:和RabbitMQ中的Broker概念類似。一個kafka服務(wù)器就是一個Broker,而一個kafka集群包含一個或多個Broker。Broker會持久化數(shù)據(jù)到相應(yīng)的Partition中,不會有cache壓力。
(2)Topic:主題。每條消息都有一個類別,這個類別就叫做Topic。Kafka的Topic可以理解為RabbitMQ的Queue消息隊列,相同類別的消息被發(fā)送到同一個Topic中,然后再被此Topic的Consumer消費。Topic是邏輯上的概念,而物理上的實現(xiàn)就是Partition。
(3)Partition:分區(qū)。分區(qū)是物理上的概念,每個Topic包含一個或者多個Partition,每個Partition都是一個有序隊列。發(fā)送給Topic的消息經(jīng)過分區(qū)算法(可以自定義),決定消息存儲在哪一個Partition當中。每一條數(shù)據(jù)都會被分配一個有序id:Offset。注意:kafka只保證按一個partition中的順序?qū)⑾l(fā)給Consumer,不保證一個Topic的整體(多個partition間)的順序。
(4)Replication:備份。Replication是基于Partition而不是Topic的。每個Partition都有自己的備份,且分布在不同的Broker上。
(5)Offset:偏移量。kafka的存儲文件都是用offset來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.log的文件即可。當然the first offset就是00000000000.log。注意:每個Partition中的Offset都是各不影響的從0開始的有序數(shù)列。
(6)Producer:消息生產(chǎn)者。
(7)Consumer:消息消費者。Consumer采用pull的方式從Broker獲取消息,并且Consumer要維護消費狀態(tài),因此Kafaka系統(tǒng)中,業(yè)務(wù)重心一般都在Consumer身上,而不像RabbitMQ那樣Broker做了大部分的事情。
(8)Consumer Group:消費組。每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group)。每個Topic可以被多個Group訂閱,每個Group中可以有多個Consumer。發(fā)送到Topic的一條消息會被每個Group中的一個Consumer消費,多個Consumer之間將交錯消費整個Topic的消息,實現(xiàn)負載均衡。
(9)Record:消息。每一個消息由一個Key、一個Value和一個時間戳構(gòu)成。
Kafka內(nèi)部結(jié)構(gòu)圖(圖片源于網(wǎng)絡(luò))
Kafka拓撲結(jié)構(gòu)圖(圖片源于網(wǎng)絡(luò))
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應(yīng)一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。若創(chuàng)建topic1和topic2兩個topic,且分別有13個和19個分區(qū),則整個集群上會相應(yīng)會生成共32個文件夾。partiton命名規(guī)則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數(shù)量減1。
(1)每個partition目錄相當于一個巨型文件被平均分配到多個大小相等segment數(shù)據(jù)文件中。但每個segment file消息數(shù)量不一定相等,這種特性方便old segment file快速被刪除。
(2)每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務(wù)端配置參數(shù)決定。
(3)segment file組成:由2大部分組成,分別為index file(后綴“.index”)和data file(后綴“.log”),此2個文件一一對應(yīng),成對出現(xiàn)。
(4)segment文件命名規(guī)則:partition全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值。數(shù)值最大為64位long大小,19位數(shù)字字符長度,沒有數(shù)字用0填充。
Segment file結(jié)構(gòu)圖(圖片來源于網(wǎng)絡(luò))
以上述圖2中一對segment file文件為例,說明segment中index和log文件對應(yīng)關(guān)系物理結(jié)構(gòu)如下:
Kafka集群Partition分布圖1(圖片來源于網(wǎng)絡(luò))
當集群中新增2節(jié)點,Partition增加到6個時分布情況如下:
Kafka集群Partition分布圖2(圖片來源于網(wǎng)絡(luò))
在Kafka集群中,每個Broker都有均等分配Leader Partition機會。
上述圖Broker Partition中,箭頭指向為副本,以Partition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本。每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本,如此循環(huán)迭代分配,多副本都遵循此規(guī)則。
副本分配算法:
(1)將所有n個Broker和待分配的i個Partition排序。
(2)將第i個Partition分配到第(i mod n)個Broker上。
(3)將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上
例如圖2中的第三個Partition:partition-2,將被分配到Broker3((3 mod 6)=3)上,partition-2的副本將被分配到Broker4上((3+1) mod 6=4)。
(1)Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經(jīng)消費完文件,減少磁盤占用??梢栽O(shè)置segment文件大小定期刪除和消息過期時間定期刪除
(2)通過索引信息可以快速定位message。
(3)通過index元數(shù)據(jù)全部映射到memory,可以避免segment file的IO磁盤操作。
(4)通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。
對于多個Partition,多個Consumer
(1)如果consumer比partition多,是浪費,因為kafka的設(shè)計是在一個partition上是不允許并發(fā)的,所以consumer數(shù)不要大于partition數(shù)。
(2)如果consumer比partition少,一個consumer會對應(yīng)于多個partition,這里要合理分配consumer數(shù)和partition數(shù),否則會導(dǎo)致partition里面的數(shù)據(jù)被取的不均勻。最好partiton數(shù)目是consumer數(shù)目的整數(shù)倍,所以partition數(shù)目很重要,比如取24,就很容易設(shè)定consumer數(shù)目。
(3)如果consumer從多個partition讀到數(shù)據(jù),不保證數(shù)據(jù)間的順序性,kafka只保證在一個partition上數(shù)據(jù)是有序的,但多個partition,根據(jù)你讀的順序會有不同
(4)增減consumer,broker,partition會導(dǎo)致rebalance,所以rebalance后consumer對應(yīng)的partition會發(fā)生變化
(5)High-level接口中獲取不到數(shù)據(jù)的時候是會block的。
關(guān)于zookeeper中Offset初始值的問題:
Zookeeper中Offset的初始值默認是非法的,因此通過設(shè)置Consumer的參數(shù)auto.offset.reset來告訴Consumer讀取到Offset非法時該怎么做。
auto.offset.reset有三個值:
(1)smallest : 自動把zookeeper中的offset設(shè)為Partition中最小的offset;
(2)largest : 自動把zookeeper中offset設(shè)為Partition中最大的offset;
(3)anything else: 拋出異常;
auto.offset.reset默認值是largest,此種情況下如果producer先發(fā)送了10條數(shù)據(jù)到某個Partition,然后Consumer啟功后修改zookeeper中非法Offset值為Partition中的最大值9(Offset從0開始),這樣Consumer就忽略了這10條消息。就算現(xiàn)在再次設(shè)置成smallest也讀取不到之前的10條數(shù)據(jù)了,因為此時Offset是合法的了。
所以,想要讀取之前的數(shù)據(jù),就需要在一開始指定auto.offset.reset=smallest。
Replication是基于Partition而不是Topic的。每個Partition都有自己的備份,且分布在不同的Broker上。這些Partition當中有一個是Leader,其他都是Follower。Leader Partition負責讀寫操作,F(xiàn)ollower Partition只負責從Leader處復(fù)制數(shù)據(jù),使自己與Leader保持一致。Zookeeper負責兩者間的故障切換(fail over,可以理解為Leader選舉)。
消息復(fù)制延遲受最慢的Follower限制,Leader負責跟蹤所有Follower的狀態(tài),如果Follower“落后”太多或者失效,Leader就將此Follower從Replication同步列表中移除,但此時Follower是活著的,并且一直從Leader拉取數(shù)據(jù),直到差距小于replica.lag.max.messages值,然后重新加入同步列表。當一條消息被所有的Follower保存成功,此消息才被認為是“committed”,Consumer才能消費這條消息。這種同步方式就要求Leader和Follower之間要有良好的網(wǎng)絡(luò)環(huán)境。
一個partition的follower落后于leader足夠多時,會被認為不在同步副本列表或處于滯后狀態(tài)。在Kafka-0.8.2.x中,副本滯后判斷依據(jù)是副本落后于leader最大消息數(shù)量(replica.lag.max.messages)或replication響應(yīng)Leader partition的最長等待時間(replica.lag.time.max.ms)。前者是用來檢測緩慢的副本,而后者是用來檢測失效或死亡的副本。假設(shè)replica.lag.max.messages設(shè)置為4,表明只要follower落后leader的消息數(shù)小于4,就不會從同步副本列表中移除。replica.lag.time.max設(shè)置為500 ms,表明只要follower向leader發(fā)送拉取數(shù)據(jù)請求時間間隔超過500 ms,就會被標記為死亡,并且會從同步副本列表中移除。
當Leader處于流量高峰時,比如一瞬間就收到了4條數(shù)據(jù),此時所有Follower將被認為是“out-of-sync”并且從同步副本列表中移除,然后Follower拉取數(shù)據(jù)趕上Leader過后又重新加入同步列表,就這樣Follower頻繁在副本同步列表移除和重新加入之間來回切換。
即使只有一個replicas實例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可(注意:不同于其他分布式存儲,比如hbase需要"多數(shù)派"存活才行)。
當leader失效時,需在followers中選取出新的leader,可能此時follower落后于leader,因此需要選擇一個"up-to-date"的follower。kafka中l(wèi)eader選舉并沒有采用"投票多數(shù)派"的算法,因為這種算法對于"網(wǎng)絡(luò)穩(wěn)定性"/"投票參與者數(shù)量"等條件有較高的要求,而且kafka集群的設(shè)計,還需要容忍N-1個replicas失效。對于kafka而言,每個partition中所有的replicas信息都可以在zookeeper中獲得,那么選舉leader將是一件非常簡單的事情。選擇follower時需要兼顧一個問題,就是新leader 所在的server服務(wù)器上已經(jīng)承載的partition leader的個數(shù),如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力。在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader。在整個集群中,只要有一個replicas存活,那么此partition都可以繼續(xù)接受讀寫操作。
當一個Group中,有Consumer加入或者離開時,會觸發(fā)Partitions均衡。均衡的最終目的,是提升Topic的并發(fā)消費能力。
(1)假如topic1,具有如下partitions: P0,P1,P2,P3
(2)加入group中,有如下consumer: C0,C1
(3)首先根據(jù)partition索引號對partitions排序: P0,P1,P2,P3
(4)根據(jù)consumer.id排序: C0,C1
(5)計算倍數(shù): M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
(6)然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
通過此算法,就能知道具體Consumer消費的是哪個分區(qū)中的數(shù)據(jù)。
在kafka-Client-0.11.0.0.jar中,提供的有默認的KafkaProducer和DefaultPartitioner實現(xiàn)。其中DefaultPartitioner主要提供了Producer發(fā)送消息到分區(qū)的路由算法,如果給定Key值,就通過Key的哈希值和分區(qū)個數(shù)取余來計算;如果沒有給定Key,就通過ThreadLocalRandom.current().nextInt()產(chǎn)生的隨機數(shù)與分區(qū)數(shù)取余(其中涉及復(fù)雜步奏參考如下代碼)。具體代碼如下:
public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<string, atomicinteger=""> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<string,> configs) {} /** * 計算給定記錄的分區(qū) * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<partitioninfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<partitioninfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() {} }
我們也可以設(shè)置自己的Partition路由規(guī)則,需要繼承Partitioner類實現(xiàn)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
Kafka的消息delivery保證主要有三種:
(1)At most once 最多一次。消息可能會丟失,但絕不會重復(fù)傳輸。
(2)At least once 最少一次。消息絕不會丟失,但可能會重復(fù)傳輸。
(3)Exactly once 正好一次。每條消息正好被傳輸一次和消費一次。
Producer的delivery保證可以通過參數(shù)request.required.acks設(shè)置來保證:
(1)request.required.acks=0。
相當于消息異步發(fā)送。消息一發(fā)送完畢馬上發(fā)送下一條。由于不需要ack,可能會造成數(shù)據(jù)丟失,相當于實現(xiàn)了At most once。
(2)request.required.acks=1。
消息發(fā)送給Leader Partition,在Leader Partition確認消息并ack 生產(chǎn)者過后才發(fā)下一條。
(3)request.required.acks=-1。
消息發(fā)送給Leader,在Leader收到所有Follower確認保存消息的ack后對producer進行ack才發(fā)送下一條。
所以一條消息從Producer到Broker至少是確保了At least once的,因為有Replication的存在,只要消息到達Broker就不會丟失。如果ack出現(xiàn)問題,比如網(wǎng)絡(luò)中斷,有可能會導(dǎo)致producer收不到ack而重復(fù)發(fā)送消息。Exactly once這種方式,沒有查到相關(guān)的實現(xiàn)。
第(3)種方式的具體步奏如下:
a. producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點找到該 partition 的 leader
b. producer 將消息發(fā)送給該 leader
c. leader 將消息寫入本地 log
d. followers 從 leader pull 消息,寫入本地 log 后向 leader 發(fā)送 ACK
e. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK
Consumer從Broker拉取數(shù)據(jù)過后,可以選擇commit,此操作會在zookeeper中存下此Consumer讀取對應(yīng)Partition消息的Offset,以便下一次拉取數(shù)據(jù)時會從Partition的下一個Offset消費,避免重復(fù)消費。
同樣,Consumer可以通過設(shè)置參數(shù)enable.auto.commit=true來自動確認消息,即Consumer一收到消息立刻自動commit。如果只看消息的讀取過程,kafka是確保了Exactly once的,但是實際情況中Consumer不可能讀取到數(shù)據(jù)就結(jié)束了,往往還需要處理讀取到的數(shù)據(jù)。因此Consumer處理消息和commit消息的順序就決定了delivery保證的類別。
(1)先處理后commit
這種方式實現(xiàn)了At least once。Consumer收到消息先處理后提交,如果在處理完成后機器崩潰了,導(dǎo)致Offset沒有更新,Consumer下次啟動時又會重新讀取上一次消費的數(shù)據(jù),實際上此消息已經(jīng)處理過了。
(2)先commit后處理
這種方式實現(xiàn)了At most once。Consumer收到消息過后立刻commit,更新zookeeper上的Offset,然后再處理消息。如果處理還未結(jié)束Consumer崩潰了,等Consumer再次啟動的時候會讀取Offset更新過后的下一條數(shù)據(jù),這就導(dǎo)致了數(shù)據(jù)丟失。
Kafka提供了兩種Consumer API,選用哪種API需要視具體情況而定。
High Level Consumer API圍繞著Consumer Group這個邏輯概念展開,它屏蔽了每個Topic的每個Partition的Offset管理(自動讀取zookeeper中該Partition的last offset )、Broker失敗轉(zhuǎn)移以及增減Partition、Consumer時的負載均衡(當Partition和Consumer增減時,Kafka自動進行Rebalance)。
Low Level Consumer API,作為底層的Consumer API,提供了消費Kafka Message更大的控制,用戶可以實現(xiàn)重復(fù)讀取、跳讀等功能。
使用Low Level Consumer API,是沒有對Broker、Consumer、Partition增減進行處理,如果出現(xiàn)這些的增減時,需要自己處理負載均衡。
Low Level Consumer API提供更大靈活控制是以增加復(fù)雜性為代價的:
(1)Offset不再透明
(2)Broker自動失敗轉(zhuǎn)移需要處理
(3)增加Consumer、Partition、Broker需要自己做負載均衡
感謝各位的閱讀,以上就是“Kafka的基本原理是什么”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Kafka的基本原理是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。