溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的

發(fā)布時間:2021-12-27 14:15:51 來源:億速云 閱讀:210 作者:柒染 欄目:大數(shù)據(jù)

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

Kafka 作為開源消息中間件的重要分支,在券商領(lǐng)域會有怎樣的應(yīng)用場景?下面從華泰證券的應(yīng)用現(xiàn)狀出發(fā),介紹了 Kafka 在華泰證券的大規(guī)模實踐經(jīng)驗。

       1. 引言    

Apache Kafka 發(fā)源于 LinkedIn,于 2011 年成為 Apache 的孵化項目,隨后于 2012 年成為 Apache 的頂級項目之一。按照官方定義,Kafka 是一個分布式流平臺,具備流數(shù)據(jù)的發(fā)布及訂閱(與消息隊列或企業(yè)級消息系統(tǒng)類似)能力、容錯方式的流數(shù)據(jù)存儲能力以及流數(shù)據(jù)的實時處理能力。
Kafka 的優(yōu)勢在于:
1.可靠性:具有分區(qū)機制、副本機制和容錯機制的分布式消息系統(tǒng)。
2.可擴展性:消息系統(tǒng)支持集群規(guī)模的熱擴展。
3.高性能:在數(shù)據(jù)發(fā)布和訂閱過程中都能保證數(shù)據(jù)的高吞吐量。即便在 TB 級數(shù)據(jù)存儲的情況下,仍然能保證穩(wěn)定的性能。
目前,Kafka 在互聯(lián)網(wǎng)、金融、傳統(tǒng)行業(yè)等各種類型公司內(nèi)部廣泛使用,已成為全球范圍內(nèi)實時數(shù)據(jù)傳輸和處理領(lǐng)域的事實標(biāo)準(zhǔn)。

       2. 基本原理及概念    

一個典型的 Kafka 集群中包含:(1)若干 Producer,用于生產(chǎn)數(shù)據(jù);(2)若干 Broker,構(gòu)成集群吞吐數(shù)據(jù);(3)若干 Consumer 消費數(shù)據(jù);(4)一個 Zookeeper 集群,進行全局控制和管理。
Kafka 的拓撲結(jié)構(gòu)如下圖所示:

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的
圖1 kafka 架構(gòu)圖

Kafka 通過 Zookeeper 管理集群配置、選舉 leader,以及在 Consumer Group 發(fā)生變化時進行再平衡(rebalance)。Producer 使用 push 模式將消息發(fā)布到 broker,Consumer 使用 pull 模式從 broker 訂閱并消費消息并更新消費的偏移量值(offset)。

基本概念:
? Broker(代理):Kafka 集群的服務(wù)器節(jié)點稱為 broker。
? Topic(主題):在 Kafka 中,使用一個類別屬性來劃分?jǐn)?shù)據(jù)的所屬類,劃分?jǐn)?shù)據(jù)的這個類稱為 topic。一個主題可以有零個、一個或多個消費者去訂閱寫到這個主題里面的數(shù)據(jù)。
? Partition(分區(qū)):主題中的數(shù)據(jù)分割為一個或多個 partition,分區(qū)是一個有序、不變序列的記錄集合,通過不斷追加形成結(jié)構(gòu)化的日志。
? Producer(生產(chǎn)者):數(shù)據(jù)的發(fā)布者,該角色將消息發(fā)布到 Kafka 的 topic 中。生產(chǎn)者負責(zé)選擇哪個記錄分配到指定主題的哪個分區(qū)中。
? Consumer(消費者):從 broker 中讀取數(shù)據(jù),消費者可以消費多個 topic 中的數(shù)據(jù)。
? Consumer Group(消費者組):每個 consumer 都屬于一個特定的 group 組,一個 group 組可以包含多個 consumer,但一個組中只會有一個 consumer 消費數(shù)據(jù)。

主題和分區(qū):
Topic 的本質(zhì)就是一個目錄,由一些 Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示。每個 Partition 中的消息都是有序的,生產(chǎn)的消息被不斷追加到 Partition log 上,其中的每一個消息都被賦予了一個唯一的 offset 值。

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的
圖 2 Kafka分區(qū)數(shù)據(jù)存儲示意圖

對于傳統(tǒng)的 message queue 而言,一般會刪除已經(jīng)被消費的消息,Kafka 集群會保存所有的消息,不管消息有沒有被消費。Kafka 提供兩種策略刪除舊數(shù)據(jù):(1)基于時間;(2)基于 Partition 文件大小。只有過期的數(shù)據(jù)才會被自動清除以釋放磁盤空間。
Kafka 需要維持的元數(shù)據(jù)只有“已消費消息在 Partition 中的 offset 值”,Consumer 每消費一個消息,offset 就會加 1。其實消息的狀態(tài)完全是由 Consumer 控制的,Consumer 可以跟蹤和重設(shè)這個 offset 值,這樣 Consumer 就可以讀取任意位置的消息。

數(shù)據(jù)備份機制:
Kafka 允許用戶為每個 topic 設(shè)置副本數(shù)量,副本數(shù)量決定了有幾個 broker 來存放寫入的數(shù)據(jù)。如果你的副本數(shù)量設(shè)置為 3,那么一份數(shù)據(jù)就會被存放在 3 臺不同的機器上,那么就允許有 2 個機器失敗。一般推薦副本數(shù)量至少為 2,這樣就可以保證增減、重啟機器時不會影響到數(shù)據(jù)消費。如果對數(shù)據(jù)持久化有更高的要求,可以把副本數(shù)量設(shè)置為 3 或者更多。

核心api:
Producer API:允許應(yīng)用去推送一個流記錄到一個或多個 kafka 主題上。
Consumer API:允許應(yīng)用去訂閱一個或多個主題,并處理流數(shù)據(jù)。ConsumerAPI包含 high levelAPI和 Sample api 兩套。使用 high levelAPI時,同一 Topic 的一條消息只能被同一個 Consumer Group 內(nèi)的一個 Consumer 消費,但多個 Consumer Group 可同時消費這一消息。與之相對的 Sampleapi 是一個底層的 API,完全無狀態(tài)的,每次請求都需要指定 offset 值。
Streams API:允許應(yīng)用作為一個流處理器,消費來自一個或多個主題的輸入流,或生產(chǎn)一個輸出流到一個或多個輸出主題,并可以有效地將輸入流轉(zhuǎn)換為輸出流。
其它 Kafka 的特性將在下面華泰證券的使用示例中進一步介紹。

       3. Kafka在華泰證券背景介紹及建設(shè)現(xiàn)狀    

長期以來,華泰證券的系統(tǒng)建設(shè)依賴于服務(wù)廠商,廠商之間技術(shù)方案的差異性造成了系統(tǒng)之間的異構(gòu)化,各種類型的系統(tǒng)架構(gòu)長期存在,在消息中間件領(lǐng)域尤是如此。如短信平臺使用 IBMMQ,CRM 系統(tǒng)使用 ESB 架構(gòu),自營業(yè)務(wù)使用 Oracletuxedo 架構(gòu),柜臺系統(tǒng)使用恒生 MessageCenter 架構(gòu)等。隨著華泰證券自主研發(fā)的大規(guī)模投入,迫切需要改變這種煙囪式的系統(tǒng)建設(shè)方式,以統(tǒng)一化的服務(wù)化平臺架構(gòu)來建設(shè)系統(tǒng)。
2015 年,我們通過對 Kafka、ActiveMQ 及 RabbitMQ 等開源消息中間件進行全面的測試對比,最終從性能及高可用方面考慮,選擇 Kafka 作為了公司級消息中間件,經(jīng)過兩年多的探索和實踐,Kafka 平臺已承接大量重要生產(chǎn)業(yè)務(wù)系統(tǒng),支撐了全公司業(yè)務(wù)的高速發(fā)展,積累了大量的生產(chǎn)實踐經(jīng)驗。
經(jīng)過將近三年的建設(shè)發(fā)展,目前在華泰證券內(nèi)部已分別建設(shè) 0.9.0 和 0.10.1 版本的 Kafka 集群,總體集群數(shù)量 20 余臺。
目前華泰內(nèi)部 kafka 已為行情計算、交易回報、量化分析等核心系統(tǒng)提供穩(wěn)定服務(wù),同時涵蓋了日志、數(shù)據(jù)分析等諸多運維領(lǐng)域的應(yīng)用,日均消息吞吐量達 2.3TB,峰值流量超 4.8Gb/s,TOPIC 數(shù)量 190 余個,服務(wù) 30 個以上應(yīng)用系統(tǒng)。

       4. 實踐經(jīng)驗    

(1)高可用雙活架構(gòu)
如圖 3 所示,Kafka 高可用特性依賴于 zookeeper 來實現(xiàn),由于 zookeeper 的 paxos 算法特性,故 zookeeper 采用同城三中心部署方式,保證 zookeeper 本身高可用,通常其中兩個數(shù)據(jù)中心部署偶數(shù)臺機器,另一數(shù)據(jù)中心部署單臺機器。
Kafkabroker 跨數(shù)據(jù)中心兩節(jié)點部署,所有 topic 的 partition 保證在兩中心都有副本。如果單數(shù)據(jù)中心出現(xiàn)問題,另一個中心能自動進行接管,業(yè)務(wù)系統(tǒng)可以無感知切換。
由于Kafka的高帶寬需求,主機采用萬兆網(wǎng)卡,并且在網(wǎng)卡做 bond0 以保證網(wǎng)卡高可用,跨數(shù)據(jù)中心之間的網(wǎng)絡(luò)通信采用獨立的萬兆波分通道。

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的
圖 3 KAFKA 平臺部署架構(gòu)圖

(2)參數(shù)調(diào)優(yōu)
? 首先我們在 JVM 層面做了很多嘗試。對 Kafka 服務(wù)啟動參數(shù)進行調(diào)優(yōu),使用 G1 回收器。kafka 內(nèi)存配置一般選擇 64G,其中 16G 給 Kafka 應(yīng)用本身,剩余內(nèi)存全部用于操作系統(tǒng)本身的 page cache.
? 此外為了保證核心系統(tǒng)的達到最佳的讀寫效果,我們采用 SSD 硬盤,并做了 raid5 冗余,來保證硬盤的高效 IO 讀寫能力。
? 其次我們通過調(diào)整 broker 的 num.io.threads,num.network.threads, num.replica.fetchers 等參數(shù)來保證集群之間快速復(fù)制和吞吐。
(3)數(shù)據(jù)一致性保證
Kafka 有自己一套獨特的消息傳輸保障機制(at least once)。當(dāng) producer 向 broker 發(fā)送消息時,由于副本機制(replication)的存在,一旦這條消息被 broker 確認,它將不會丟失。但如果 producer 發(fā)送數(shù)據(jù)給 broker 后,遇到網(wǎng)絡(luò)問題而造成通信中斷,那 producer 就無法判斷該條消息是否已經(jīng)被確認。這時 producer 可以重試,確保消息已經(jīng)被 broker 確認,為了保證消息的可靠性,我們要求業(yè)務(wù)做到:

? 保證發(fā)送端成功
當(dāng) producer 向 leader 發(fā)送數(shù)據(jù)時,可以通過 request.required.acks 參數(shù)來設(shè)置數(shù)據(jù)可靠性的級別:

1(默認)

leader 已成功收到的數(shù)據(jù)并得到確認后發(fā)送下一條 message。如果 leader 宕機,則會丟失數(shù)據(jù)。

0

送端無需等待來自 broker 的確認而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)可靠性確是最低的。

-1(ALL)

發(fā)送端需要等待 ISR 列表中所有列表都確認接收數(shù)據(jù)后才算一次發(fā)送完成,可靠性最高。

? 保證消費者消費成功(at least once)
我們要求消費者關(guān)閉自動提交(enable.auto.commit:false),同時當(dāng)消費者每次 poll 處理完業(yè)務(wù)邏輯后必須完成手動同步提交(commitSync),如果消費者在消費過程中發(fā)生 crash,下次啟動時依然會從之前的位置開始消費,從而保證每次提交的內(nèi)容都能被消費。

? 消息去重
考慮到 producer,broker,consumer 之間都有可能造成消息重復(fù),所以我們要求接收端需要支持消息去重的功能,最好借助業(yè)務(wù)消息本身的冪等性來做。其中有些大數(shù)據(jù)組件,如 hbase,elasticsearch 天然就支持冪等操作。

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的
圖 4Kafka 消息可靠性機制

場景事例:行情數(shù)據(jù) hbase 存儲
在華泰內(nèi)部使用 kafka 來緩存一段時間的行情數(shù)據(jù),并做相應(yīng)處理為了保證 kafka 中數(shù)據(jù)的完整性,發(fā)送端API參數(shù)配置:

props.put(“acks”, “all”);

為了防止某條發(fā)送影響后續(xù)的消息發(fā)送,采用帶異步回調(diào)的模式發(fā)送

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的在接收端,啟動專門的消費者拉取 kafka 數(shù)據(jù)存入 hbase。hbase 的 rowkey 的設(shè)計主要包括 SecurityId(股票id)和 timestamp(行情數(shù)據(jù)時間)。消費線程從 kafka 拉取數(shù)據(jù)后反序列化,然后批量插入 hbase,只有插入成功后才往 kafka 中持久化 offset。這樣的好處是,如果在中間任意一個階段發(fā)生報錯,程序恢復(fù)后都會從上一次持久化 offset 的位置開始消費數(shù)據(jù),而不會造成數(shù)據(jù)丟失。如果中途有重復(fù)消費的數(shù)據(jù),則插入 hbase 的 rowkey 是相同的,數(shù)據(jù)只會覆蓋不會重復(fù),最終達到數(shù)據(jù)一致。
所以,從根本上說,kafka 上的數(shù)據(jù)傳輸也是數(shù)據(jù)最終一致性的典型場景。

開源消息中間件Kafka在華泰證券的探索與實踐是怎樣的
圖 5hbase 持久化邏輯

(4)ACL安全
目前華泰內(nèi)部通過配置 allow.everyone.if.no.acl.found 參數(shù)(:true)讓 Kafka 集群同時具備ACL和非ACL的能力,避免資源的浪費。我們選用 SASL 作為 Kafka 鑒權(quán)方式,因為 SASL 雖然簡單,但已滿足需求,而 Kerberos 使用過重,過度復(fù)雜組件會給 Kafka 帶來更多不確定的因素,如示例所示,根據(jù)部門劃分來分配用戶。
示例:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
ser_dep1=“ password 1”
user_dep2=“ password 2”
user_dep3=“ password 3”;
};
服務(wù)啟動后,通過 Kafka 的 command line 接口,配置基于用戶、ip、topic、groupid 等的 acl 權(quán)限來保證各業(yè)務(wù)之間的隔離。

       5.未來規(guī)劃    

隨著業(yè)務(wù)的不斷發(fā)展,Kafka 在華泰證券內(nèi)部已成為核心組件。未來重點進行 PaaS 平臺建設(shè),建立分級保障和ACL權(quán)限管控,對重點業(yè)務(wù)進行獨立管理。
目前 Kafka 的 topic 一般只有 2 個副本,在某些特殊場景下存在數(shù)據(jù)丟失的風(fēng)險,未來我們會通過升級擴容,基于業(yè)務(wù)的重要程度提升副本數(shù),強化集群的高可用性。
后續(xù)我們還會深入研究 Kafka1.0,與 KafkaStreaming、KQL、Storm、Spark、Flink 等流式計算引擎相結(jié)合,依托 Kafka 打造公司級流式計算平臺。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI