您好,登錄后才能下訂單哦!
文 | 潘國慶 攜程大數(shù)據(jù)平臺實時計算平臺負責(zé)人
本文主要從攜程大數(shù)據(jù)平臺概況、架構(gòu)設(shè)計及實現(xiàn)、在實現(xiàn)當(dāng)中踩坑及填坑的過程、實時計算領(lǐng)域詳細的應(yīng)用場景,以及未來規(guī)劃五個方面闡述攜程實時計算平臺架構(gòu)與實踐,希望對需要構(gòu)建實時數(shù)據(jù)平臺的公司和同學(xué)有所借鑒。
攜程大數(shù)據(jù)平臺結(jié)構(gòu)分為三層:
應(yīng)用層:開發(fā)平臺Zeus(分為調(diào)度系統(tǒng)、Datax數(shù)據(jù)傳輸系統(tǒng)、主數(shù)據(jù)系統(tǒng)、數(shù)據(jù)質(zhì)量系統(tǒng))、查詢平臺(ArtNova報表系統(tǒng)、Adhoc查詢)、機器學(xué)習(xí)(基于tensorflow、spark等開源框架進行開發(fā);GPU云平臺基于K8S實現(xiàn))、實時計算平臺Muise;
中間層:基于開源的大數(shù)據(jù)基礎(chǔ)架構(gòu),分為分布式存儲和計算框架、實時計算框架;
離線主要是基于Hadoop、HDFS分布式存儲、分布式離線計算基于Hive及Spark、KV存儲基于HBase、Presto和Kylin用于Adhoc以及報表系統(tǒng);
實時計算框架底層是基于Kafka封裝的消息隊列系統(tǒng)Hermes, Qmq是攜程自研的消息隊列, Qmq主要用于定單交易系統(tǒng),確保百分之百不丟失數(shù)據(jù)而打造的消息隊列。
底層:資源監(jiān)控與運維監(jiān)控,分為自動化運維系統(tǒng)、大數(shù)據(jù)框架設(shè)施監(jiān)控、大數(shù)據(jù)業(yè)務(wù)監(jiān)控。
1.Muise平臺介紹
1)Muise是什么
Muise,取自希臘神話的文藝女神繆斯之名,是攜程的實時數(shù)據(jù)分析和處理的平臺;Muise平臺底層基于消息隊列和開源的實時處理系統(tǒng)JStorm、Spark Streaming和Flink,能夠支持秒級,甚至是毫秒級延遲的流式數(shù)據(jù)處理。
2)Muise的功能
數(shù)據(jù)源:Hermes Kafka/Mysql、Qmq;
數(shù)據(jù)處理:提供Muise JStorm/Spark/FlinkCore API消費Hermes或Qmq數(shù)據(jù),底層使用Jstorm、Spark或?qū)崟r處理數(shù)據(jù),并提供自己封裝的API給用戶使用。API對接了所有數(shù)據(jù)源系統(tǒng),方便用戶直接使用;
作業(yè)管理:Portal提供對于JStorm、Spark Streaming和Flink作業(yè)的管理,包含新建作業(yè),上傳jar包以及發(fā)布生產(chǎn)等功能;
監(jiān)控和告警:使用Jstorm、Spark和Flink提供的Metrics框架,支持自定義的metrics;metrics信息中心化管理,接入Ops的監(jiān)控和告警系統(tǒng),提供全面的監(jiān)控和告警支持,幫助用戶在第一時間內(nèi)監(jiān)控到作業(yè)是否發(fā)生問題。
2.Muise平臺現(xiàn)狀
平臺現(xiàn)狀:
Jstorm 2.1.1、Spark 2.0.1、Flink1.6.0、Kafka 2.0;
集群規(guī)模:
13個集群、200+臺機器150+Jstorm、50+Yarn、100+ Kafka;
作業(yè)規(guī)模:
11個業(yè)務(wù)線、350+Jstorm作業(yè)、120+SS/Flink作業(yè);
消息規(guī)模:
Topic 1300+、增量 100T+ PD、Avg 200K TPS、Max 900K TPS;
消息延時:
Hermes 200ms以內(nèi)、Storm 20ms以內(nèi);
消息處理成功率:
99.99%。
3.Muise平臺演進之路
2015 Q2~2015 Q3 :基于Storm開發(fā)實時計算平臺;
2016 Q1~2016 Q2 :Storm遷移JStorm、引入StreamCQL;
2017 Q1~2017 Q2 :Spark Streaming調(diào)研與接入;
2017 Q3~2018 Q1 :Flink調(diào)研與接入。
4.Muise平臺架構(gòu)
1)Muise平臺架構(gòu)
應(yīng)用層:Muise Portal 目前主要支持了 Storm 與 Spark Streaming兩類作業(yè),支持新建作業(yè)、Jar包發(fā)布、作業(yè)運行與停止等一系列功能;
中間層:對底層Infrastructure做了封裝,為用戶提供基于Storm、Spark、Flink相對應(yīng)的API以及各方面Services;
底層:Hermes & Qmq是數(shù)據(jù)源、Redis、HBase、HDFS、DB等作為外部的數(shù)據(jù)存儲、Graphite、Grafana、ES主要用于監(jiān)控。
2)Muise實時計算流程
Producer端:用戶先申請Kafka的topic,然后將數(shù)據(jù)實時寫到Kafka中;
Muise Portal端:用戶基于我們提供的API做開發(fā),開發(fā)完以后通過Muise Portal配置、上傳和啟動作業(yè);作業(yè)啟動后,jar包會分發(fā)到各個對應(yīng)的集群消費Kafka數(shù)據(jù);
存儲端:數(shù)據(jù)在被消費之后可以寫回QMQ或Kafka,也可以存儲到外部系統(tǒng)Redis、HBase、HDFS/Hive、DB。
5.平臺設(shè)計 ——易用性
首先:作為一個平臺設(shè)計第一要點就是要簡單易用,我們提供綜合的Portal,便于用戶自己新建管理它的作業(yè),方便開發(fā)實時作業(yè)第一時間能夠上線;
其次:我們封裝了很多Core API,支持多套實時計算框架:
支持HermesKafka/MySQL 、QMQ;
集成Jstorm、Spark Streaming、Flink;
作業(yè)資源管控;
提供DB、Redis、HBase和HDFS輸出組件;
基于內(nèi)置Metric系統(tǒng)定制多項metric進行作業(yè)預(yù)警監(jiān)控;
用戶可自定義Metric用于監(jiān)控與預(yù)警;
支持AtLeast Once 與Exactly Once語義。
上文講到平臺設(shè)計要易用,下面講平臺的容錯,確保數(shù)據(jù)一定不能出問題。
6.平臺設(shè)計——容錯
Jstorm:基于Acker機制確保At Least Once;
Spark Streaming:基于Checkpoint實現(xiàn)Exactly Once、基于Kafka Offset回溯實現(xiàn)At Least Once;
Flink:基于Flinktwo-phase commit + Kafka 0.11事務(wù)性支持實現(xiàn)Exactly Once。
7.Exactly Once
1)Direct Approach
當(dāng)前大部分拿Spark Streaming消費Kafka的話,都是用Direct Approach的方式:
優(yōu)點:記錄每個批次消費的Offset,作業(yè)可通過offset回溯;
缺點:數(shù)據(jù)存儲與offset存儲異步:
數(shù)據(jù)保存成功,應(yīng)用宕機,offset未保存 (導(dǎo)致數(shù)據(jù)重復(fù));
offset保存成功,應(yīng)用宕機,數(shù)據(jù)保存失敗 (導(dǎo)致數(shù)據(jù)丟失);
2)CheckPoint
優(yōu)點:默認記錄每個批次的運行狀態(tài)與源數(shù)據(jù),宕機時可從cp目錄恢復(fù);
缺點:
1. 非100%保證ExactlyOnce;
https://www.iteblog.com/archives/1795 描述了無法保證Exactly once的場景;
https://issues.apache.org/jira/browse/SPARK-17606 也存在doCheckPoint時出現(xiàn)塊丟失的情況;
2. 啟用cp帶來額外性能影響;
3. Streaming作業(yè)邏輯改變無法從cp恢復(fù)。
適用場景:比較適合有狀態(tài)計算的場景;
使用方式:建議程序自己存儲offset,當(dāng)發(fā)生宕機時,如果spark代碼邏輯沒有發(fā)生改變,則根據(jù)checkpoint目錄創(chuàng)建StreamingContext。如果發(fā)生改變,則根據(jù)實現(xiàn)自己存儲的offset創(chuàng)建context并設(shè)立新的checkpoint點。
8.平臺設(shè)計——監(jiān)控與告警
如何能夠第一時間幫用戶發(fā)現(xiàn)作業(yè)問題,是一個重中之重。
集群監(jiān)控
服務(wù)器監(jiān)控:考量的指標(biāo)有Memory、CPU、Disk IO、Net IO;
平臺監(jiān)控:Ganglia;
作業(yè)監(jiān)控
基于實時計算框架原生Metric系統(tǒng);
定制Metrics反應(yīng)作業(yè)狀態(tài);
采集原生與定制Metrics用于監(jiān)控和告警;
存儲:Graphite展 現(xiàn):Grafana 告警:Appmon;
我們現(xiàn)在定制的很多Metrics當(dāng)中比較通用的是:
Fail:定期時間內(nèi),Jstorm數(shù)據(jù)處理失敗數(shù)量、Spark task Fail數(shù)量;
Ack:定期時間內(nèi),處理的數(shù)據(jù)量;
Lag:定期時間內(nèi),數(shù)據(jù)產(chǎn)生與被消費的中間延遲(kafka 2.0基于自帶bornTime)。
攜程開發(fā)了自己告警系統(tǒng),將Metrics代入系統(tǒng)之后基于規(guī)則做告警。通過作業(yè)監(jiān)控看板完成相關(guān)指標(biāo)的監(jiān)控和查看,我們會把Flink作為比較關(guān)心的Metrics指標(biāo),全都導(dǎo)入到Graphite數(shù)據(jù)庫里面,然后基于前端Grafana做展現(xiàn)。通過作業(yè)監(jiān)控看板,我們能夠直接看到Kafka to Flink Delay(Lag),相當(dāng)于數(shù)據(jù)從產(chǎn)生到被Flink作業(yè)消費,中間延遲是62毫秒,速度相對比較快的。其次我們監(jiān)控了每次從Kafka中獲取數(shù)據(jù)的速度。因為從Kafka獲取數(shù)據(jù)是基于一小塊一小塊去獲取,我們設(shè)置的是每次拉2兆的數(shù)據(jù)量。通過作業(yè)監(jiān)控看板可以監(jiān)控到每次從Kafka拉取數(shù)據(jù)時候的平均延遲是25毫秒,Max是 760毫秒。
接下來講講我們在這幾年踩到的一些坑以及如何填坑的。
坑1:HermesUBT數(shù)據(jù)量大,埋點信息眾多,服務(wù)端與客戶端均承受巨大壓力;
解決方案:提供統(tǒng)一分流作業(yè),基于特定規(guī)則與配置將數(shù)據(jù)分流至不同topic。
坑2:Kafka無法保證全局有序;
解決方案:如果在強制全局有序的場景下,使用單Partition;如果在部分有序的情況下,可基于某個字段作Hash,保證Partition內(nèi)部有序。
坑3:Kafka無法根據(jù)時間精確回溯到某時間段的數(shù)據(jù);
解決方案:平臺提供過濾功能,過濾時間早于設(shè)定時間的數(shù)據(jù)(kafka 0.10之后每條數(shù)據(jù)都帶有自己的時間戳,所以這個問題在升級kafka之后自然而然的就解決了)。
坑4:最初,攜程所有的Spark Streaming、Flink作業(yè)都是跑在主機群上面的,是一個大Hadoop集群,目前是幾千臺規(guī)模,離線和實時是混布的,一旦一個大的離線作業(yè)上來時,會對實時作業(yè)有影響;其次是Hadoop集群經(jīng)常會做一些升級改造,所以可能會重啟Name Node或者Node Manager,這會導(dǎo)致作業(yè)有時會掛掉;
解決方案:我們采用分開部署,單獨搭建實時集群,獨立運行實時作業(yè)。離線歸離線,實時歸實時的,實時集群單獨跑Spark Streaming跟Yarn的作業(yè),離線專門跑離線的作業(yè)。
當(dāng)分開部署后,會遇到新的問題,部分實時作業(yè)需要去一些離線作業(yè)做一些Join或 Feature的操作,所以也是需要訪問主機群數(shù)據(jù)。這相當(dāng)于有一個跨集群訪問的問題。
坑5:Hadoop實時集群跨集群訪問主機群;
解決方案:Hdfs-site.xml配置ns-prod、ns雙重namespace,分別指向本地與主機群;
Spark配置spark.yarn.access.namenodes or hadoopFlieSystems
坑6:無論是Jstorm還是接Storm都會遇到一個CPU搶占的問題,當(dāng)你上了一個大的作業(yè),尤其是那種消耗CPU特別厲害的,可能我給它分開了一個Worker,一個CPU Core,但是它最后有可能會給我用到3個甚至4個;
解決方案:啟用cgroup限制cpu使用率。
1.實時報表統(tǒng)計
實時報表統(tǒng)計與展現(xiàn)也是Spark Streaming使用較多的一個場景,數(shù)據(jù)可以基于Process Time統(tǒng)計,也可以基于Event Time統(tǒng)計。由于本身Spark Streaming不同批次的job可以視為一個個的滾動窗口,某個獨立的窗口中包含了多個時間段的數(shù)據(jù),這使得使用SparkStreaming基于Event Time統(tǒng)計時存在一定的限制。一般較為常用的方式是統(tǒng)計每個批次中不同時間維度的累積值并導(dǎo)入到外部系統(tǒng),如ES;然后在報表展現(xiàn)的時基于時間做二次聚合獲得完整的累加值最終求得聚合值。下圖展示了攜程IBU基于Spark Streaming實現(xiàn)的實時看板。
2.實時數(shù)倉
1)Spark Streaming近實時存儲數(shù)據(jù)
如今市面上有形形×××的工具可以從Kafka實時消費數(shù)據(jù)并進行過濾清洗最終落地到對應(yīng)的存儲系統(tǒng),如:Camus、Flume等。相比較于此類產(chǎn)品,Spark Streaming的優(yōu)勢首先在于可以支持更為復(fù)雜的處理邏輯,其次基于Yarn系統(tǒng)的資源調(diào)度使得Spark Streaming的資源配置更加靈活,用戶采用Spark Streaming實時把數(shù)據(jù)寫到HDFS或者寫到Hive里面去。
2)基于各種規(guī)則作數(shù)據(jù)質(zhì)量檢測
基于Spark Streaming,自定義metric功能對數(shù)據(jù)的數(shù)據(jù)量、字段數(shù)、數(shù)據(jù)格式與重復(fù)數(shù)據(jù)進行了數(shù)據(jù)質(zhì)量校驗與監(jiān)控。
3)基于自定義metric實時預(yù)警
基于我們封裝提供的Metric注冊系統(tǒng)確定一些規(guī)則,然后每個批次基于這些規(guī)則做一個校驗,返回一個結(jié)果。這個結(jié)果會基于Metric sink吐出來,吐出來基于metrics的結(jié)果做一個監(jiān)控。當(dāng)前我們采用Flink加載TensorFlow模型實時做預(yù)測?;緯r效性是數(shù)據(jù)一旦到達兩秒鐘之內(nèi)就能夠把告警信息告出來,給用戶非常好的體驗。
1.Flink on K8S
在攜程內(nèi)部有一些不同的計算框架,有實時計算的,有機器學(xué)習(xí)的,還有離線計算的,所以需要一個統(tǒng)一的底層框架來進行管理,因此在未來將Flink遷移到了K8S上,進行統(tǒng)一的資源管控。
2.Muise平臺接入Flink SQL
Muise平臺雖然接入了Flink,但是用戶還是得手寫代碼,我們開發(fā)了一個實時特征平臺,用戶只需要寫SQL,即基于Flink的SQL就可以實時采集用戶所需要的模型里面或者用到的特征。之后會把實時特征平臺跟實時計算平臺做進行合并,用戶最后只需要寫SQL就可以實現(xiàn)所有的實時作業(yè)實現(xiàn)。
3.Jstorm全面啟用Cgroup
當(dāng)前由于部分歷史原因?qū)е卢F(xiàn)在很多作業(yè)跑在Jstorm上面,因此出現(xiàn)了資源分配不均衡的情況,之后會全面啟用Cgroup。
4.在線模型訓(xùn)練
攜程部分部門需要實時在線模型訓(xùn)練,通過用Spark訓(xùn)練了模型之后,然后使用Spark Streaming的模型,實時做一個攔截或者控制,應(yīng)用在風(fēng)控等場景。
—end—
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。