溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Apache Calcite官方文檔中文版- 進(jìn)階-3. 流(Streaming)

發(fā)布時(shí)間:2020-08-09 23:00:26 來源:網(wǎng)絡(luò) 閱讀:1696 作者:Lynn_Yuan 欄目:大數(shù)據(jù)

第二部分 進(jìn)階(Advanced)

3. 流(Streaming)

??Calcite擴(kuò)展了SQL和關(guān)系代數(shù)以支持流式查詢。

3.1 簡介

??流是收集到持續(xù)不斷流動(dòng)的記錄,永遠(yuǎn)不停止。與表不同,它們通常不存儲(chǔ)在磁盤上,而流是通過網(wǎng)絡(luò),并在內(nèi)存中保存很短的時(shí)間。
??數(shù)據(jù)流是對表格的補(bǔ)充,因?yàn)樗鼈兇砹似髽I(yè)現(xiàn)在和將來發(fā)生的事情,而表格代表了過去。一個(gè)流被存檔到一個(gè)表中是很常見的。
??與表一樣,您經(jīng)常希望根據(jù)關(guān)系代數(shù)以高級語言查詢流,根據(jù)模式(schema)進(jìn)行驗(yàn)證,并優(yōu)化以充分利用可用的資源和算法。
??Calcite的SQL是對標(biāo)準(zhǔn)SQL的擴(kuò)展,而不是另一種“類SQL”的語言。區(qū)別很重要,原因如下:

  • 對于任何知道常規(guī)SQL的人來說,流式SQL都很容易學(xué)習(xí)。
  • 語義清晰,因?yàn)槲覀兊哪繕?biāo)是在一個(gè)流上產(chǎn)生相同的結(jié)果,就好像表中的數(shù)據(jù)是一樣的。
  • 可以編寫結(jié)合了流和表(或者流的歷史,基本上是內(nèi)存表)的查詢。
  • 許多現(xiàn)有的工具可以生成標(biāo)準(zhǔn)的SQL。

??如果不使用STREAM關(guān)鍵字,則返回常規(guī)標(biāo)準(zhǔn)SQL。

3.2 schema示例

??流式SQL使用以下schema:

  • Orders (rowtime, productId, orderId, units):一個(gè)流和一個(gè)表
  • Products (rowtime, productId, name)?:一個(gè)表
  • Shipments (rowtime, orderId)?:一個(gè)流

3.3 簡單查詢

??最簡單的流式查詢:

SELECT STREAM *
FROM Orders;

  rowtime | productId | orderId | units
----------+-----------+---------+-------
 10:17:00 |        30 |       5 |     4
 10:17:05 |        10 |       6 |     1
 10:18:05 |        20 |       7 |     2
 10:18:07 |        30 |       8 |    20
 11:02:00 |        10 |       9 |     6
 11:04:00 |        10 |      10 |     1
 11:09:30 |        40 |      11 |    12
 11:24:11 |        10 |      12 |     4

??該查詢讀取Orders流中的所有列和行。與任何流式查詢一樣,它永遠(yuǎn)不會(huì)終止。只要記錄到達(dá),它就會(huì)輸出一條記錄Orders。
??輸入Control-C以終止查詢。
??STREAM關(guān)鍵字是SQL流的主要擴(kuò)展。它告訴系統(tǒng)你對訂單有興趣,而不是現(xiàn)有訂單。

??查詢:

SELECT *
FROM Orders;

  rowtime | productId | orderId | units
----------+-----------+---------+-------
 08:30:00 |        10 |       1 |     3
 08:45:10 |        20 |       2 |     1
 09:12:21 |        10 |       3 |    10
 09:27:44 |        30 |       4 |     2

4 records returned.

??也是有效的,但會(huì)打印出現(xiàn)有的所有訂單,然后終止。我們把它稱為關(guān)系查詢,而不是流式處理。它具有傳統(tǒng)的SQL語義。
??Orders很特殊,因?yàn)樗幸粋€(gè)流和一個(gè)表。如果您嘗試在表上運(yùn)行流式查詢或在流上運(yùn)行關(guān)系式查詢,則Calcite會(huì)拋出一個(gè)錯(cuò)誤:

SELECT * FROM Shipments;

ERROR: Cannot convert stream 'SHIPMENTS' to a table

SELECT STREAM * FROM Products;

ERROR: Cannot convert table 'PRODUCTS' to a stream

3.4 過濾行

??與常規(guī)的SQL中一樣,使用一個(gè)WHERE子句來過濾行:

SELECT STREAM *
FROM Orders
WHERE units > 3;

  rowtime | productId | orderId | units
----------+-----------+---------+-------
 10:17:00 |        30 |       5 |     4
 10:18:07 |        30 |       8 |    20
 11:02:00 |        10 |       9 |     6
 11:09:30 |        40 |      11 |    12
11:24:11 |        10 |      12 |     4

3.5 表達(dá)式投影

??在SELECT子句中使用表達(dá)式來選擇要返回或計(jì)算表達(dá)式的列:

SELECT STREAM rowtime,
  'An order for ' || units || ' '
    || CASE units WHEN 1 THEN 'unit' ELSE 'units' END
    || ' of product #' || productId AS description
FROM Orders;

  rowtime | description
----------+---------------------------------------
 10:17:00 | An order for 4 units of product #30
 10:17:05 | An order for 1 unit of product #10
 10:18:05 | An order for 2 units of product #20
 10:18:07 | An order for 20 units of product #30
 11:02:00 | An order by 6 units of product #10
 11:04:00 | An order by 1 unit of product #10
 11:09:30 | An order for 12 units of product #40
 11:24:11 | An order by 4 units of product #10

??我們建議您始終在SELECT?條款中包含rowtime列。在每個(gè)流和流式查詢中有一個(gè)有序的時(shí)間戳,可以在稍后進(jìn)行高級計(jì)算,例如GROUP BY和JOIN。

3.6 滾動(dòng)窗口

??有幾種方法可以計(jì)算流上的聚合函數(shù)。差異是:

  • How many rows come out for each row in?
  • 每個(gè)輸入值總共出現(xiàn)一次還是多次?
  • 什么定義了“窗口”,一組貢獻(xiàn)給輸出行的行?
  • 結(jié)果是流還是關(guān)系?

??窗口類型:

  • 滾動(dòng)窗口(GROUP BY)
  • 跳轉(zhuǎn)窗口(多GROUP BY)(hopping)
  • 滑動(dòng)窗口(窗口函數(shù))
  • 級聯(lián)窗口(窗口函數(shù))
    ??下圖顯示了使用它們的查詢類型:
    Apache Calcite官方文檔中文版- 進(jìn)階-3.	流(Streaming)
    ??首先,看一個(gè)滾動(dòng)窗口,它是由一個(gè)流GROUP BY定義的?。這里是一個(gè)例子:
SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY CEIL(rowtime TO HOUR), productId;

rowtime | productId |      c | units
------------+---------------+------------+-------
 11:00:00 |     30 |       2 |    24
 11:00:00 |     10 |       1 |     1
 11:00:00 |     20 |       1 |     7
 12:00:00 |     10 |       3 |    11
 12:00:00 |     40 |       1 |    12

??結(jié)果是流。在11點(diǎn)整,Calcite發(fā)出自10點(diǎn)以來一直到11點(diǎn)有下訂單的?productId的小計(jì)。12點(diǎn),它會(huì)發(fā)出11:00至12:00之間的訂單。每個(gè)輸入行只貢獻(xiàn)到一個(gè)輸出行。
??Calcite是如何知道10:00:00的小計(jì)在11:00:00完成的,這樣就可以發(fā)出它們了?它知道rowtime是在增加,而且它也知道CEIL(rowtime TO HOUR)在增加。所以,一旦在11:00:00時(shí)間點(diǎn)或之后看到一行,它將永遠(yuǎn)不會(huì)看到貢獻(xiàn)到上午10:00:00的一行。
??增加或減少的列以及表達(dá)式是單調(diào)的。(單調(diào)遞增或單調(diào)遞減)
??如果列或表達(dá)式的值具有輕微的失序,并且流具有用于聲明特定值將不會(huì)再被看到的機(jī)制(例如標(biāo)點(diǎn)符號或水?。?,則該列或表達(dá)式被稱為準(zhǔn)單調(diào)。
??在GROUP BY子句中沒有單調(diào)或準(zhǔn)單調(diào)表達(dá)式的情況下,Calcite無法取得進(jìn)展,并且不允許查詢:

SELECT STREAM productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY productId;

ERROR: Streaming aggregation requires at least one monotonic expression

??單調(diào)和準(zhǔn)單調(diào)的列需要在模式中聲明。當(dāng)記錄輸入流并且由從該流中讀取數(shù)據(jù)的假定查詢時(shí),單調(diào)性被強(qiáng)制執(zhí)行。我們建議為每個(gè)流指定一個(gè)時(shí)間戳列rowtime,但也可以聲明其他列是單調(diào)的,例如orderId。
??我們將在下面的內(nèi)容討論標(biāo)點(diǎn)符號,水印,并取得進(jìn)展的其他方法?。

3.7 滾動(dòng)窗口,改進(jìn)

??前面的滾動(dòng)窗口的例子很容易寫,因?yàn)榇翱谑且粋€(gè)小時(shí)。對于不是整個(gè)時(shí)間單位的時(shí)間間隔,例如2小時(shí)或2小時(shí)17分鐘,則不能使用CEIL,表達(dá)式將變得更復(fù)雜。
??Calcite支持滾動(dòng)窗口的替代語法:

SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;

  rowtime | productId |       c | units
----------+-----------+---------+-------
 11:00:00 |        30 |       2 |    24
 11:00:00 |        10 |       1 |     1
 11:00:00 |        20 |       1 |     7
 12:00:00 |        10 |       3 |    11
 12:00:00 |        40 |       1 |    12

??正如你所看到的,它返回與前一個(gè)查詢相同的結(jié)果。TUMBLE?函數(shù)返回一個(gè)分組鍵,這個(gè)分組鍵在給定的匯總行中將會(huì)以相同的方式結(jié)束;?TUMBLE_END函數(shù)采用相同的參數(shù)并返回該窗口的結(jié)束時(shí)間; 當(dāng)然還有一個(gè)TUMBLE_START函數(shù)。
??TUMBLE有一個(gè)可選參數(shù)來對齊窗口。在以下示例中,我們使用30分鐘間隔和0:12作為對齊時(shí)間,因此查詢在每小時(shí)過去12分鐘和42分鐘時(shí)發(fā)出匯總:

SELECT STREAM
  TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
  productId;

  rowtime | productId |       c | units
----------+-----------+---------+-------
 10:42:00 |        30 |       2 |    24
 10:42:00 |        10 |       1 |     1
 10:42:00 |        20 |       1 |     7
 11:12:00 |        10 |       2 |     7
 11:12:00 |        40 |       1 |    12
 11:42:00 |        10 |       1 |     4

3.8 跳轉(zhuǎn)窗口

??跳轉(zhuǎn)窗口是滾動(dòng)窗口的泛化(概括),它允許數(shù)據(jù)在窗口中保持比發(fā)出間隔更長的時(shí)間。
??查詢發(fā)出的行的時(shí)間戳11:00,包含數(shù)據(jù)從08:00至11:00(或10:59.9);以及行的時(shí)間戳12:00,包含數(shù)據(jù)從09:00至12:00。

SELECT STREAM
  HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) AS rowtime,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR);

  rowtime |        c | units
----------+----------+-------
 11:00:00 |        4 |    27
 12:00:00 |        8 |    50

??在這個(gè)查詢中,因?yàn)楸A羝谑前l(fā)出期的3倍,所以每個(gè)輸入行都貢獻(xiàn)到3個(gè)輸出行。想象一下,HOP函數(shù)為傳入行生成一組Group Keys,并將其值存儲(chǔ)在每個(gè)Group Key的累加器中。例如,HOP(10:18:00, INTERVAL '1' HOUR, INTERVAL '3')產(chǎn)生3個(gè)時(shí)間間隔周期:
[08:00, 09:00)
[09:00, 10:00)
[10:00, 11:00)
??這就提出了允許不滿意內(nèi)置函數(shù)HOP和TUMBLE的用戶來自定義的分區(qū)函數(shù)的可能性。
??我們可以建立復(fù)雜的復(fù)雜表達(dá)式,如指數(shù)衰減的移動(dòng)平均線:

SELECT STREAM HOP_END(rowtime),
  productId,
  SUM(unitPrice * EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
   / SUM(EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))

發(fā)出:

  • 1:00:00包含[10:00:00, 11:00:00)的行;
  • 1:00:01包含[10:00:01, 11:00:01)的行。
    這個(gè)表達(dá)最近的訂單比舊訂單權(quán)重更高。將窗口從1小時(shí)擴(kuò)展到2小時(shí)或1年對結(jié)果的準(zhǔn)確性幾乎沒有影響(但會(huì)使用更多的內(nèi)存和計(jì)算資源)。
    ??請注意,我們在一個(gè)聚合函數(shù)(SUM)中使用HOP_START,因?yàn)樗且粋€(gè)子匯總(sub-total)內(nèi)所有行的常量值。對于典型的集合函數(shù)(?SUM,COUNT等等),這是不允許的。
    ??如果您熟悉GROUPING SETS,可能會(huì)注意到,分區(qū)函數(shù)可以看作是泛化的GROUPING SETS,因?yàn)樗鼈冊试S一個(gè)輸入行對多個(gè)子匯總做出貢獻(xiàn)。用于GROUPING SETS的輔助函數(shù)諸如如GROUPING()和GROUP_ID可以在聚合函數(shù)內(nèi)部使用,所以并不奇怪,?HOP_START和HOP_END可以以相同的方式使用。

    3.9 分組集合

    ??GROUPING SETS對于流式查詢是有效的,只要每個(gè)分組集合包含單調(diào)或準(zhǔn)單調(diào)表達(dá)式。
    ??CUBE和ROLLUP不適用于流式查詢,因?yàn)樗鼈儗⑸芍辽僖粋€(gè)聚合所有內(nèi)容(如GROUP BY ())的分組集合?。

    3.10 聚合后Consideration

    ??與標(biāo)準(zhǔn)SQL一樣,可以使用HAVING子句來過濾由流GROUP BY發(fā)出的行:

SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
 productId
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId
HAVING COUNT(*) > 2 OR SUM(units) > 10;

rowtime | productId
----------+-----------
10:00:00 |        30
11:00:00 |        10

3.11 子查詢,視圖和SQL閉包屬性

??前述的HAVING查詢可以使用WHERE子查詢中的子句來表示:

SELECT STREAM rowtime, productId
FROM (
  SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
    productId,
    COUNT(*) AS c,
    SUM(units) AS su
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
WHERE c > 2 OR su > 10;

rowtime | productId
----------+-----------
 10:00:00 |        30
 11:00:00 |        10
 11:00:00 |        40

??HAVING子句是在SQL早期引入的,當(dāng)需要在聚合之后執(zhí)行過濾器時(shí),(回想一下,WHERE在輸入到達(dá)GROUP BY子句之前過濾行)。
??從那時(shí)起,SQL已經(jīng)成為一種數(shù)學(xué)封閉的語言,這意味著您可以在一個(gè)表上執(zhí)行的任何操作也可以在查詢上執(zhí)行。
??SQL?的閉包屬性非常強(qiáng)大。它不僅使?HAVING陳舊過時(shí)(或至少減少到語法糖),它使視圖成為可能:

CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) AS
  SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
    productId,
    COUNT(*),
    SUM(units)
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;

SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;

 rowtime | productId
----------+-----------
 10:00:00 |        30
 11:00:00 |        10
 11:00:00 |        40

??FROM子句中的子查詢有時(shí)被稱為“內(nèi)聯(lián)視圖”,但實(shí)際上它們比視圖更基礎(chǔ)。視圖只是一個(gè)方便的方法,通過給出這些分片命名并將它們存儲(chǔ)在元數(shù)據(jù)存儲(chǔ)庫中,將SQL分割成可管理的塊。
??很多人發(fā)現(xiàn)嵌套的查詢和視圖在流上比在關(guān)系上更有用。流式查詢是連續(xù)運(yùn)行的運(yùn)算符的管道,而且這些管道通常會(huì)很長。嵌套的查詢和視圖有助于表達(dá)和管理這些管道。
??順便說一下,WITH子句可以完成與子查詢或視圖相同的操作:

WITH HourlyOrderTotals (rowtime, productId, c, su) AS (
  SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
    productId,
    COUNT(*),
    SUM(units)
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;

 rowtime | productId
------------+-----------
 10:00:00 |        30
 11:00:00 |        10
 11:00:00 |        40

3.12 流和關(guān)系之間的轉(zhuǎn)換

??回顧一下HourlyOrderTotals視圖的定義。此視圖是流還是關(guān)系?
??它不包含STREAM關(guān)鍵字,所以它是一個(gè)關(guān)系。但是,這是一種可以轉(zhuǎn)換成流的關(guān)系。
??可以在關(guān)系和流式查詢中使用它:

# A relation; will query the historic Orders table.
# Returns the largest number of product #10 ever sold in one hour.
SELECT max(su)
FROM HourlyOrderTotals
WHERE productId = 10;

# A stream; will query the Orders stream.
# Returns every hour in which at least one product #10 was sold.
SELECT STREAM rowtime
FROM HourlyOrderTotals
WHERE productId = 10;

??這種方法不限于視圖和子查詢。遵循CQL [?1?]中規(guī)定的方法,流式SQL中的每個(gè)查詢都被定義為關(guān)系查詢,并最上面的SELECT使用STREAM關(guān)鍵字轉(zhuǎn)換為流。
??如果STREAM關(guān)鍵字存在于子查詢或視圖定義中,則不起作用。
??在查詢準(zhǔn)備時(shí)間,Calcite計(jì)算查詢中引用的關(guān)系是否可以轉(zhuǎn)換為流或歷史的關(guān)系。
??有時(shí)候,一個(gè)流可以提供它的一些歷史記錄(比如Apache Kafka [?2?]主題中最后24小時(shí)的數(shù)據(jù)),但不是全部。在運(yùn)行時(shí),Calcite計(jì)算出是否有足夠的歷史記錄來運(yùn)行查詢,如果沒有,則會(huì)給出錯(cuò)誤。

3.13 “餅圖”問題:流上的關(guān)系查詢

??一個(gè)特定的情況下,需要將流轉(zhuǎn)換為關(guān)系時(shí)會(huì)發(fā)生我所說的“餅圖問題”。想象一下,你需要寫一個(gè)帶有圖表的網(wǎng)頁,如下所示,它總結(jié)了每個(gè)產(chǎn)品在過去一小時(shí)內(nèi)的訂單數(shù)量。
Apache Calcite官方文檔中文版- 進(jìn)階-3.	流(Streaming)
??但是這個(gè)Orders流只包含幾條記錄,而不是一個(gè)小時(shí)的匯總。我們需要對流的歷史記錄運(yùn)行一個(gè)關(guān)系查詢:

SELECT productId, count(*)
FROM Orders
WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOUR
          AND current_timestamp;

??如果Orders流的歷史記錄正在滾動(dòng)到Orders表中,盡管成本很高,我們可以回答查詢。更好的辦法是,如果我們可以告訴系統(tǒng)將一小時(shí)的匯總轉(zhuǎn)化為表格,在流式處理過程中不斷維護(hù)它,并自動(dòng)重寫查詢以使用表格。

3.14 排序

??ORDER BY的故事類似于GROUP BY。語法看起來像普通的SQL,但是Calcite必須確保它能夠提供及時(shí)的結(jié)果。因此,它需要在ORDER BY鍵的前沿(leading edge)有一個(gè)單調(diào)的表達(dá)式。

SELECT STREAM CEIL(rowtime TO hour) AS rowtime, productId, orderId, units
FROM Orders
ORDER BY CEIL(rowtime TO hour) ASC, units DESC;

  rowtime | productId | orderId | units
----------+-----------+---------+-------
 10:00:00 |        30 |       8 |    20
 10:00:00 |        30 |       5 |     4
 10:00:00 |        20 |       7 |     2
 10:00:00 |        10 |       6 |     1
 11:00:00 |        40 |      11 |    12
 11:00:00 |        10 |       9 |     6
 11:00:00 |        10 |      12 |     4
 11:00:00 |        10 |      10 |     1

??大多數(shù)查詢將按照插入的順序返回結(jié)果,因?yàn)橐褂昧魇剿惴?,但不?yīng)該依賴它。例如,考慮一下:

SELECT STREAM *
FROM Orders
WHERE productId = 10
UNION ALL
SELECT STREAM *
FROM Orders
WHERE productId = 30;

  rowtime | productId | orderId | units
----------+-----------+---------+-------
 10:17:05 |        10 |       6 |     1
 10:17:00 |        30 |       5 |     4
 10:18:07 |        30 |       8 |    20
 11:02:00 |        10 |       9 |     6
 11:04:00 |        10 |      10 |     1
 11:24:11 |        10 |      12 |     4

??productId= 30?的行顯然是不符合order要求的,可能是因?yàn)镺rders流以productId分區(qū),分區(qū)后的流在不同的時(shí)間發(fā)送了他們的數(shù)據(jù)。
??如果您需要特定的順序,請?zhí)砑右粋€(gè)顯式的ORDER BY:
??Calcite可能會(huì)通過合并使用rowtime實(shí)現(xiàn)UNION ALL,這樣只是效率稍微低些。
只需要添加一個(gè)ORDER BY到最外層的查詢。如果需要在UNION ALL之后執(zhí)行GROUP BY,Calcite將會(huì)?隱式添加ORDER BY,以便使GROUP BY算法成為可能。

3.15 表格構(gòu)造器

  VALUES子句創(chuàng)建一個(gè)擁有給定行集合的內(nèi)聯(lián)表。
  流式傳輸是不允許的。這組行不會(huì)發(fā)生改變,因此一個(gè)流永遠(yuǎn)不會(huì)返回任何行。

> SELECT STREAM * FROM (VALUES (1, 'abc'));

ERROR: Cannot stream VALUES

3.16 滑動(dòng)窗口

  標(biāo)準(zhǔn)SQL的功能特性之一可以在SELECT子句中使用所謂的“分析函數(shù)”?。不像GROUP BY,不會(huì)折疊記錄。對于每個(gè)進(jìn)來的記錄,出來一個(gè)記錄。但是聚合函數(shù)是基于一個(gè)多行的窗口。
  我們來看一個(gè)例子。

SELECT STREAM rowtime,
  productId,
  units,
  SUM(units) OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING) unitsLastHour

  這個(gè)功能特性付出很小的努力就包含了很多Power。在SELECT子句中可以有多個(gè)函數(shù),基于多個(gè)窗口規(guī)則定義。
  以下示例返回在過去10分鐘內(nèi)平均訂單數(shù)量大于上周平均訂單數(shù)量的訂單。

SELECT STREAM *
FROM (
  SELECT STREAM rowtime,
    productId,
    units,
    AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
    AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
  FROM Orders
  WINDOW product AS (
    ORDER BY rowtime
    PARTITION BY productId))

  為了簡潔起見,在這里我們使用一種語法,其中使用WINDOW子句部分定義窗口,然后在每個(gè)OVER子句中細(xì)化窗口。也可以定義WINDOW子句中的所有窗口,或者如果您愿意,可以定義所有內(nèi)聯(lián)窗口。
  但真正的power超越語法。在幕后,這個(gè)查詢維護(hù)著兩個(gè)表,并且使用FIFO隊(duì)列添加和刪除子匯總中的值。但是,無需在查詢中引入聯(lián)接,也可以訪問這些表。
窗口化聚合語法的一些其他功能特性:

  • 可以根據(jù)行數(shù)定義窗口。
  • 該窗口可以引用尚未到達(dá)的行。(流會(huì)等到他們到達(dá))。
  • 可以計(jì)算與順序有關(guān)的函數(shù),如RANK中位數(shù)。

    3.17 級聯(lián)窗口

      如果我們想要一個(gè)返回每個(gè)記錄的結(jié)果的查詢,比如一個(gè)滑動(dòng)窗口,但是在一個(gè)固定的時(shí)間段重置總數(shù),就像一個(gè)翻滾的窗口?這種模式被稱為級聯(lián)窗口。這里是一個(gè)例子:

    SELECT STREAM rowtime,
    productId,
    units,
    SUM(units) OVER (PARTITION BY FLOOR(rowtime TO HOUR)) AS unitsSinceTopOfHour

      它看起來類似于滑動(dòng)窗口查詢,但單調(diào)表達(dá)式出現(xiàn)在PARTITION BY窗口的子句中。由于rowtime從10:59:59到11:00:00,F(xiàn)LOOR(rowtime TO HOUR)從10:00:00到11:00:00發(fā)生改變,因此一個(gè)新的分區(qū)開始。在新的時(shí)間到達(dá)的第一行將開始新的匯總;?第二行將有一個(gè)由兩行組成的匯總,依此類推。
      Calcite知道舊分區(qū)永遠(yuǎn)不會(huì)再被使用,因此從內(nèi)部存儲(chǔ)中刪除該分區(qū)的所有子匯總。
      使用級聯(lián)和滑動(dòng)窗口的分析函數(shù)可以組合在同一個(gè)查詢中。

    3.18 流與表Join

      有兩種類型的連接,即stream-to-table join和stream-to-stream join。
      如果表的內(nèi)容沒有改變,則流到表的連接是直接的。這個(gè)查詢以每個(gè)產(chǎn)品的列出價(jià)格豐富了訂單流:

    SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
    p.name, p.unitPrice
    FROM Orders AS o JOIN Products AS p ON o.productId = p.productId;
    
    rowtime | productId | orderId | units | name   | unitPrice
    ----------+-----------+---------+-------+ -------+-----------
    10:17:00 |        30 |       5 |     4 | Cheese |        17
    10:17:05 |        10 |       6 |     1 | Beer   |      0.25
    10:18:05 |        20 |       7 |     2 | Wine   |         6
    10:18:07 |        30 |       8 |    20 | Cheese |        17
    11:02:00 |        10 |       9 |     6 | Beer   |      0.25
    11:04:00 |        10 |      10 |     1 | Beer   |      0.25
    11:09:30 |        40 |      11 |    12 | Bread  |       100
    11:24:11 |        10 |      12 |     4 | Beer   |      0.25

      如果表格在改變,會(huì)發(fā)生什么?例如,假設(shè)product#10的單價(jià)在11點(diǎn)增加到0.35。在11:00之前下的訂單應(yīng)該是舊價(jià)格,在11:00之后下的訂單應(yīng)該反映新價(jià)格。
    實(shí)現(xiàn)此目的的一種方法是創(chuàng)建一個(gè)表,使每個(gè)版本的開始和結(jié)束生效日期保持一致,ProductVersions如下所示:

    SELECT STREAM *
    FROM Orders AS o JOIN ProductVersions AS p
    ON o.productId = p.productId
    AND o.rowtime BETWEEN p.startDate AND p.endDate
    
    rowtime | productId | orderId | units | productId1 |   name | unitPrice
    ----------+-----------+---------+-------+ -----------+--------+-----------
    10:17:00 |        30 |       5 |     4 |         30 | Cheese |        17
    10:17:05 |        10 |       6 |     1 |         10 | Beer   |      0.25
    10:18:05 |        20 |       7 |     2 |         20 | Wine   |         6
    10:18:07 |        30 |       8 |    20 |         30 | Cheese |        17
    11:02:00 |        10 |       9 |     6 |         10 | Beer   |      0.35
    11:04:00 |        10 |      10 |     1 |         10 | Beer   |      0.35
    11:09:30 |        40 |      11 |    12 |         40 | Bread  |       100
    11:24:11 |        10 |      12 |     4 |         10 | Beer   |      0.35

      另一種實(shí)現(xiàn)方法是使用具有臨時(shí)支持的數(shù)據(jù)庫(能夠像過去的任何時(shí)候一樣查找數(shù)據(jù)庫的內(nèi)容),并且系統(tǒng)需要知道Orders流的rowtime列對應(yīng)于Products表的事務(wù)時(shí)間戳?。
      對于許多應(yīng)用程序而言,暫時(shí)支持或版本化表格的成本和努力是不值得的。查詢在重放時(shí)給出不同的結(jié)果是可以接受的:在這個(gè)例子中,在重放時(shí),product#10的所有訂單被分配后來的單價(jià)0.35。

    3.19 流與流Join

      如果連接條件以某種方式強(qiáng)迫它們彼此保持有限的距離,那么流與流的連接就是合理的。在以下查詢中,發(fā)貨日期在訂單日期的一小時(shí)內(nèi):

    SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
    FROM Orders AS o JOIN Shipments AS s
    ON o.orderId = s.orderId
    AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
    
    rowtime | productId | orderId | shipTime
    ----------+-----------+---------+----------
    10:17:00 |        30 |       5 | 10:55:00
    10:17:05 |        10 |       6 | 10:20:00
    11:02:00 |        10 |       9 | 11:58:00
    11:24:11 |        10 |      12 | 11:44:00

      請注意,相當(dāng)多的訂單不會(huì)顯示,因?yàn)樗鼈冊谝粋€(gè)小時(shí)內(nèi)沒有發(fā)貨。在系統(tǒng)接收到Order#10時(shí),時(shí)間戳為11:24:11,它已經(jīng)從其哈希表中刪除了訂單包括Order#8(時(shí)間戳10:18:07)。
      正如你所看到的,把這兩個(gè)流的單調(diào)或準(zhǔn)單調(diào)列聯(lián)系在一起的“鎖定步驟”是系統(tǒng)取得進(jìn)展所必需的。如果它不能推斷出一個(gè)鎖定步驟, 它將拒絕執(zhí)行一個(gè)查詢。

    3.20 DML

      這不僅是查詢對流來說有意義。運(yùn)行DML語句(INSERT,UPDATE,DELETE,UPSERT和REPLACE)對流來說同樣有意義。
      DML非常有用,因?yàn)樗试S基于流實(shí)現(xiàn)物華流或表格,因此經(jīng)常使用值可以節(jié)省工作量。
      考慮到流的應(yīng)用程序通常由查詢管道組成,每個(gè)查詢將輸入流轉(zhuǎn)換為輸出流。管道的組件可以是一個(gè)視圖:

    CREATE VIEW LargeOrders AS
    SELECT STREAM * FROM Orders WHERE units > 1000;

    或者一個(gè)標(biāo)準(zhǔn)的INSERT語句:

    INSERT INTO LargeOrders
    SELECT STREAM * FROM Orders WHERE units > 1000;

      這些看起來很相似,在這兩種情況下,管道中的下一個(gè)步驟都可以讀取LargeOrders,而不用擔(dān)心它是如何填充的。效率是有差別的:INSERT無論有多少消費(fèi)者,做的工作都是相同的。這個(gè)視圖的確與消費(fèi)者的數(shù)量成正比,特別是沒有消費(fèi)者的情況下就沒有工作。
      其他形式的DML對于流也是有意義的。例如,以下常設(shè)UPSERT語句維護(hù)一個(gè)表格,以實(shí)現(xiàn)最后一小時(shí)訂單的匯總:

    UPSERT INTO OrdersSummary
    SELECT STREAM productId,
    COUNT(*) OVER lastHour AS c
    FROM Orders
    WINDOW lastHour AS (
    PARTITION BY productId
    ORDER BY rowtime
    RANGE INTERVAL '1' HOUR PRECEDING)

    3.21 標(biāo)點(diǎn)(Punctuation)

      Punctuation [?5?]允許流式查詢?nèi)〉眠M(jìn)展,即使單調(diào)的鍵中沒有足夠的值來推送出結(jié)果。
      (我更喜歡術(shù)語“rowtime bounds”,水印[?6?]是一個(gè)相關(guān)的概念,但為了這些目的,Punctuation就足夠了。)
      如果某個(gè)流具有Punctuation,那么它可能不會(huì)被排序,不過仍然可以排序。因此,出于語義的目的,按照排序的流來工作就足夠了。
      順便說一下,一個(gè)無序的流也是可排序的,如果按t-sorted排序?(即,每個(gè)記錄保證在其時(shí)間戳的t秒內(nèi)到達(dá))或k-sorted排序(即每個(gè)記錄保證不超過k的位置造成無序)。所以對這些流的查詢可以像帶有Punctuation的流式查詢來進(jìn)行計(jì)劃。
      而且,我們經(jīng)常要聚合不是時(shí)間的且是單調(diào)的屬性。“一個(gè)團(tuán)隊(duì)在獲勝狀態(tài)和失敗狀態(tài)之間轉(zhuǎn)移的次數(shù)”就是這樣一個(gè)單調(diào)的屬性。系統(tǒng)需要自己弄清楚聚合這樣一個(gè)屬性是否安全;?Punctuation不會(huì)添加任何額外的信息。

  我記得一些計(jì)劃器的元數(shù)據(jù)(成本指標(biāo)):

  1. 這個(gè)流按給定的一個(gè)或多個(gè)屬性排序嗎?
  2. 是否可以對給定屬性的流進(jìn)行排序?(對于有限的關(guān)系,答案總是“是”;對于流,它依賴于Punctuation的存在,或?qū)傩院团判蜴I之間的聯(lián)系)。
  3. 我們需要引入什么延遲才能執(zhí)行此類操作?
  4. 執(zhí)行此類操作的成本(CPU,內(nèi)存等)是多少?
      在BuiltInMetadata.Collation中,我們已經(jīng)有了(1)。對于(2),答案對于有限關(guān)系總是“true”。但是我們需要為流實(shí)現(xiàn)(2),(3)和(4)。

3.22 流的狀態(tài)

  并非本文中的所有概念都已經(jīng)在Calcite中實(shí)現(xiàn)。其他的可能在Calcite中實(shí)現(xiàn),但不能在SamzaSQL [?3?] [?4?]?等特定的適配器中實(shí)現(xiàn)。
已實(shí)現(xiàn)

  • 流式SELECT,WHERE,GROUP BY,HAVING,UNION ALL,ORDER BY
  • FLOOR和CEIL函數(shù)
  • 單調(diào)性
  • 流式VALUES是不允許的

未實(shí)現(xiàn)
本文檔中提供的以下功能特性,以為Calcite支持它們,但實(shí)際上它還沒有實(shí)現(xiàn)。全面支持意味著參考實(shí)現(xiàn)支持該功能特性(包括負(fù)面情況),TCK則對其進(jìn)行測試。

  • 流與流的?JOIN
  • 流與表的?JOIN
  • 視圖上的流
  • 帶有ORDER BY流UNION ALL(合并)
  • 流上的關(guān)系查詢
  • 流式窗口聚合(滑動(dòng)和級聯(lián)窗口)
  • 檢查STREAM在子查詢和視圖是否被忽略
  • 檢查流的ORDER BY子句不能有OFFSET或LIMIT
  • 歷史有限性;?在運(yùn)行時(shí),檢查是否有足夠的歷史記錄來運(yùn)行查詢。
  • 準(zhǔn)單調(diào)
  • HOP和TUMBLE(和輔助HOP_START,HOP_END,?TUMBLE_START,TUMBLE_END)函數(shù)

   本文檔做了什么

  • 重新訪問是否可以流式傳輸?VALUES
  • OVER?子句來定義窗口上的流
  • 考慮在流式查詢中是否允許CUBE和ROLLUP,理解某些級別的聚合將永遠(yuǎn)不會(huì)完成(因?yàn)樗鼈儧]有單調(diào)表達(dá)式),因此不會(huì)被發(fā)出。
  • 修復(fù)該UPSERT示例以刪除在過去一小時(shí)內(nèi)沒有發(fā)生的產(chǎn)品的記錄。
  • 輸出到多個(gè)流的DML;?也許是標(biāo)準(zhǔn)REPLACE語句的擴(kuò)展?。

3.23 函數(shù)

  以下函數(shù)在標(biāo)準(zhǔn)SQL中不存在,但在流式SQL中定義。

標(biāo)量函數(shù):

  • FLOOR(dateTime TO intervalType)?將日期,時(shí)間或時(shí)間戳值取下限為給定的間隔類型
  • CEIL(dateTime TO intervalType)?將日期,時(shí)間或時(shí)間戳值取上限到給定的間隔類型

分區(qū)函數(shù):

  • HOP(t, emit, retain)?返回一個(gè)集合of group keys for a row作為跳轉(zhuǎn)窗口的一部分
  • HOP(t, emit, retain, align)?返回一個(gè)集合of group keys for a row作為給定對齊的跳轉(zhuǎn)窗口的一部分
  • TUMBLE(t, emit)?返回一個(gè)group key for a row作為滾動(dòng)窗口的一部分
  • TUMBLE(t, emit, align) 返回一個(gè)group key for a row作為給定對齊滾動(dòng)窗口的一部分
    注:
    TUMBLE(t, e)相當(dāng)于TUMBLE(t, e, TIME '00:00:00')。
    TUMBLE(t, e, a)相當(dāng)于HOP(t, e, e, a)。
    HOP(t, e, r)相當(dāng)于HOP(t, e, r, TIME '00:00:00')
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI