溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

kafka數(shù)據(jù)可靠性是怎么深度解讀

發(fā)布時(shí)間:2021-12-15 11:27:22 來源:億速云 閱讀:137 作者:柒染 欄目:大數(shù)據(jù)

本篇文章為大家展示了kafka數(shù)據(jù)可靠性是怎么深度解讀,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

1 概述

Kakfa起初是由LinkedIn公司開發(fā)的一個(gè)分布式的消息系統(tǒng),后成為Apache的一部分,它使用Scala編寫,以可水平擴(kuò)展和高吞吐率而被廣泛使用。目前越來越多的開源分布式處理系統(tǒng)如Cloudera、Apache  Storm、Spark等都支持與Kafka集成。

Kafka憑借著自身的優(yōu)勢(shì),越來越受到互聯(lián)網(wǎng)企業(yè)的青睞,唯品會(huì)也采用Kafka作為其內(nèi)部核心消息引擎之一。Kafka作為一個(gè)商業(yè)級(jí)消息中間件,消息可靠性的重要性可想而知。如何確保消息的精確傳輸?如何確保消息的準(zhǔn)確存儲(chǔ)?如何確保消息的正確消費(fèi)?這些都是需要考慮的問題。本文首先從Kafka的架構(gòu)著手,先了解下Kafka的基本原理,然后通過對(duì)kakfa的存儲(chǔ)機(jī)制、復(fù)制原理、同步原理、可靠性和持久性保證等等一步步對(duì)其可靠性進(jìn)行分析,***通過benchmark來增強(qiáng)對(duì)Kafka高可靠性的認(rèn)知。

2 Kafka體系架構(gòu)

kafka數(shù)據(jù)可靠性是怎么深度解讀

如上圖所示,一個(gè)典型的Kafka體系架構(gòu)包括若干Producer(可以是服務(wù)器日志,業(yè)務(wù)數(shù)據(jù),頁面前端產(chǎn)生的page  view等等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer  (Group),以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在consumer  group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push(推)模式將消息發(fā)布到broker,Consumer使用pull(拉)模式從broker訂閱并消費(fèi)消息。

名詞解釋:

kafka數(shù)據(jù)可靠性是怎么深度解讀

2.1 Topic & Partition

一個(gè)topic可以認(rèn)為一個(gè)一類消息,每個(gè)topic將被分成多個(gè)partition,每個(gè)partition在存儲(chǔ)層面是append  log文件。任何發(fā)布到此partition的消息都會(huì)被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個(gè)long型的數(shù)字,它唯一標(biāo)記一條消息。每條消息都被append到partition中,是順序?qū)懘疟P,因此效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是Kafka高吞吐率的一個(gè)很重要的保證)。

kafka數(shù)據(jù)可靠性是怎么深度解讀

每一條消息被發(fā)送到broker中,會(huì)根據(jù)partition規(guī)則選擇被存儲(chǔ)到哪一個(gè)partition。如果partition規(guī)則設(shè)置的合理,所有消息可以均勻分布到不同的partition里,這樣就實(shí)現(xiàn)了水平擴(kuò)展。(如果一個(gè)topic對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器I/O將會(huì)成為這個(gè)topic的性能瓶頸,而partition解決了這個(gè)問題)。在創(chuàng)建topic時(shí)可以在$KAFKA_HOME/config/server.properties中指定這個(gè)partition的數(shù)量(如下所示),當(dāng)然可以在topic創(chuàng)建之后去修改partition的數(shù)量。

# The default number of log partitions per topic. More partitions allow greater  # parallelism for consumption, but this will also result in more files across  # the brokers.  num.partitions=3

在發(fā)送一條消息時(shí),可以指定這個(gè)消息的key,producer根據(jù)這個(gè)key和partition機(jī)制來判斷這個(gè)消息發(fā)送到哪個(gè)partition。partition機(jī)制可以通過指定producer的partition.class這一參數(shù)來指定,該class必須實(shí)現(xiàn)kafka.producer.Partitioner接口。

有關(guān)Topic與Partition的更多細(xì)節(jié),可以參考下面的“Kafka文件存儲(chǔ)機(jī)制”這一節(jié)。

3 高可靠性存儲(chǔ)分析

Kafka的高可靠性的保障來源于其健壯的副本(replication)策略。通過調(diào)節(jié)其副本相關(guān)參數(shù),可以使得Kafka在性能和可靠性之間運(yùn)轉(zhuǎn)的游刃有余。Kafka從0.8.x版本開始提供partition級(jí)別的復(fù)制,replication的數(shù)量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。

這里先從Kafka文件存儲(chǔ)機(jī)制入手,從***層了解Kafka的存儲(chǔ)細(xì)節(jié),進(jìn)而對(duì)其的存儲(chǔ)有個(gè)微觀的認(rèn)知。之后通過Kafka復(fù)制原理和同步方式來闡述宏觀層面的概念。***從ISR,HW,leader選舉以及數(shù)據(jù)可靠性和持久性保證等等各個(gè)維度來豐富對(duì)Kafka相關(guān)知識(shí)點(diǎn)的認(rèn)知。

3.1 Kafka文件存儲(chǔ)機(jī)制

Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者通過topic向Kafka  broker發(fā)送消息,消費(fèi)者通過topic讀取數(shù)據(jù)。然而topic在物理層面又能以partition為分組,一個(gè)topic可以分成若干個(gè)partition,那么topic以及partition又是怎么存儲(chǔ)的呢?partition還可以細(xì)分為segment,一個(gè)partition物理上由多個(gè)segment組成,那么這些segment又是什么呢?下面我們來一一揭曉。

為了便于說明問題,假設(shè)這里只有一個(gè)Kafka集群,且這個(gè)集群只有一個(gè)Kafka broker,即只有一臺(tái)物理機(jī)。在這個(gè)Kafka  broker中配置($KAFKA_HOME/config/server.properties中)log.dirs=/tmp/kafka-logs,以此來設(shè)置Kafka消息文件存儲(chǔ)目錄,與此同時(shí)創(chuàng)建一個(gè)topic:topic_zzh_test,partition的數(shù)量為4($KAFKA_HOME/bin/kafka-topics.sh  –create –zookeeper localhost:2181 –partitions 4 –topic topic_vms_test  –replication-factor 4)。那么我們此時(shí)可以在/tmp/kafka-logs目錄中可以看到生成了4個(gè)目錄:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0  drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1  drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2  drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3

在Kafka文件存儲(chǔ)中,同一個(gè)topic下有多個(gè)不同的partition,每個(gè)partiton為一個(gè)目錄,partition的名稱規(guī)則為:topic名稱+有序序號(hào),***個(gè)序號(hào)從0開始計(jì),***的序號(hào)為partition數(shù)量減1,partition是實(shí)際物理上的概念,而topic是邏輯上的概念。

上面提到partition還可以細(xì)分為segment,這個(gè)segment又是什么?如果就以partition為最小存儲(chǔ)單位,我們可以想象當(dāng)Kafka  producer不斷發(fā)送消息,必然會(huì)引起partition文件的***擴(kuò)張,這樣對(duì)于消息文件的維護(hù)以及已經(jīng)被消費(fèi)的消息的清理帶來嚴(yán)重的影響,所以這里以segment為單位又將partition細(xì)分。每個(gè)partition(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等的segment(段)數(shù)據(jù)文件中(每個(gè)segment  文件中消息數(shù)量不一定相等)這種特性也方便old  segment的刪除,即方便已被消費(fèi)的消息的清理,提高磁盤的利用率。每個(gè)partition只需要支持順序讀寫就行,segment的文件生命周期由服務(wù)端配置參數(shù)(log.segment.bytes,log.roll.{ms,hours}等若干參數(shù))決定。

segment文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為segment索引文件和數(shù)據(jù)文件。這兩個(gè)文件的命令規(guī)則為:partition全局的***個(gè)segment從0開始,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件***一條消息的offset值,數(shù)值大小為64位,20位數(shù)字字符長(zhǎng)度,沒有數(shù)字用0填充,如下:

00000000000000000000.index  00000000000000000000.log  00000000000000170410.index  00000000000000170410.log  00000000000000239430.index  00000000000000239430.log

以上面的segment文件為例,展示出segment:00000000000000170410的“.index”文件和“.log”文件的對(duì)應(yīng)的關(guān)系,如下圖:

kafka數(shù)據(jù)可靠性是怎么深度解讀

如上圖,“.index”索引文件存儲(chǔ)大量的元數(shù)據(jù),“.log”數(shù)據(jù)文件存儲(chǔ)大量的消息,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。其中以“.index”索引文件中的元數(shù)據(jù)[3,  348]為例,在“.log”數(shù)據(jù)文件表示第3個(gè)消息,即在全局partition中表示170410+3=170413個(gè)消息,該消息的物理偏移地址為348。

那么如何從partition中通過offset查找message呢?

以上圖為例,讀取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index為最開始的文件,第二個(gè)文件為00000000000000170410.index(起始偏移為170410+1=170411),而第三個(gè)文件為00000000000000239430.index(起始偏移為239430+1=239431),所以這個(gè)offset=170418就落到了第二個(gè)文件之中。其他后續(xù)文件可以依次類推,以其實(shí)偏移量命名并排列這些文件,然后根據(jù)二分查找法就可以快速定位到具體文件位置。其次根據(jù)00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進(jìn)行讀取。

要是讀取offset=170418的消息,從00000000000000170410.log文件中的1325的位置進(jìn)行讀取,那么怎么知道何時(shí)讀完本條消息,否則就讀到下一條消息的內(nèi)容了?

這個(gè)就需要聯(lián)系到消息的物理結(jié)構(gòu)了,消息都具有固定的物理結(jié)構(gòu),包括:offset(8 Bytes)、消息體的大小(4 Bytes)、crc32(4  Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K  Bytes)、payload(N Bytes)等等字段,可以確定一條消息的大小,即讀取到哪里截止。

3.2 復(fù)制原理和同步方式

Kafka中topic的每個(gè)partition有一個(gè)預(yù)寫式的日志文件,雖然partition可以繼續(xù)細(xì)分為若干個(gè)segment文件,但是對(duì)于上層應(yīng)用來說可以將partition看成最小的存儲(chǔ)單元(一個(gè)有多個(gè)segment文件拼接的“巨型”文件),每個(gè)partition都由一些列有序的、不可變的消息組成,這些消息被連續(xù)的追加到partition中。

kafka數(shù)據(jù)可靠性是怎么深度解讀

上圖中有兩個(gè)新名詞:HW和LEO。這里先介紹下LEO,LogEndOffset的縮寫,表示每個(gè)partition的log***一條Message的位置。HW是HighWatermark的縮寫,是指consumer能夠看到的此partition的位置,這個(gè)涉及到多副本的概念,這里先提及一下,下節(jié)再詳表。

言歸正傳,為了提高消息的可靠性,Kafka每個(gè)topic的partition有N個(gè)副本(replicas),其中N(大于等于1)是topic的復(fù)制因子(replica  fator)的個(gè)數(shù)。Kafka通過多副本機(jī)制實(shí)現(xiàn)故障自動(dòng)轉(zhuǎn)移,當(dāng)Kafka集群中一個(gè)broker失效情況下仍然保證服務(wù)可用。在Kafka中發(fā)生復(fù)制時(shí)確保partition的日志能有序地寫到其他節(jié)點(diǎn)上,N個(gè)replicas中,其中一個(gè)replica為leader,其他都為follower,  leader處理partition的所有讀寫請(qǐng)求,與此同時(shí),follower會(huì)被動(dòng)定期地去復(fù)制leader上的數(shù)據(jù)。

如下圖所示,Kafka集群中有4個(gè)broker, 某topic有3個(gè)partition,且復(fù)制因子即副本個(gè)數(shù)也為3:

kafka數(shù)據(jù)可靠性是怎么深度解讀

Kafka提供了數(shù)據(jù)復(fù)制算法保證,如果leader發(fā)生故障或掛掉,一個(gè)新leader被選舉并被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個(gè)副本為leader,或者說follower追趕leader數(shù)據(jù)。leader負(fù)責(zé)維護(hù)和跟蹤ISR(In-Sync  Replicas的縮寫,表示副本同步隊(duì)列,具體可參考下節(jié))中所有follower滯后的狀態(tài)。當(dāng)producer發(fā)送一條消息到broker后,leader寫入消息并復(fù)制到所有follower。消息提交之后才被成功復(fù)制到所有的同步副本。消息復(fù)制延遲受最慢的follower限制,重要的是快速檢測(cè)慢副本,如果follower“落后”太多或者失效,leader將會(huì)把它從ISR中刪除。

3.3 ISR

上節(jié)我們涉及到ISR (In-Sync  Replicas),這個(gè)是指副本同步隊(duì)列。副本數(shù)對(duì)Kafka的吞吐率是有一定的影響,但極大的增強(qiáng)了可用性。默認(rèn)情況下Kafka的replica數(shù)量為1,即每個(gè)partition都有一個(gè)唯一的leader,為了確保消息的可靠性,通常應(yīng)用中將其值(由broker的參數(shù)offsets.topic.replication.factor指定)大小設(shè)置為大于1,比如3。  所有的副本(replicas)統(tǒng)稱為Assigned  Replicas,即AR。ISR是AR中的一個(gè)子集,由leader維護(hù)ISR列表,follower從leader同步數(shù)據(jù)有一些延遲(包括延遲時(shí)間replica.lag.time.max.ms和延遲條數(shù)replica.lag.max.messages兩個(gè)維度,  當(dāng)前***的版本0.10.x中只支持replica.lag.time.max.ms這個(gè)維度),任意一個(gè)超過閾值都會(huì)把follower剔除出ISR,  存入OSR(Outof-Sync Replicas)列表,新加入的follower也會(huì)先存放在OSR中。AR=ISR+OSR。

Kafka  0.10.x版本后移除了replica.lag.max.messages參數(shù),只保留了replica.lag.time.max.ms作為ISR中副本管理的參數(shù)。為什么這樣做呢?replica.lag.max.messages表示當(dāng)前某個(gè)副本落后leaeder的消息數(shù)量超過了這個(gè)參數(shù)的值,那么leader就會(huì)把follower從ISR中刪除。假設(shè)設(shè)置replica.lag.max.messages=4,那么如果producer一次傳送至broker的消息數(shù)量都小于4條時(shí),因?yàn)樵趌eader接受到producer發(fā)送的消息之后而follower副本開始拉取這些消息之前,follower落后leader的消息數(shù)不會(huì)超過4條消息,故此沒有follower移出ISR,所以這時(shí)候replica.lag.max.message的設(shè)置似乎是合理的。但是producer發(fā)起瞬時(shí)高峰流量,producer一次發(fā)送的消息超過4條時(shí),也就是超過replica.lag.max.messages,此時(shí)follower都會(huì)被認(rèn)為是與leader副本不同步了,從而被踢出了ISR。但實(shí)際上這些follower都是存活狀態(tài)的且沒有性能問題。那么在之后追上leader,并被重新加入了ISR。于是就會(huì)出現(xiàn)它們不斷地剔出ISR然后重新回歸ISR,這無疑增加了無謂的性能損耗。而且這個(gè)參數(shù)是broker全局的。設(shè)置太大了,影響真正“落后”follower的移除;設(shè)置的太小了,導(dǎo)致follower的頻繁進(jìn)出。無法給定一個(gè)合適的replica.lag.max.messages的值,故此,新版本的Kafka移除了這個(gè)參數(shù)。

注:ISR中包括:leader和follower。

上面一節(jié)還涉及到一個(gè)概念,即HW。HW俗稱高水位,HighWatermark的縮寫,取一個(gè)partition對(duì)應(yīng)的ISR中最小的LEO作為HW,consumer最多只能消費(fèi)到HW所在的位置。另外每個(gè)replica都有HW,leader和follower各自負(fù)責(zé)更新自己的HW的狀態(tài)。對(duì)于leader新寫入的消息,consumer不能立刻消費(fèi),leader會(huì)等待該消息被所有ISR中的replicas同步后更新HW,此時(shí)消息才能被consumer消費(fèi)。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對(duì)于來自內(nèi)部broKer的讀取請(qǐng)求,沒有HW的限制。

下圖詳細(xì)的說明了當(dāng)producer生產(chǎn)消息至broker后,ISR以及HW和LEO的流轉(zhuǎn)過程:

kafka數(shù)據(jù)可靠性是怎么深度解讀

由此可見,Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。事實(shí)上,同步復(fù)制要求所有能工作的follower都復(fù)制完,這條消息才會(huì)被commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)commit,這種情況下如果follower都還沒有復(fù)制完,落后于leader時(shí),突然leader宕機(jī),則會(huì)丟失數(shù)據(jù)。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。

Kafka的ISR的管理最終都會(huì)反饋到Zookeeper節(jié)點(diǎn)上。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個(gè)地方會(huì)對(duì)這個(gè)Zookeeper的節(jié)點(diǎn)進(jìn)行維護(hù):

Controller來維護(hù):Kafka集群中的其中一個(gè)Broker會(huì)被選舉為Controller,主要負(fù)責(zé)Partition管理和副本狀態(tài)管理,也會(huì)執(zhí)行類似于重分配partition之類的管理任務(wù)。在符合某些特定條件下,Controller下的LeaderSelector會(huì)選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關(guān)節(jié)點(diǎn)中。同時(shí)發(fā)起LeaderAndIsrRequest通知所有的replicas。

leader來維護(hù):leader有單獨(dú)的線程定期檢測(cè)ISR中follower是否脫離ISR,  如果發(fā)現(xiàn)ISR變化,則會(huì)將新的ISR的信息返回到Zookeeper的相關(guān)節(jié)點(diǎn)中。

3.4 數(shù)據(jù)可靠性和持久性保證

當(dāng)producer向leader發(fā)送數(shù)據(jù)時(shí),可以通過request.required.acks參數(shù)來設(shè)置數(shù)據(jù)可靠性的級(jí)別:

  • 1(默認(rèn)):這意味著producer在ISR中的leader已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條message。如果leader宕機(jī)了,則會(huì)丟失數(shù)據(jù)。

  • 0:這意味著producer無需等待來自broker的確認(rèn)而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率***,但是數(shù)據(jù)可靠性確是***的。

  • -1:producer需要等待ISR中的所有follower都確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成,可靠性***。但是這樣也不能保證數(shù)據(jù)不丟失,比如當(dāng)ISR中只有l(wèi)eader時(shí)(前面ISR那一節(jié)講到,ISR中的成員由于某些情況會(huì)增加也會(huì)減少,最少就只剩一個(gè)leader),這樣就變成了acks=1的情況。

如果要提高數(shù)據(jù)的可靠性,在設(shè)置request.required.acks=-1的同時(shí),也要min.insync.replicas這個(gè)參數(shù)(可以在broker或者topic層面進(jìn)行設(shè)置)的配合,這樣才能發(fā)揮***的功效。min.insync.replicas這個(gè)參數(shù)設(shè)定ISR中的最小副本數(shù)是多少,默認(rèn)值為1,當(dāng)且僅當(dāng)request.required.acks參數(shù)設(shè)置為-1時(shí),此參數(shù)才生效。如果ISR中的副本數(shù)少于min.insync.replicas配置的數(shù)量時(shí),客戶端會(huì)返回異常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin:  Messages are rejected since there are fewer in-sync replicas than required。

接下來對(duì)acks=1和-1的兩種情況進(jìn)行詳細(xì)分析:

1. request.required.acks=1

producer發(fā)送數(shù)據(jù)到leader,leader寫本地日志成功,返回客戶端成功;此時(shí)ISR中的副本還沒有來得及拉取該消息,leader就宕機(jī)了,那么此次發(fā)送的消息就會(huì)丟失。

kafka數(shù)據(jù)可靠性是怎么深度解讀

2. request.required.acks=-1

同步(Kafka默認(rèn)為同步,即producer.type=sync)的發(fā)送模式,replication.factor>=2且min.insync.replicas>=2的情況下,不會(huì)丟失數(shù)據(jù)。

有兩種典型情況。acks=-1的情況下(如無特殊說明,以下acks都表示為參數(shù)request.required.acks),數(shù)據(jù)發(fā)送到leader,  ISR的follower全部完成數(shù)據(jù)同步后,leader此時(shí)掛掉,那么會(huì)選舉出新的leader,數(shù)據(jù)不會(huì)丟失。

kafka數(shù)據(jù)可靠性是怎么深度解讀

acks=-1的情況下,數(shù)據(jù)發(fā)送到leader后  ,部分ISR的副本同步,leader此時(shí)掛掉。比如follower1h和follower2都有可能變成新的leader,  producer端會(huì)得到返回異常,producer端會(huì)重新發(fā)送數(shù)據(jù),數(shù)據(jù)可能會(huì)重復(fù)。

kafka數(shù)據(jù)可靠性是怎么深度解讀

當(dāng)然上圖中如果在leader  crash的時(shí)候,follower2還沒有同步到任何數(shù)據(jù),而且follower2被選舉為新的leader的話,這樣消息就不會(huì)重復(fù)。

注:Kafka只處理fail/recover問題,不處理Byzantine問題。

3.5 關(guān)于HW的進(jìn)一步探討

考慮上圖(即acks=-1,部分ISR副本同步)中的另一種情況,如果在Leader掛掉的時(shí)候,follower1同步了消息4,5,follower2同步了消息4,與此同時(shí)follower2被選舉為leader,那么此時(shí)follower1中的多出的消息5該做如何處理呢?

這里就需要HW的協(xié)同配合了。如前所述,一個(gè)partition中的ISR列表中,leader的HW是所有ISR列表里副本中最小的那個(gè)的LEO。類似于木桶原理,水位取決于***那塊短板。

kafka數(shù)據(jù)可靠性是怎么深度解讀

如上圖,某個(gè)topic的某partition有三個(gè)副本,分別為A、B、C。A作為leader肯定是LEO***,B緊隨其后,C機(jī)器由于配置比較低,網(wǎng)絡(luò)比較差,故而同步最慢。這個(gè)時(shí)候A機(jī)器宕機(jī),這時(shí)候如果B成為leader,假如沒有HW,在A重新恢復(fù)之后會(huì)做同步(makeFollower)操作,在宕機(jī)時(shí)log文件之后直接做追加操作,而假如B的LEO已經(jīng)達(dá)到了A的LEO,會(huì)產(chǎn)生數(shù)據(jù)不一致的情況,所以使用HW來避免這種情況。

A在做同步操作的時(shí)候,先將log文件截?cái)嗟街白约旱腍W的位置,即3,之后再從B中拉取消息進(jìn)行同步。

如果失敗的follower恢復(fù)過來,它首先將自己的log文件截?cái)嗟缴洗蝐heckpointed時(shí)刻的HW的位置,之后再從leader中同步消息。leader掛掉會(huì)重新選舉,新的leader會(huì)發(fā)送“指令”讓其余的follower截?cái)嘀磷陨淼腍W的位置然后再拉取新的消息。

當(dāng)ISR中的個(gè)副本的LEO不一致時(shí),如果此時(shí)leader掛掉,選舉新的leader時(shí)并不是按照LEO的高低進(jìn)行選舉,而是按照ISR中的順序選舉。

3.6 Leader選舉

一條消息只有被ISR中的所有follower都從leader復(fù)制過去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了leader,還沒來得及被任何follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失。而對(duì)于producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設(shè)置。這種機(jī)制確保了只要ISR中有一個(gè)或者以上的follower,一條被commit的消息就不會(huì)丟失。

有一個(gè)很重要的問題是當(dāng)leader宕機(jī)了,怎樣在follower中選舉出新的leader,因?yàn)閒ollower可能落后很多或者直接crash了,所以必須確保選擇“***”的follower作為新的leader。一個(gè)基本的原則就是,如果leader不在了,新的leader必須擁有原來的leader  commit的所有消息。這就需要做一個(gè)折中,如果leader在表名一個(gè)消息被commit前等待更多的follower確認(rèn),那么在它掛掉之后就有更多的follower可以成為新的leader,但這也會(huì)造成吞吐率的下降。

一種非常常用的選舉leader的方式是“少數(shù)服從多數(shù)”,Kafka并不是采用這種方式。這種模式下,如果我們有2f+1個(gè)副本,那么在commit之前必須保證有f+1個(gè)replica復(fù)制完消息,同時(shí)為了保證能正確選舉出新的leader,失敗的副本數(shù)不能超過f個(gè)。這種方式有個(gè)很大的優(yōu)勢(shì),系統(tǒng)的延遲取決于最快的幾臺(tái)機(jī)器,也就是說比如副本數(shù)為3,那么延遲就取決于最快的那個(gè)follower而不是最慢的那個(gè)?!吧贁?shù)服從多數(shù)”的方式也有一些劣勢(shì),為了保證leader選舉的正常進(jìn)行,它所能容忍的失敗的follower數(shù)比較少,如果要容忍1個(gè)follower掛掉,那么至少要3個(gè)以上的副本,如果要容忍2個(gè)follower掛掉,必須要有5個(gè)以上的副本。也就是說,在生產(chǎn)環(huán)境下為了保證較高的容錯(cuò)率,必須要有大量的副本,而大量的副本又會(huì)在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降。這種算法更多用在Zookeeper這種共享集群配置的系統(tǒng)中而很少在需要大量數(shù)據(jù)的系統(tǒng)中使用的原因。HDFS的HA功能也是基于“少數(shù)服從多數(shù)”的方式,但是其數(shù)據(jù)存儲(chǔ)并不是采用這樣的方式。

實(shí)際上,leader選舉的算法非常多,比如Zookeeper的Zab、Raft以及Viewstamped  Replication。而Kafka所使用的leader選舉算法更像是微軟的PacificA算法。

Kafka在Zookeeper中為每一個(gè)partition動(dòng)態(tài)的維護(hù)了一個(gè)ISR,這個(gè)ISR里的所有replica都跟上了leader,只有ISR里的成員才能有被選為leader的可能(unclean.leader.election.enable=false)。在這種模式下,對(duì)于f+1個(gè)副本,一個(gè)Kafka  topic能在保證不丟失已經(jīng)commit消息的前提下容忍f個(gè)副本的失敗,在大多數(shù)使用場(chǎng)景下,這種模式是十分有利的。事實(shí)上,為了容忍f個(gè)副本的失敗,“少數(shù)服從多數(shù)”的方式和ISR在commit前需要等待的副本的數(shù)量是一樣的,但是ISR需要的總的副本的個(gè)數(shù)幾乎是“少數(shù)服從多數(shù)”的方式的一半。

上文提到,在ISR中至少有一個(gè)follower時(shí),Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某一個(gè)partition的所有replica都掛了,就無法保證數(shù)據(jù)不丟失了。這種情況下有兩種可行的方案:

  • 等待ISR中任意一個(gè)replica“活”過來,并且選它作為leader

  • 選擇***個(gè)“活”過來的replica(并不一定是在ISR中)作為leader

這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的抉擇。如果一定要等待ISR中的replica“活”過來,那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果ISR中所有的replica都無法“活”過來了,或者數(shù)據(jù)丟失了,這個(gè)partition將永遠(yuǎn)不可用。選擇***個(gè)“活”過來的replica作為leader,而這個(gè)replica不是ISR中的replica,那即使它并不保障已經(jīng)包含了所有已commit的消息,它也會(huì)成為leader而作為consumer的數(shù)據(jù)源。默認(rèn)情況下,Kafka采用第二種策略,即unclean.leader.election.enable=true,也可以將此參數(shù)設(shè)置為false來啟用***種策略。

unclean.leader.election.enable這個(gè)參數(shù)對(duì)于leader的選舉、系統(tǒng)的可用性以及數(shù)據(jù)的可靠性都有至關(guān)重要的影響。下面我們來分析下幾種典型的場(chǎng)景。

kafka數(shù)據(jù)可靠性是怎么深度解讀

如果上圖所示,假設(shè)某個(gè)partition中的副本數(shù)為3,replica-0, replica-1, replica-2分別存放在broker0,  broker1和broker2中。AR=(0,1,2),ISR=(0,1)。

設(shè)置request.required.acks=-1,  min.insync.replicas=2,unclean.leader.election.enable=false。這里講broker0中的副本也稱之為broker0起初broker0為leader,broker1為follower。

當(dāng)ISR中的replica-0出現(xiàn)crash的情況時(shí),broker1選舉為新的leader[ISR=(1)],因?yàn)槭躮in.insync.replicas=2影響,write不能服務(wù),但是read能繼續(xù)正常服務(wù)。此種情況恢復(fù)方案:

  1. 嘗試恢復(fù)(重啟)replica-0,如果能起來,系統(tǒng)正常;

  2. 如果replica-0不能恢復(fù),需要將min.insync.replicas設(shè)置為1,恢復(fù)write功能。

當(dāng)ISR中的replica-0出現(xiàn)crash,緊接著replica-1也出現(xiàn)了crash,  此時(shí)[ISR=(1),leader=-1],不能對(duì)外提供服務(wù),此種情況恢復(fù)方案:

  1. 嘗試恢復(fù)replica-0和replica-1,如果都能起來,則系統(tǒng)恢復(fù)正常;

  2. 如果replica-0起來,而replica-1不能起來,這時(shí)候仍然不能選出leader,因?yàn)楫?dāng)設(shè)置unclean.leader.election.enable=false時(shí),leader只能從ISR中選舉,當(dāng)ISR中所有副本都失效之后,需要ISR中***失效的那個(gè)副本能恢復(fù)之后才能選舉leader,  即replica-0先失效,replica-1后失效,需要replica-1恢復(fù)后才能選舉leader。保守的方案建議把unclean.leader.election.enable設(shè)置為true,但是這樣會(huì)有丟失數(shù)據(jù)的情況發(fā)生,這樣可以恢復(fù)read服務(wù)。同樣需要將min.insync.replicas設(shè)置為1,恢復(fù)write功能;

  3. replica-1恢復(fù),replica-0不能恢復(fù),這個(gè)情況上面遇到過,read服務(wù)可用,需要將min.insync.replicas設(shè)置為1,恢復(fù)write功能;

  4. replica-0和replica-1都不能恢復(fù),這種情況可以參考情形2.

當(dāng)ISR中的replica-0,  replica-1同時(shí)宕機(jī),此時(shí)[ISR=(0,1)],不能對(duì)外提供服務(wù),此種情況恢復(fù)方案:嘗試恢復(fù)replica-0和replica-1,當(dāng)其中任意一個(gè)副本恢復(fù)正常時(shí),對(duì)外可以提供read服務(wù)。直到2個(gè)副本恢復(fù)正常,write功能才能恢復(fù),或者將將min.insync.replicas設(shè)置為1。

3.7 Kafka的發(fā)送模式

Kafka的發(fā)送模式由producer端的配置參數(shù)producer.type來設(shè)置,這個(gè)參數(shù)指定了在后臺(tái)線程中消息的發(fā)送方式是同步的還是異步的,默認(rèn)是同步的方式,即producer.type=sync。如果設(shè)置成異步的模式,即producer.type=async,可以是producer以batch的形式push數(shù)據(jù),這樣會(huì)極大的提高broker的性能,但是這樣會(huì)增加丟失數(shù)據(jù)的風(fēng)險(xiǎn)。如果需要確保消息的可靠性,必須要將producer.type設(shè)置為sync。

對(duì)于異步模式,還有4個(gè)配套的參數(shù),如下:

kafka數(shù)據(jù)可靠性是怎么深度解讀

以batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafka  producer可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求。batch的數(shù)量大小可以通過producer的參數(shù)(batch.num.messages)控制。通過增加batch的大小,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤IO的次數(shù),當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡。在比較新的版本中還有batch.size這個(gè)參數(shù)。

4 高可靠性使用分析

4.1 消息傳輸保障

前面已經(jīng)介紹了Kafka如何進(jìn)行有效的存儲(chǔ),以及了解了producer和consumer如何工作。接下來討論的是Kafka如何確保消息在producer和consumer之間傳輸。有以下三種可能的傳輸保障(delivery  guarantee):

  • At most once: 消息可能會(huì)丟,但絕不會(huì)重復(fù)傳輸

  • At least once:消息絕不會(huì)丟,但可能會(huì)重復(fù)傳輸

  • Exactly once:每條消息肯定會(huì)被傳輸一次且僅傳輸一次

Kafka的消息傳輸保障機(jī)制非常直觀。當(dāng)producer向broker發(fā)送消息時(shí),一旦這條消息被commit,由于副本機(jī)制(replication)的存在,它就不會(huì)丟失。但是如果producer發(fā)送數(shù)據(jù)給broker后,遇到的網(wǎng)絡(luò)問題而造成通信中斷,那producer就無法判斷該條消息是否已經(jīng)提交(commit)。雖然Kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是producer可以retry多次,確保消息已經(jīng)正確傳輸?shù)絙roker中,所以目前Kafka實(shí)現(xiàn)的是at  least once。

consumer從broker中讀取消息后,可以選擇commit,該操作會(huì)在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時(shí)會(huì)從下一條開始讀取。如未commit,下一次讀取的開始位置會(huì)跟上一次commit之后的開始位置相同。當(dāng)然也可以將consumer設(shè)置為autocommit,即consumer一旦讀取到數(shù)據(jù)立即自動(dòng)commit。如果只討論這一讀取消息的過程,那Kafka是確保了exactly  once, 但是如果由于前面producer與broker之間的某種原因?qū)е孪⒌闹貜?fù),那么這里就是at least once。

考慮這樣一種情況,當(dāng)consumer讀完消息之后先commit再處理消息,在這種模式下,如果consumer在commit后還沒來得及處理消息就crash了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對(duì)應(yīng)于at  most once了。

讀完消息先處理再commit。這種模式下,如果處理完了消息在commit之前consumer  crash了,下次重新開始工作時(shí)還會(huì)處理剛剛未commit的消息,實(shí)際上該消息已經(jīng)被處理過了,這就對(duì)應(yīng)于at least once。

要做到exactly once就需要引入消息去重機(jī)制。

4.2 消息去重

如上一節(jié)所述,Kafka在producer端和consumer端都會(huì)出現(xiàn)消息的重復(fù),這就需要去重處理。

Kafka文檔中提及GUID(Globally Unique Identifier)的概念,通過客戶端生成算法得到每個(gè)消息的unique  id,同時(shí)可映射至broker上存儲(chǔ)的地址,即通過GUID便可查詢提取消息內(nèi)容,也便于發(fā)送方的冪等性保證,需要在broker上提供此去重處理模塊,目前版本尚不支持。

針對(duì)GUID, 如果從客戶端的角度去重,那么需要引入集中式緩存,必然會(huì)增加依賴復(fù)雜度,另外緩存的大小難以界定。

不只是Kafka, 類似RabbitMQ以及RocketMQ這類商業(yè)級(jí)中間件也只保障at least once,  且也無法從自身去進(jìn)行消息去重。所以我們建議業(yè)務(wù)方根據(jù)自身的業(yè)務(wù)特點(diǎn)進(jìn)行去重,比如業(yè)務(wù)消息本身具備冪等性,或者借助Redis等其他產(chǎn)品進(jìn)行去重處理。

4.3 高可靠性配置

Kafka提供了很高的數(shù)據(jù)冗余彈性,對(duì)于需要數(shù)據(jù)高可靠性的場(chǎng)景,我們可以增加數(shù)據(jù)冗余備份數(shù)(replication.factor),調(diào)高最小寫入副本數(shù)的個(gè)數(shù)(min.insync.replicas)等等,但是這樣會(huì)影響性能。反之,性能提高而可靠性則降低,用戶需要自身業(yè)務(wù)特性在彼此之間做一些權(quán)衡性選擇。

要保證數(shù)據(jù)寫入到Kafka是安全的,高可靠的,需要如下的配置:

  • topic的配置:replication.factor>=3,即副本數(shù)至少是3個(gè);2<=min.insync.replicas<=replication.factor

  • broker的配置:leader的選舉條件unclean.leader.election.enable=false

  • producer的配置:request.required.acks=-1(all),producer.type=sync

5 BenchMark

Kafka在唯品會(huì)有著很深的歷史淵源,根據(jù)唯品會(huì)消息中間件團(tuán)隊(duì)(VMS團(tuán)隊(duì))所掌握的資料顯示,在VMS團(tuán)隊(duì)運(yùn)轉(zhuǎn)的Kafka集群中所支撐的topic數(shù)已接近2000,每天的請(qǐng)求量也已達(dá)千億級(jí)。這里就以Kafka的高可靠性為基準(zhǔn)點(diǎn)來探究幾種不同場(chǎng)景下的行為表現(xiàn),以此來加深對(duì)Kafka的認(rèn)知,為大家在以后高效的使用Kafka時(shí)提供一份依據(jù)。

5.1 測(cè)試環(huán)境

Kafka broker用到了4臺(tái)機(jī)器,分別為broker[0/1/2/3]配置如下:

  • CPU: 24core/2.6GHZ

  • Memory: 62G

  • Network: 4000Mb

  • OS/kernel: CentOs release 6.6 (Final)

  • Disk: 1089G

  • Kafka版本:0.10.1.0

  • broker端JVM參數(shù)設(shè)置:

-Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

客戶端機(jī)器配置:

  • CPU: 24core/2.6GHZ

  • Memory: 3G

  • Network: 1000Mb

  • OS/kernel: CentOs release 6.3 (Final)

  • Disk: 240G

5.2 不同場(chǎng)景測(cè)試

場(chǎng)景1:測(cè)試不同的副本數(shù)、min.insync.replicas策略以及request.required.acks策略(以下簡(jiǎn)稱acks策略)對(duì)于發(fā)送速度(TPS)的影響。

具體配置:一個(gè)producer;發(fā)送方式為sync;消息體大小為1kB;partition數(shù)為12。副本數(shù)為:1/2/4;min.insync.replicas分別為1/2/4;acks分別為-1(all)/1/0。

具體測(cè)試數(shù)據(jù)如下表(min.insync.replicas只在acks=-1時(shí)有效):

kafka數(shù)據(jù)可靠性是怎么深度解讀

測(cè)試結(jié)果分析:

  • 客戶端的acks策略對(duì)發(fā)送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1;

  • 副本數(shù)越高,TPS越低;副本數(shù)一致時(shí),min.insync.replicas不影響TPS;

  • acks=0/1時(shí),TPS與min.insync.replicas參數(shù)以及副本數(shù)無關(guān),僅受acks策略的影響。

下面將partition的個(gè)數(shù)設(shè)置為1,來進(jìn)一步確認(rèn)下不同的acks策略、不同的min.insync.replicas策略以及不同的副本數(shù)對(duì)于發(fā)送速度的影響,詳細(xì)請(qǐng)看情景2和情景3。

場(chǎng)景2:在partition個(gè)數(shù)固定為1,測(cè)試不同的副本數(shù)和min.insync.replicas策略對(duì)發(fā)送速度的影響。

具體配置:一個(gè)producer;發(fā)送方式為sync;消息體大小為1kB;producer端acks=-1(all)。變換副本數(shù):2/3/4;  min.insync.replicas設(shè)置為:1/2/4。

測(cè)試結(jié)果如下:

kafka數(shù)據(jù)可靠性是怎么深度解讀

測(cè)試結(jié)果分析:副本數(shù)越高,TPS越低(這點(diǎn)與場(chǎng)景1的測(cè)試結(jié)論吻合),但是當(dāng)partition數(shù)為1時(shí)差距甚微。min.insync.replicas不影響TPS。

場(chǎng)景3:在partition個(gè)數(shù)固定為1,測(cè)試不同的acks策略和副本數(shù)對(duì)發(fā)送速度的影響。

具體配置:一個(gè)producer;發(fā)送方式為sync;消息體大小為1kB;min.insync.replicas=1。topic副本數(shù)為:1/2/4;acks:  0/1/-1。

測(cè)試結(jié)果如下:

kafka數(shù)據(jù)可靠性是怎么深度解讀

測(cè)試結(jié)果分析(與情景1一致):

  • 副本數(shù)越多,TPS越低;

  • 客戶端的acks策略對(duì)發(fā)送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1。

場(chǎng)景4:測(cè)試不同partition數(shù)對(duì)發(fā)送速率的影響

具體配置:一個(gè)producer;消息體大小為1KB;發(fā)送方式為sync;topic副本數(shù)為2;min.insync.replicas=2;acks=-1。partition數(shù)量設(shè)置為1/2/4/8/12。

測(cè)試結(jié)果:

kafka數(shù)據(jù)可靠性是怎么深度解讀

測(cè)試結(jié)果分析:partition的不同會(huì)影響TPS,隨著partition的個(gè)數(shù)的增長(zhǎng)TPS會(huì)有所增長(zhǎng),但并不是一直成正比關(guān)系,到達(dá)一定臨界值時(shí),partition數(shù)量的增加反而會(huì)使TPS略微降低。

場(chǎng)景5:通過將集群中部分broker設(shè)置成不可服務(wù)狀態(tài),測(cè)試對(duì)客戶端以及消息落盤的影響。

具體配置:一個(gè)producer;消息體大小1KB;發(fā)送方式為sync;topic副本數(shù)為4;min.insync.replicas設(shè)置為2;acks=-1;retries=0/100000000;partition數(shù)為12。

具體測(cè)試數(shù)據(jù)如下表:

kafka數(shù)據(jù)可靠性是怎么深度解讀

出錯(cuò)信息:

  • 錯(cuò)誤1:客戶端返回異常,部分?jǐn)?shù)據(jù)可落盤,部分失?。簅rg.apache.kafka.common.errors.NetworkException: The  server disconnected before a response was received.

  • 錯(cuò)誤2:[WARN]internals.Sender &ndash; Got error produce response with correlation id  19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999  attempts left). Error: NETWORK_EXCEPTION

  • 錯(cuò)誤3: [WARN]internals.Sender &ndash; Got error produce response with correlation id  77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859  attempts left). Error: NOT_ENOUGH_REPLICAS

  • 錯(cuò)誤4: [WARN]internals.Sender &ndash; Got error produce response with correlation id  77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999  attempts left). Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND

測(cè)試結(jié)果分析:

  • kill兩臺(tái)broker后,客戶端可以繼續(xù)發(fā)送。broker減少后,partition的leader分布在剩余的兩臺(tái)broker上,造成了TPS的減小;

  • kill三臺(tái)broker后,客戶端無法繼續(xù)發(fā)送。Kafka的自動(dòng)重試功能開始起作用,當(dāng)大于等于min.insync.replicas數(shù)量的broker恢復(fù)后,可以繼續(xù)發(fā)送;

  • 當(dāng)retries不為0時(shí),消息有重復(fù)落盤;客戶端成功返回的消息都成功落盤,異常時(shí)部分消息可以落盤。

場(chǎng)景6:測(cè)試單個(gè)producer的發(fā)送延遲,以及端到端的延遲。

具體配置::一個(gè)producer;消息體大小1KB;發(fā)送方式為sync;topic副本數(shù)為4;min.insync.replicas設(shè)置為2;acks=-1;partition數(shù)為12。

測(cè)試數(shù)據(jù)及結(jié)果(單位為ms):

kafka數(shù)據(jù)可靠性是怎么深度解讀

各場(chǎng)景測(cè)試總結(jié):

  • 當(dāng)acks=-1時(shí),Kafka發(fā)送端的TPS受限于topic的副本數(shù)量(ISR中),副本越多TPS越低;

  • acks=0時(shí),TPS***,其次為1,最差為-1,即TPS:acks_0 > acks_1 > ack_-1

  • min.insync.replicas參數(shù)不影響TPS;

  • partition的不同會(huì)影響TPS,隨著partition的個(gè)數(shù)的增長(zhǎng)TPS會(huì)有所增長(zhǎng),但并不是一直成正比關(guān)系,到達(dá)一定臨界值時(shí),partition數(shù)量的增加反而會(huì)使TPS略微降低;

  • Kafka在acks=-1,min.insync.replicas>=1時(shí),具有高可靠性,所有成功返回的消息都可以落盤。 

上述內(nèi)容就是kafka數(shù)據(jù)可靠性是怎么深度解讀,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI