溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

發(fā)布時(shí)間:2021-08-31 11:00:09 來源:億速云 閱讀:170 作者:chen 欄目:建站服務(wù)器

這篇文章主要介紹“Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join”,在日常操作中,相信很多人在Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

無論在 OLAP 還是 OLTP 領(lǐng)域,Join 都是業(yè)務(wù)常會(huì)涉及到且優(yōu)化規(guī)則比較復(fù)雜的 SQL 語句。對(duì)于離線計(jì)算而言,經(jīng)過數(shù)據(jù)庫領(lǐng)域多年的積累,Join 語義以及實(shí)現(xiàn)已經(jīng)十分成熟,然而對(duì)于近年來剛興起的 Streaming SQL 來說 Join 卻處于剛起步的狀態(tài)。

其中最為關(guān)鍵的問題在于 Join 的實(shí)現(xiàn)依賴于緩存整個(gè)數(shù)據(jù)集,而 Streaming SQL Join 的對(duì)象卻是無限的數(shù)據(jù)流,內(nèi)存壓力和計(jì)算效率在長(zhǎng)期運(yùn)行來說都是不可避免的問題。下文將結(jié)合 SQL 的發(fā)展解析 Flink SQL 是如何解決這些問題并實(shí)現(xiàn)兩個(gè)數(shù)據(jù)流的 Join。

離線 Batch SQL Join 的實(shí)現(xiàn)

傳統(tǒng)的離線 Batch SQL (面向有界數(shù)據(jù)集的 SQL)有三種基礎(chǔ)的實(shí)現(xiàn)方式,分別是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

  • Nested-loop Join 最為簡(jiǎn)單直接,將兩個(gè)數(shù)據(jù)集加載到內(nèi)存,并用內(nèi)嵌遍歷的方式來逐個(gè)比較兩個(gè)數(shù)據(jù)集內(nèi)的元素是否符合 Join 條件。Nested-loop Join 雖然時(shí)間效率以及空間效率都是最低的,但勝在比較靈活適用范圍廣,因此其變體 BNL 常被傳統(tǒng)數(shù)據(jù)庫用作為 Join 的默認(rèn)基礎(chǔ)選項(xiàng)。

  • Sort-Merge Join 顧名思義,分為兩個(gè) Sort 和 Merge 階段。首先將兩個(gè)數(shù)據(jù)集進(jìn)行分別排序,然后對(duì)兩個(gè)有序數(shù)據(jù)集分別進(jìn)行遍歷和匹配,類似于歸并排序的合并。值得注意的是,Sort-Merge 只適用于 Equi-Join(Join 條件均使用等于作為比較算子)。Sort-Merge Join 要求對(duì)兩個(gè)數(shù)據(jù)集進(jìn)行排序,成本很高,通常作為輸入本就是有序數(shù)據(jù)集的情況下的優(yōu)化方案。

  • Hash Join 同樣分為兩個(gè)階段,首先將一個(gè)數(shù)據(jù)集轉(zhuǎn)換為 Hash Table,然后遍歷另外一個(gè)數(shù)據(jù)集元素并與 Hash Table 內(nèi)的元素進(jìn)行匹配。第一階段和第一個(gè)數(shù)據(jù)集分別稱為 build 階段和 build table,第二個(gè)階段和第二個(gè)數(shù)據(jù)集分別稱為 probe 階段和 probe table。Hash Join 效率較高但對(duì)空間要求較大,通常是作為 Join 其中一個(gè)表為適合放入內(nèi)存的小表的情況下的優(yōu)化方案。和 Sort-Merge Join 類似,Hash Join 也只適用于 Equi-Join。

實(shí)時(shí) Streaming SQL Join

相對(duì)于離線的 Join,實(shí)時(shí) Streaming SQL(面向無界數(shù)據(jù)集的 SQL)無法緩存所有數(shù)據(jù),因此 Sort-Merge Join 要求的對(duì)數(shù)據(jù)集進(jìn)行排序基本是無法做到的,而 Nested-loop Join 和 Hash Join 經(jīng)過一定的改良則可以滿足實(shí)時(shí) SQL 的要求。
我們通過例子來看基本的 Nested Join 在實(shí)時(shí) Streaming SQL 的基礎(chǔ)實(shí)現(xiàn)(案例及圖來自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖1. Join-in-continuous-query-1

Table A 有 1、42 兩個(gè)元素,Table B 有 42 一個(gè)元素,所以此時(shí)的 Join 結(jié)果會(huì)輸出 42。

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖2. Join-in-continuous-query-2

接著 Table B 依次接受到三個(gè)新的元素,分別是 7、3、1。因?yàn)?1 匹配到 Table A 的元素,因此結(jié)果表再輸出一個(gè)元素 1。

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖3. Join-in-continuous-query-3

隨后 Table A 出現(xiàn)新的輸入 2、3、6,3 匹配到 Table B 的元素,因此再輸出 3 到結(jié)果表。

可以看到在 Nested-Loop Join 中我們需要保存兩個(gè)輸入表的內(nèi)容,而隨著時(shí)間的增長(zhǎng) Table A 和 Table B 需要保存的歷史數(shù)據(jù)無止境地增長(zhǎng),導(dǎo)致很不合理的內(nèi)存磁盤資源占用,而且單個(gè)元素的匹配效率也會(huì)越來越低。類似的問題也存在于 Hash Join 中。

那么有沒有可能設(shè)置一個(gè)緩存剔除策略,將不必要的歷史數(shù)據(jù)及時(shí)清理呢?答案是肯定的,關(guān)鍵在于緩存剔除策略如何實(shí)現(xiàn),這也是 Flink SQL 提供的三種 Join 的主要區(qū)別。

Flink SQL 的 Join

  • Regular Join

Regular Join 是最為基礎(chǔ)的沒有緩存剔除策略的 Join。Regular Join 中兩個(gè)表的輸入和更新都會(huì)對(duì)全局可見,影響之后所有的 Join 結(jié)果。舉例,在一個(gè)如下的 Join 查詢里,Orders 表的新紀(jì)錄會(huì)和 Product 表所有歷史紀(jì)錄以及未來的紀(jì)錄進(jìn)行匹配。

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

因?yàn)闅v史數(shù)據(jù)不會(huì)被清理,所以 Regular Join 允許對(duì)輸入表進(jìn)行任意種類的更新操作(insert、update、delete)。然而因?yàn)橘Y源問題 Regular Join 通常是不可持續(xù)的,一般只用做有界數(shù)據(jù)流的 Join。

  • Time-Windowed Join

Time-Windowed Join 利用窗口給兩個(gè)輸入表設(shè)定一個(gè) Join 的時(shí)間界限,超出時(shí)間范圍的數(shù)據(jù)則對(duì) JOIN 不可見并可以被清理掉。值得注意的是,這里涉及到的一個(gè)問題是時(shí)間的語義,時(shí)間可以指計(jì)算發(fā)生的系統(tǒng)時(shí)間(即 Processing Time),也可以指從數(shù)據(jù)本身的時(shí)間字段提取的 Event Time。如果是 Processing Time,F(xiàn)link 根據(jù)系統(tǒng)時(shí)間自動(dòng)劃分 Join 的時(shí)間窗口并定時(shí)清理數(shù)據(jù);如果是 Event Time,F(xiàn)link 分配 Event Time 窗口并依據(jù) Watermark 來清理數(shù)據(jù)。

以更常用的 Event Time Windowed Join 為例,一個(gè)將 Orders 訂單表和 Shipments 運(yùn)輸單表依據(jù)訂單時(shí)間和運(yùn)輸時(shí)間 Join 的查詢?nèi)缦?

SELECT *
FROM 
  Orders o, 
  Shipments s
WHERE 
  o.id = s.orderId AND
  s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR

這個(gè)查詢會(huì)為 Orders 表設(shè)置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的時(shí)間下界(圖4)。

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖4. Time-Windowed Join 的時(shí)間下界 - Orders 表

并為 Shipmenets 表設(shè)置了 s.shiptime >= o.ordertime 的時(shí)間下界(圖5)。

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖5. Time-Windowed Join 的時(shí)間下界 - Shipment 表

因此兩個(gè)輸入表都只需要緩存在時(shí)間下界以上的數(shù)據(jù),將空間占用維持在合理的范圍。

不過雖然底層實(shí)現(xiàn)上沒有問題,但如何通過 SQL 語法定義時(shí)間仍是難點(diǎn)。盡管在實(shí)時(shí)計(jì)算領(lǐng)域 Event Time、Processing Time、Watermark 這些概念已經(jīng)成為業(yè)界共識(shí),但在 SQL 領(lǐng)域?qū)r(shí)間數(shù)據(jù)類型的支持仍比較弱[4]。因此,定義 Watermark 和時(shí)間語義都需要通過編程 API 的方式完成,比如從 DataStream 轉(zhuǎn)換至 Table ,不能單純靠 SQL 完成。這方面的支持 Flink 社區(qū)計(jì)劃通過拓展 SQL 方言來完成,感興趣的讀者可以通過 FLIP-66[7] 來追蹤進(jìn)度。

  • Temporal Table Join

雖然 Timed-Windowed Join 解決了資源問題,但也限制了使用場(chǎng)景: Join 兩個(gè)輸入流都必須有時(shí)間下界,超過之后則不可訪問。這對(duì)于很多 Join 維表的業(yè)務(wù)來說是不適用的,因?yàn)楹芏嗲闆r下維表并沒有時(shí)間界限。針對(duì)這個(gè)問題,F(xiàn)link 提供了 Temporal Table Join 來滿足用戶需求。

Temporal Table Join 類似于 Hash Join,將輸入分為 Build Table 和 Probe Table。前者一般是緯度表的 changelog,后者一般是業(yè)務(wù)數(shù)據(jù)流,典型情況下后者的數(shù)據(jù)量應(yīng)該遠(yuǎn)大于前者。在 Temporal Table Join 中,Build Table 是一個(gè)基于 append-only 數(shù)據(jù)流的帶時(shí)間版本的視圖,所以又稱為 Temporal Table。Temporal Table 要求定義一個(gè)主鍵和用于版本化的字段(通常就是 Event Time 時(shí)間字段),以反映記錄在不同時(shí)間的內(nèi)容。

比如典型的一個(gè)例子是對(duì)商業(yè)訂單金額進(jìn)行匯率轉(zhuǎn)換。假設(shè)有一個(gè) Orders 流記錄訂單金額,需要和 RatesHistory 匯率流進(jìn)行 Join。RatesHistory 代表不同貨幣轉(zhuǎn)為日元的匯率,每當(dāng)匯率有變化時(shí)就會(huì)有一條更新記錄。兩個(gè)表在某一時(shí)間節(jié)點(diǎn)內(nèi)容如下:

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖6. Temporal Table Join Example]

我們將 RatesHistory 注冊(cè)為一個(gè)名為 Rates 的 Temporal Table,設(shè)定主鍵為 currency,版本字段為 time。

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖7. Temporal Table Registration]

此后給 Rates 指定時(shí)間版本,Rates 則會(huì)基于 RatesHistory 來計(jì)算符合時(shí)間版本的匯率轉(zhuǎn)換內(nèi)容。

Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join

圖8. Temporal Table Content]

在 Rates 的幫助下,我們可以將業(yè)務(wù)邏輯用以下的查詢來表達(dá):

SELECT 
  o.amount * r.rate
FROM
  Orders o,
  LATERAL Table(Rates(o.time)) r
WHERE
  o.currency = r.currency

值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中兩個(gè)表是平等的,任意一個(gè)表的新記錄都可以與另一表的歷史記錄進(jìn)行匹配,在 Temporal Table Join 中,Temoparal Table 的更新對(duì)另一表在該時(shí)間節(jié)點(diǎn)以前的記錄是不可見的。這意味著我們只需要保存 Build Side 的記錄直到 Watermark 超過記錄的版本字段。因?yàn)?Probe Side 的輸入理論上不會(huì)再有早于 Watermark 的記錄,這些版本的數(shù)據(jù)可以安全地被清理掉。

到此,關(guān)于“Flink SQL怎么實(shí)現(xiàn)數(shù)據(jù)流的Join”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI