您好,登錄后才能下訂單哦!
本文作者:易偉平(餓了么)
整理:姬平(阿里巴巴實時計算部)
本文將為大家展示餓了么大數(shù)據(jù)平臺在實時計算方面所做的工作,以及計算引擎的演變之路,你可以借此了解Storm、Spark、Flink的優(yōu)缺點。如何選擇一個合適的實時計算引擎?Flink憑借何種優(yōu)勢成為餓了么首選?本文將帶你一一解開謎題。
下面是目前餓了么平臺現(xiàn)狀架構(gòu)圖:
cdn.xitu.io/2019/4/24/16a4ebcdcad6494b?w=4388&h=2760&f=png&s=1385555">
來源于多個數(shù)據(jù)源的數(shù)據(jù)寫到 kafka 里,計算引擎主要是 Storm , Spark 和 Flink,計算引擎出來的結(jié)果數(shù)據(jù)再落地到各種存儲上。
目前 Storm 任務(wù)大概有100多個,Spark任務(wù)有50個左右,F(xiàn)link暫時還比較少。
目前我們集群規(guī)模每天數(shù)據(jù)量有60TB,計算次數(shù)有1000000000,節(jié)點有400個。這里要提一下,Spark 和 Flink都是 on yarn 的,其中Flink onyarn主要是用作任務(wù)間 jobmanager 隔離, Storm 是 standalone 模式。
在講述我們應(yīng)用場景之前,先強調(diào)實時計算一個重要概念, 一致性語義:
1) at-most-once:即 fire and forget,我們通常寫一個 java 的應(yīng)用,不去考慮源頭的 offset 管理,也不去考慮下游的冪等性的話,就是簡單的at-most-once,數(shù)據(jù)來了,不管中間狀態(tài)怎樣,寫數(shù)據(jù)的狀態(tài)怎樣,也沒有ack機制。
2) at-least-once: 重發(fā)機制,重發(fā)數(shù)據(jù)保證每條數(shù)據(jù)至少處理一次。
3) exactly-once: 使用粗 checkpoint 粒度控制來實現(xiàn) exactly-once,我們講的 exactly-once 大多數(shù)指計算引擎內(nèi)的 exactly-once,即每一步的 operator 內(nèi)部的狀態(tài)是否可以重放;上一次的 job 如果掛了,能否從上一次的狀態(tài)順利恢復(fù),沒有涉及到輸出到 sink 的冪等性概念。
4) at-least-one + idempotent = exactly-one:如果我們能保證說下游有冪等性的操作,比如基于mysql實現(xiàn) update on duplicate key;或者你用es, cassandra之類的話,可以通過主鍵key去實現(xiàn)upset的語義, 保證at-least-once的同時,再加上冪等性就是exactly-once。
餓了么早期都是使用Storm,16年之前還是Storm,17年才開始有Sparkstreaming, Structed-streaming。Storm用的比較早,主要有下面幾個概念:
1) 數(shù)據(jù)是 tuple-based
2) 毫秒級延遲
3) 主要支持java, 現(xiàn)在利用apache beam也支持python和go。
4) Sql的功能還不完備,我們自己內(nèi)部封裝了typhon,用戶只需要擴展我們的一些接口,就可以使用很多主要的功能;flux是Storm的一個比較好的工具,只需要寫一個yaml文件,就可以描述一個Storm任務(wù),某種程度上說滿足了一些需求,但還是要求用戶是會寫java的工程師,數(shù)據(jù)分析師就使用不了。
1) 易用性:因為使用門檻高,從而限制了它的推廣。
2)StateBackend:更多的需要外部存儲,比如redis之類的kv存儲。
3) 資源分配方面:用worker和slot提前設(shè)定的方式,另外由于優(yōu)化點做的較少,引擎吞吐量相對比較低一點。
有一天有個業(yè)務(wù)方過來提需求說 我們能不能寫個sql,幾分鐘內(nèi)就可以發(fā)布一個實時計算任務(wù)。 于是我們開始做Sparkstreaming。它的主要概念如下:
1) Micro-batch:需要提前設(shè)定一個窗口,然后在窗口內(nèi)處理數(shù)據(jù)。
2) 延遲是秒級級別,比較好的情況是500ms左右。
3) 開發(fā)語言是java和scala。
4) Streaming SQL,主要是我們的工作,我們希望提供 Streaming SQL 的平臺。
特點:
1) Spark生態(tài)和 SparkSQL: 這是Spark比較好的地方,技術(shù)棧是統(tǒng)一的,SQL,圖計算,machine learning的包都是可以互調(diào)的。因為它先做的是批處理,和Flink不一樣,所以它天然的實時和離線的api是統(tǒng)一的。
2) Checkpointon hdfs。
3) On Yarn:Spark是屬于 hadoop 生態(tài)體系,和 yarn 集成度高。
4) 高吞吐: 因為它是 micro-batch 的方式,吞吐也是比較高的。
下面給大家大致展示一下我們平臺用戶快速發(fā)布一個實時任務(wù)的操作頁面,它需要哪些步驟。我們這里不是寫 DDL 和 DML 語句,而是 UI 展示頁面的方式。
頁面里面會讓用戶選一些必要的參數(shù), 首先會選哪一個 kafka 集群,每個分區(qū)消費多少,反壓也是默認開啟的。消費位置需要讓用戶每次去指定,有可能用戶下一次重寫實時任務(wù)的時候,可以根據(jù)業(yè)務(wù)需求去選擇offset消費點。
中間就是讓用戶描述 pipeline。 SQL 就是 kafka 的多個 topic,輸出選擇一個輸出表,SQL 把上面消費的 kafka DStream 注冊成表,然后寫一串 pipeline,最后我們幫用戶封裝了一些對外 sink (剛剛提到的各種存儲都支持,如果存儲能實現(xiàn) upsert 語義的話,我們都是支持了的)。
雖然剛剛滿足一般無狀態(tài)批次內(nèi)的計算要求,但就有用戶想說, 我想做流的 join 怎么辦, 早期的 Spark1.5 可以參考 Spark-streamingsql 這個開源項目把 DStream 注冊為一個表,然后對這個表做 join 的操作,但這只支持1.5之前的版本,Spark2.0 推出 structured streaming 之后項目就廢棄了。我們有一個 tricky 的方式:
讓 Sparkstreaming 去消費多個 topic,但是我根據(jù)一些條件把消費的 DStream 里面的每個批次 RDD 轉(zhuǎn)化為DataFrame,這樣就可以注冊為一張表,根據(jù)特定的條件,切分為兩張表,就可以簡單的做個 join,這個 join 的問題完全依賴于本次消費的數(shù)據(jù),它們 join 的條件是不可控的,是比較 tricky 的方式。比如說下面這個例子,消費兩個 topic,然后簡單通過 filer 條件,拆成兩個表,然后就可以做個兩張表的 join,但它本質(zhì)是一個流。
exactly-once 需要特別注意一個點:
我們必須要求數(shù)據(jù) sink 到外部存儲后,offset 才能 commit,不管是到 zookeeper,還是 mysql 里面,你最好保證它在一個 transaction 里面,而且必須在輸出到外部存儲(這里最好保證一個 upsert 語義,根據(jù) unique key 來實現(xiàn)upset語義)之后,然后這邊源頭driver再根據(jù)存儲的 offeset 去產(chǎn)生 kafka RDD,executor 再根據(jù) kafka 每個分區(qū)的 offset 去消費數(shù)據(jù)。如果滿足這些條件,就可以實現(xiàn)端到端的 exactly-once 這是一個大前提。
1) Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):我們要實現(xiàn)跨批次帶狀態(tài)的計算的話,在1.X版本,我們通過這兩個接口去做,但還是需要把這個狀態(tài)存到 hdfs 或者外部去,實現(xiàn)起來比較麻煩一點。
2) Real Multi-Stream Join:沒辦法實現(xiàn)真正的多個流join的語義。
3) End-To-End Exactly-Once Semantics:它的端到端的 exactly-once 語義實現(xiàn)起來比較麻煩,需要sink到外部存儲后還需要手動的在事務(wù)里面提交offset。
我們調(diào)研然后并去使用了 Spark2.X 之后帶狀態(tài)的增量計算。下面這個圖是官方網(wǎng)站的:
所有的流計算都參照了 Google 的 data flow,里面有個重要的概念:數(shù)據(jù)的 processing time 和 event time,即數(shù)據(jù)的處理時間和真正的發(fā)生時間有個 gap 。于是流計算領(lǐng)域還有個 watermark,當前進來的事件水位需要watermark 來維持,watermark 可以指定時間 delay 的范圍,在延遲窗口之外的數(shù)據(jù)是可以丟棄的,在業(yè)務(wù)上晚到的數(shù)據(jù)也是沒有意義的。
下面是 structured streaming 的架構(gòu)圖:
這里面就是把剛才 sparkstreaming 講 exactly-once 的步驟1,2,3都實現(xiàn)了,它本質(zhì)上還是分批的 batch 方式,offset 自己維護,狀態(tài)存儲用的 hdfs,對外的 sink 沒有做類似的冪等操作,也沒有寫完之后再去 commit offset,它只是再保證容錯的同時去實現(xiàn)內(nèi)部引擎的 exactly-once。
1) Stateful Processing SQL&DSL:可以滿足帶狀態(tài)的流計算
2) Real Multi-Stream Join:可以通過 Spark2.3 實現(xiàn)多個流的 join,多個流的 join 做法和 Flink 類似,你需要先定義兩個流的條件(主要是時間作為一個條件),比如說有兩個topic的流進來,然后你希望通過某一個具體的 schema 中某個字段(通常是 event time)來限定需要 buffer 的數(shù)據(jù),這樣可以實現(xiàn)真正意義上的流的 join。
3)比較容易實現(xiàn)端到端的 exactly-once 的語義,只需要擴展sink的接口支持冪等操作是可以實現(xiàn) exactly-once的。
特別說一下,structured streaming 和原生的 streaming 的 API 有一點區(qū)別,它創(chuàng)建表的 Dataframe 的時候,是需要指定表的 schema 的,意味著你需要提前指定 schema。另外它的 watermark 是不支持 SQL 的,于是我們加了一個擴展,實現(xiàn)完全寫 SQL,可以從左邊到右邊的轉(zhuǎn)換(下圖),我們希望用戶不止是程序員,也希望不會寫程序的數(shù)據(jù)分析師等同學(xué)也能用到。
1) Trigger(Processing Time、 Continuous ):2.3之前主要基于processing Time,每個批次的數(shù)據(jù)處理完了立馬觸發(fā)下一批次的計算。2.3推出了record by record的持續(xù)處理的trigger。
2) Continuous Processing (Only Map-Like Operations):目前它只支持map like的操作,同時sql的支持度也有些限制。
3) LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保證需要自己做一些額外的擴展, 我們發(fā)現(xiàn)kafka0.11版本提供了事務(wù)的功能,是可以從基于這方面考慮從而去實現(xiàn)從source到引擎再到sink,真正意義上的端到端的exactly-once。
4) CEP(Drools):我們發(fā)現(xiàn)有業(yè)務(wù)方需要提供 CEP 這樣復(fù)雜事件處理的功能,目前我們的語法無法直接支持,我們讓用戶使用規(guī)則引擎 Drools,然后跑在每個 executor 上面,依靠規(guī)則引擎功能去實現(xiàn) CEP。
于是基于以上幾個 Spark Structured Streaming 的特點和缺點,我們考慮使用 Flink 來做這些事情。
Flink 目標是對標 Spark,流這塊是領(lǐng)先比較多,它野心也比較大,圖計算,機器學(xué)習(xí)等它都有,底層也是支持yarn,tez等。對于社區(qū)用的比較多的存儲,F(xiàn)link 社區(qū)官方都支持比較好,相對來說。
Flink 的框架圖:
Flink中的 JobManager,相當于 Spark 的 Driver 角色,TaskManger 相當于 Executor,里面的 Task 也有點類似Spark 的那些 Task。 不過 Flink 用的 RPC 是 akka,同時 Flink Core 自定義了內(nèi)存序列化框架,另外 Task 無需像Spark 每個 Stage 的 Task 必須相互等待而是處理完后即往下游發(fā)送數(shù)據(jù)。
Flink binary data處理operator:
Spark 的序列化用戶一般會使用 kryo 或者 java 默認的序列化,同時也有 Tungsten 項目對 Spark 程序做一 JVM 層面以及代碼生成方面的優(yōu)化。相對于 Spark,F(xiàn)link自己實現(xiàn)了基于內(nèi)存的序列化框架,里面維護著key和pointer 的概念,它的 key 是連續(xù)存儲,在 CPU 層面會做一些優(yōu)化,cache miss 概率極低。比較和排序的時候不需要比較真正的數(shù)據(jù),先通過這個 key 比較,只有當它相等的時候,才會從內(nèi)存中把這個數(shù)據(jù)反序列化出來,再去對比具體的數(shù)據(jù),這是個不錯的性能優(yōu)化點。
Flink Task Chain:
Task中 operator chain,是比較好的概念。如果上下游數(shù)據(jù)分布不需要重新 shuffle 的話,比如圖中 source 是kafka source,后面跟的 map 只是一個簡單的數(shù)據(jù) filter,我們把它放在一個線程里面,就可以減少線程上下文切換的代價。
并行度概念
比如說這里面會有 5 個 Task,就會有幾個并發(fā)線程去跑,chain 起來的話放在一個線程去跑就可以提升數(shù)據(jù)傳輸性能。Spark 是黑盒的,每個 operator 無法設(shè)并發(fā)度,而 Flink 可以對每個 operator 設(shè)并發(fā)度,這樣可以更靈活一點,作業(yè)運行起來對資源利用率也更高一點。
Spark 一般通過 Spark.default.parallelism 來調(diào)整并行度,有 shuffle 操作的話,并行度一般是通Spark.sql.shuffle.partitions 參數(shù)來調(diào)整,實時計算的話其實應(yīng)該調(diào)小一點,比如我們生產(chǎn)中和 kafka 的 partition 數(shù)調(diào)的差不多,batch 在生產(chǎn)上會調(diào)得大一點,我們設(shè)為1000,左邊的圖我們設(shè)并發(fā)度為2,最大是10,這樣首先分2個并發(fā)去跑,另外根據(jù) key 做一個分組的概念,最大分為10組,就可以做到把數(shù)據(jù)盡量的打散。
State & Checkpoint
因為 Flink 的數(shù)據(jù)是一條條過來處理,所以 Flink 中的每條數(shù)據(jù)處理完了立馬發(fā)給下游,而不像 spark,需要等該operator 所在的 stage 所有的 task 都完成了再往下發(fā)。
Flink 有粗粒度的 checkpoint 機制,以非常小的代價為每個元素賦予一個 snapshot 概念,只有當屬于本次snapshot 的所有數(shù)據(jù)都進來后才會觸發(fā)計算,計算完后,才把 buffer 數(shù)據(jù)往下發(fā),目前 Flink sql 沒有提供控制buffer timeout 的接口,即我的數(shù)據(jù)要buffer多久才往下發(fā)??梢栽跇?gòu)建 Flink context 時,指定 buffer timeout為 0,處理完的數(shù)據(jù)才會立馬發(fā)下去,不需要等達到一定閾值后再往下發(fā)。
Backend 默認是維護在 jobmanager 內(nèi)存,我們更多使用的的是寫到 hdfs 上,每個 operator 的狀態(tài)寫到 rocksdb 上,然后異步周期增量同步到外部存儲。
容錯
圖中左半部分的紅色節(jié)點發(fā)生了 failover,如果是 at-least-once,則其最上游把數(shù)據(jù)重發(fā)一次就好;但如果是exactly-once,則需要每個計算節(jié)點從上一次失敗的時機重放。
Exactly Once Two-Phase Commit
Flink1.4 之后有兩階段提交來支持 exactly-once 。它的概念是從上游 kafka 消費數(shù)據(jù)后,每一步都會發(fā)起一次投票,來記錄狀態(tài),通過checkpoint的屏障來處理標記,只有最后再寫到kafka(0.11之后的版本),只有最后完成之后,才會把每一步的狀態(tài)讓 jobmanager 中的 cordinator 去通知可以固化下來,這樣實現(xiàn) exactly-once。
Savepoints
還有一點 Flink 比較好的就是,基于它的 checkpoint 來實現(xiàn) savepoint 功能。業(yè)務(wù)方需要每個應(yīng)用恢復(fù)節(jié)點不一樣,希望恢復(fù)到的版本也是可以指定的,這是比較好的。這個 savepoint 不只是數(shù)據(jù)的恢復(fù),也有計算狀態(tài)的恢復(fù)。
特點:
1) Trigger (Processing Time、 Event Time、IngestionTime):對比下,F(xiàn)link支持的流式語義更豐富,不僅支持 Processing Time, 也支持 Event time 和 Ingestion Time。
2)Continuous Processing & Window:支持純意義上的持續(xù)處理,record by record的,window 也比 Spark處理的好。
3) Low End-To-End Latency With Exactly-Once Guarantees:因為有兩階段提交,用戶是可以選擇在犧牲一定吞吐量的情況下,根據(jù)業(yè)務(wù)需求情況來調(diào)整來保證端到端的exactly-once。
4) CEP:支持得好。
5) Savepoints:可以根據(jù)業(yè)務(wù)的需求做一些版本控制。
也有做的還不好的:
1)SQL (Syntax Function、Parallelism):SQL功能還不是很完備,大部分用戶是從hive遷移過來,Spark支持hive覆蓋率達到99%以上。 SQL函數(shù)不支持,目前還無法對單個operator做并行度的設(shè)置。
2) ML、Graph等:機器學(xué)習(xí),圖計算等其他領(lǐng)域比Spark要弱一點,但社區(qū)也在著力持續(xù)改進這個問題。
因為現(xiàn)在餓了么已經(jīng)屬于阿里的一員,后續(xù)會更多地使用 Flink,也期待用到 Blink。
更多資訊請訪問 Apache Flink 中文社區(qū)網(wǎng)站
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。