溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Kafka2.5.0有哪些新功能

發(fā)布時(shí)間:2021-12-08 17:01:28 來(lái)源:億速云 閱讀:120 作者:iii 欄目:云計(jì)算

本篇內(nèi)容主要講解“Kafka2.5.0有哪些新功能”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Kafka2.5.0有哪些新功能”吧!

一、新功能

1、Kafka Streams: Add Cogroup in the DSL

當(dāng)多個(gè)流聚集在一起以形成單個(gè)較大的對(duì)象時(shí)(例如,購(gòu)物網(wǎng)站可能具有購(gòu)物車(chē)流,心愿單流和購(gòu)買(mǎi)流。它們共同構(gòu)成一個(gè)客戶),將其在Kafka Streams DSL中使用非常困難。

通常需要您將所有流分組并聚合到KTables,然后進(jìn)行多個(gè)外部聯(lián)接調(diào)用,最后得到具有所需對(duì)象的KTable。這將為每個(gè)流和一長(zhǎng)串ValueJoiners創(chuàng)建一個(gè)狀態(tài)存儲(chǔ),每個(gè)新記錄都必須經(jīng)過(guò)此連接才能到達(dá)最終對(duì)象。

創(chuàng)建使用單個(gè)狀態(tài)存儲(chǔ)的Cogroup 方法將:

  • 減少?gòu)臓顟B(tài)存儲(chǔ)獲取的數(shù)量。對(duì)于多個(gè)聯(lián)接,當(dāng)新值進(jìn)入任何流時(shí),都會(huì)發(fā)生連鎖反應(yīng),聯(lián)接處理器將繼續(xù)調(diào)用ValueGetters,直到我們?cè)L問(wèn)了所有狀態(tài)存儲(chǔ)。

  • 性能略有提高。如上所述,所有ValueGetters都被調(diào)用,還導(dǎo)致所有ValueJoiners被調(diào)用,從而強(qiáng)制重新計(jì)算所有其他流的當(dāng)前聯(lián)接值,從而影響性能。

2、Add support for TLS 1.3

Java 11添加了對(duì)TLS 1.3的支持。添加對(duì)Java 11的支持后,我們應(yīng)該對(duì)此提供支持。

3、不再支持Scala 2.11

為什么不再支持?

我們目前為3個(gè)Scala版本構(gòu)建Kafka:2.11、2.12和最近發(fā)布的2.13。由于我們必須在每個(gè)受支持的版本上編譯和運(yùn)行測(cè)試,因此從開(kāi)發(fā)和測(cè)試的角度來(lái)看,這是一筆不小的成本。

Scala 2.11.0于2014年4月發(fā)布,對(duì)2.11.x的支持于2017年11月結(jié)束(到發(fā)布Kafka 2.5時(shí)將超過(guò)2年)。Scala 2.12.0于2016年11月發(fā)布,Scala 2.13.0于2019年6月發(fā)布。基于此,現(xiàn)在該放棄對(duì)Scala 2.11的支持了,以便我們使測(cè)試矩陣易于管理(最近的kafka-trunk-jdk8占用了將近10個(gè)小時(shí),它將使用3個(gè)Scala版本構(gòu)建并運(yùn)行單元測(cè)試和集成測(cè)試。此外,Scala 2.12和更高版本還改進(jìn)了與Java 8功能接口的互操作性(Scala 2.12中首次引入)。更具體地說(shuō),Scala 2.12中的lambda可以與Java 8代碼相同的方式與Java 8功能接口一起使用。

在我們的下載頁(yè)面中,我們推薦自Kafka 2.1.0起使用Scala 2.12構(gòu)建的Kafka二進(jìn)制文件。我們切換到Scala 2.12作為Kafka 2.2.0中源tarball,構(gòu)建和系統(tǒng)測(cè)試的默認(rèn)Scala版本。

二、改進(jìn)與修復(fù)

  • 當(dāng)輸入 topic 事務(wù)時(shí),Kafka Streams lag 不為 0

  • Kafka-streams 可配置內(nèi)部 topics message.timestamp.type=CreateTime

  • 將 KStream#toTable 添加到 Streams DSL

  • 將 Commit/List Offsets 選項(xiàng)添加到 AdminClient

  • 將 VoidSerde 添加到 Serdes

  • 改進(jìn) Sensor Retrieval

[KAFKA-3061] 修復(fù)Guava依賴問(wèn)題

[KAFKA-4203] Java生產(chǎn)者默認(rèn)的最大消息大小不再與broker默認(rèn)一致

[KAFKA-5868] kafka消費(fèi)者reblance時(shí)間過(guò)長(zhǎng)問(wèn)題

詳細(xì)更新內(nèi)容點(diǎn)此查看

三、其他版本升級(jí)至2.5.0指南

如果要從2.1.x之前的版本升級(jí),請(qǐng)參閱以下注釋,以了解用于存儲(chǔ)偏移量的架構(gòu)的更改。將inter.broker.protocol.version更改為最新版本后,將無(wú)法降級(jí)到2.1之前的版本。

在所有Broker上更新server.properties并添加以下屬性。CURRENT_KAFKA_VERSION指的是您要升級(jí)的版本。CURRENT_MESSAGE_FORMAT_VERSION是指當(dāng)前使用的消息格式版本。如果以前覆蓋了消息格式版本,則應(yīng)保留其當(dāng)前值?;蛘撸绻獜?.11.0.x之前的版本升級(jí),則應(yīng)將CURRENT_MESSAGE_FORMAT_VERSION設(shè)置為與CURRENT_KAFKA_VERSION相匹配。

  • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.10.0、0.11.0、1.0、2.0、2.2)。

  • log.message.format.version = CURRENT_MESSAGE_FORMAT_VERSION

  • 如果要從0.11.0.x或更高版本升級(jí),并且尚未覆蓋消息格式,則只需要覆蓋Broker間協(xié)議版本。

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(0.11.0,1.0,1.1,2.0,2.1,2.2,2.3)。

  • 一次升級(jí)一個(gè)Broker:關(guān)閉Broker,更新代碼,然后重新啟動(dòng)。完成此操作后,Broker將運(yùn)行最新版本,并且您可以驗(yàn)證集群的行為和性能是否符合預(yù)期。如果有任何問(wèn)題,此時(shí)仍可以降級(jí)。

  • 驗(yàn)證群集的行為和性能后,通過(guò)編輯inter.broker.protocol.version并將其設(shè)置為2.5來(lái)提高協(xié)議版本 。

  • 逐一重新啟動(dòng)Broker,以使新協(xié)議版本生效。Broker開(kāi)始使用最新協(xié)議版本后,將無(wú)法再將群集降級(jí)到較舊版本。

  • 如果您已按照上述說(shuō)明覆蓋了消息格式版本,則需要再次滾動(dòng)重啟以將其升級(jí)到最新版本。一旦所有(或大多數(shù))使用者均已升級(jí)到0.11.0或更高版本,則在每個(gè)Broker上將log.message.format.version更改為2.5,然后逐一重新啟動(dòng)它們。請(qǐng)注意,不再維護(hù)的較舊的Scala客戶端不支持0.11中引入的消息格式,因此,為避免轉(zhuǎn)換成本,必須使用較新的Java客戶端。

2.5.0主要的變化,可能產(chǎn)生的升級(jí)影響

  • 當(dāng)RebalanceProtocol#COOPERATIVE使用時(shí),Consumer#poll仍然可以返回?cái)?shù)據(jù),此外, Consumer#commitSync現(xiàn)在可以拋出RebalanceInProgressException來(lái)通知用戶此類(lèi)事件,CommitFailedException并允許用戶完成正在進(jìn)行的Reblance,然后重新嘗試為那些仍然擁有的分區(qū)提交偏移量。

  • 為了提高典型網(wǎng)絡(luò)環(huán)境中的彈性,默認(rèn)值 zookeeper.session.timeout.ms已從6s增加到18s, replica.lag.time.max.ms從10s增加到30s。

  • cogroup()添加了新的DSL運(yùn)營(yíng)商,用于一次將多個(gè)流聚合在一起。

  • 添加了新的KStream.toTable()API,可將輸入事件流轉(zhuǎn)換為KTable。

  • 添加了新的Serde類(lèi)型Void以表示輸入主題中的空鍵或空值。

  • 棄用UsePreviousTimeOnInvalidTimestamp并替換為UsePartitionTimeOnInvalidTimeStamp。

  • 通過(guò)添加掛起的偏移防護(hù)機(jī)制和更強(qiáng)大的事務(wù)提交一致性檢查,改進(jìn)了一次精確語(yǔ)義,這大大簡(jiǎn)化了可伸縮的一次精確應(yīng)用程序的實(shí)現(xiàn)。

  • 棄用KafkaStreams.store(String, QueryableStoreType)并替換為KafkaStreams.store(StoreQueryParameters)。

  • 不再支持Scala 2.11。

  • 軟件包中的所有Scala類(lèi)kafka.security.auth均已棄用。請(qǐng)注意,在2.4.0中已棄用kafka.security.auth.Authorizer 和kafka.security.auth.SimpleAclAuthorizer。

  • 默認(rèn)情況下,TLSv1和TLSv1.1已被禁用,因?yàn)樗鼈兙哂幸阎陌踩┒础,F(xiàn)在默認(rèn)情況下僅啟用TLSv1.2。您可以通過(guò)在配置選項(xiàng)ssl.protocol和中明確啟用它們來(lái)繼續(xù)使用TLSv1和TLSv1.1 ssl.enabled.protocols。

  • ZooKeeper已升級(jí)到3.5.7,并且如果3.4數(shù)據(jù)目錄中沒(méi)有快照文件,則ZooKeeper從3.4.X升級(jí)到3.5.7可能會(huì)失敗。這通常發(fā)生在測(cè)試升級(jí)中,其中ZooKeeper 3.5.7嘗試加載沒(méi)有創(chuàng)建快照文件的現(xiàn)有3.4數(shù)據(jù)目錄。

  • ZooKeeper 3.5.7版支持有或沒(méi)有客戶端證書(shū)的TLS加密的到ZooKeeper的連接,并且可以使用其他Kafka配置來(lái)利用此功能。

到此,相信大家對(duì)“Kafka2.5.0有哪些新功能”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI