有狀態(tài)的計算作為容錯以及數(shù)據(jù)一致性的保證,是當(dāng)今實時計算必不可少的特性之一,流行的實時計算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分別提供對內(nèi)置 State 的支持。State 的引入使得實時應(yīng)用可以不依賴外部數(shù)據(jù)庫來存儲元數(shù)據(jù)及中間數(shù)據(jù),部分情況下甚至可以直接用 State 存儲結(jié)果數(shù)據(jù),這讓業(yè)界不禁思考: State 和 Database 是何種關(guān)系?有沒有可能用 State 來代替數(shù)據(jù)庫呢?
在這個課題上,F(xiàn)link 社區(qū)是比較早就開始探索的??傮w來說,F(xiàn)link 社區(qū)的努力可以分為兩條線: 一是在作業(yè)運行時通過作業(yè)查詢接口訪問 State 的能力,即 QueryableState;二是通過 State 的離線 dump 文件(Savepoint)來離線查詢和修改 State 的能力,即即將引入的 Savepoint Processor API。
QueryableState
在 2017 年發(fā)布的 Flink 1.2 版本,F(xiàn)link 引入了 QueryableState 的特性以允許用戶通過特定的 client 查詢作業(yè) State 的內(nèi)容 [1],這意味著 Flink 應(yīng)用可以在完全不依賴 State 存儲介質(zhì)以外的外部存儲的情況下提供實時訪問計算結(jié)果的能力。
只通過 Queryable State 提供實時數(shù)據(jù)訪問
然而,QueryableState 雖然設(shè)想上比較理想化,但由于依賴底層架構(gòu)的改動較多且功能也比較受限,它一直處于 Beta 版本并不能用于生產(chǎn)環(huán)境。針對這個問題,在前段時間騰訊的工程師楊華提出 QueryableState 的改進計劃 [2]。在郵件列表中,社區(qū)就 QueryableState 是否可以用于代替數(shù)據(jù)庫作了討論并出現(xiàn)了不同的觀點。筆者結(jié)合個人見解將 State as Database 的主要優(yōu)缺點整理如下。
優(yōu)點:
更低的數(shù)據(jù)延遲。一般情況下 Flink 應(yīng)用的計算結(jié)果需要同步到外部的數(shù)據(jù)庫,比如定時觸發(fā)輸出窗口計算結(jié)果,而這種同步通常是定時的會帶來一定的延遲,導(dǎo)致計算是實時的而查詢卻不是實時的尷尬局面,而直接 State 則可以避免這個問題。
SLA 保障不足。數(shù)據(jù)庫技術(shù)已經(jīng)非常成熟,在可用性、容錯性和運維上都很多的積累,在這點上 State 還相當(dāng)于是處于原始人時期。另外從定位上來看,F(xiàn)link 作業(yè)有版本迭代維護或者遇到錯誤自動重啟帶來的 down time,并不能達到數(shù)據(jù)庫在數(shù)據(jù)訪問上的高可用性。
只可以讀取,不能修改。State 在運行時只可以被作業(yè)本身修改,如果實在要修改 State 只能通過下文的 Savepoint Processor API 來實現(xiàn)。
總體來說,目前 State 代替數(shù)據(jù)庫的缺點還是遠多于其優(yōu)點,不過對于某些對數(shù)據(jù)可用性要求不高的作業(yè)來說,使用 State 作為數(shù)據(jù)庫還是完全合理的。由于定位上的不同,F(xiàn)link State 在短時間內(nèi)很難看到可以完全替代數(shù)據(jù)庫的可能性,但在數(shù)據(jù)訪問特性上 State 往數(shù)據(jù)庫方向發(fā)展是無需質(zhì)疑的。
Savepoint Processor API
Savepoint Processor API 是社區(qū)最近提出的一個新特性(見 FLIP-42 [3]),用于離線對 State 的 dump 文件 Savepoint 進行分析、修改或者直接根據(jù)數(shù)據(jù)構(gòu)建出一個初始的 Savepoint。Savepoint Processor API 屬于 Flink State Evolution 的 State Management。如果說 QueryableState 是 DSL 的話,F(xiàn)link State Evolution 就是 DML,而 Savepoint Processor API 就是 DML 中最為重要的部分。
Savepoint 作為 State 的 dump 文件,通過 Savepoint Processor API 可以暴露數(shù)據(jù)查詢和修改功能,類似于一個離線的數(shù)據(jù)庫,但 State 的概念和典型關(guān)系型數(shù)據(jù)的概念還是有很多不同,F(xiàn)LIP-43 也對這些差異進行了類比和總結(jié)。
首先 Savepoint 是多個 operator 的 state 的物理存儲集合,不同 operator 的 state 是獨立的,這類似于數(shù)據(jù)庫下不同 namespace 之間的 table。我們可以得到 Savepoint 對應(yīng)數(shù)據(jù)庫,單個 operator 對應(yīng) Namespace。
DatabaseSavepointNamespaceUidTableState
但就 table 而言,其在 Savepoint 里對應(yīng)的概念根據(jù) State 類型的不同而有所差別。State 有 Operator State、Keyed State 和 Broadcast State 三種,其中 Operator State 和 Broadcast State 屬于 non-partitioned state,即沒有按 key 分區(qū)的 state,而相反地 Keyed State 則屬于 partitioned state。對于 non-partitioned state 來說,state 是一個 table,state 的每個元素即是 table 里的一行;而對于 partitioned state 來說,同一個 operator 下的所有 state 對應(yīng)一個 table。這個 table 像是 HBase 一樣有個 row key,然后每個具體的 state 對應(yīng) table 里的一個 column。
舉個例子,假設(shè)有一個游戲玩家得分和在線時長的數(shù)據(jù)流,我們需要用 Keyed State 來記錄玩家所在組的分數(shù)和游戲時長,用 Operator State 記錄玩家的總得分和總時長。
相對地,假如用 Operator State 來記錄總得分和總時長(并行度設(shè)為 1),我們注冊 total_score 和 total_time 兩個 State,得到的表有兩個:
total_score |
------- |
14,500 |
total_time5,600
至此 Savepoint 和 Database 的對應(yīng)關(guān)系應(yīng)該是比較清晰明了的。而對于 Savepoint 來說還有不同的 StateBackend 來決定 State 具體如何持續(xù)化,這顯然對應(yīng)的是數(shù)據(jù)庫的存儲引擎。在 MySQL 中,我們可以通過簡單的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 來改變存儲引擎,在背后 MySQL 會自動完成繁瑣的格式轉(zhuǎn)換工作。而對于 Savepoint 來說,由于 StateBackend 各自的存儲格式不兼容,目前尚不能方便地切換 StateBackend。為此,社區(qū)在不久前創(chuàng)建 FLIP-41 [5] 來進一步完善 Savepoint 的可操作性。
總結(jié)
State as Database 是實時計算發(fā)展的大趨勢,它并不是要代替數(shù)據(jù)庫的使用,而是借鑒數(shù)據(jù)庫領(lǐng)域的經(jīng)驗拓展 State 接口使其操作方式更接近我們熟悉的數(shù)據(jù)庫。對于 Flink 而言,State 的外部使用可以分為在線的實時訪問和離線的訪問和修改,分別將由 Queryable State 和 Savepoint Processor API 兩個特性支持。