您好,登錄后才能下訂單哦!
??Calcite擴(kuò)展了SQL和關(guān)系代數(shù)以支持流式查詢。
??流是收集到持續(xù)不斷流動(dòng)的記錄,永遠(yuǎn)不停止。與表不同,它們通常不存儲(chǔ)在磁盤上,而流是通過網(wǎng)絡(luò),并在內(nèi)存中保存很短的時(shí)間。
??數(shù)據(jù)流是對表格的補(bǔ)充,因?yàn)樗鼈兇砹似髽I(yè)現(xiàn)在和將來發(fā)生的事情,而表格代表了過去。一個(gè)流被存檔到一個(gè)表中是很常見的。
??與表一樣,您經(jīng)常希望根據(jù)關(guān)系代數(shù)以高級語言查詢流,根據(jù)模式(schema)進(jìn)行驗(yàn)證,并優(yōu)化以充分利用可用的資源和算法。
??Calcite的SQL是對標(biāo)準(zhǔn)SQL的擴(kuò)展,而不是另一種“類SQL”的語言。區(qū)別很重要,原因如下:
??如果不使用STREAM關(guān)鍵字,則返回常規(guī)標(biāo)準(zhǔn)SQL。
??流式SQL使用以下schema:
??最簡單的流式查詢:
SELECT STREAM *
FROM Orders;
rowtime | productId | orderId | units
----------+-----------+---------+-------
10:17:00 | 30 | 5 | 4
10:17:05 | 10 | 6 | 1
10:18:05 | 20 | 7 | 2
10:18:07 | 30 | 8 | 20
11:02:00 | 10 | 9 | 6
11:04:00 | 10 | 10 | 1
11:09:30 | 40 | 11 | 12
11:24:11 | 10 | 12 | 4
??該查詢讀取Orders流中的所有列和行。與任何流式查詢一樣,它永遠(yuǎn)不會(huì)終止。只要記錄到達(dá),它就會(huì)輸出一條記錄Orders。
??輸入Control-C以終止查詢。
??STREAM關(guān)鍵字是SQL流的主要擴(kuò)展。它告訴系統(tǒng)你對訂單有興趣,而不是現(xiàn)有訂單。
??查詢:
SELECT *
FROM Orders;
rowtime | productId | orderId | units
----------+-----------+---------+-------
08:30:00 | 10 | 1 | 3
08:45:10 | 20 | 2 | 1
09:12:21 | 10 | 3 | 10
09:27:44 | 30 | 4 | 2
4 records returned.
??也是有效的,但會(huì)打印出現(xiàn)有的所有訂單,然后終止。我們把它稱為關(guān)系查詢,而不是流式處理。它具有傳統(tǒng)的SQL語義。
??Orders很特殊,因?yàn)樗幸粋€(gè)流和一個(gè)表。如果您嘗試在表上運(yùn)行流式查詢或在流上運(yùn)行關(guān)系式查詢,則Calcite會(huì)拋出一個(gè)錯(cuò)誤:
SELECT * FROM Shipments;
ERROR: Cannot convert stream 'SHIPMENTS' to a table
SELECT STREAM * FROM Products;
ERROR: Cannot convert table 'PRODUCTS' to a stream
??與常規(guī)的SQL中一樣,使用一個(gè)WHERE子句來過濾行:
SELECT STREAM *
FROM Orders
WHERE units > 3;
rowtime | productId | orderId | units
----------+-----------+---------+-------
10:17:00 | 30 | 5 | 4
10:18:07 | 30 | 8 | 20
11:02:00 | 10 | 9 | 6
11:09:30 | 40 | 11 | 12
11:24:11 | 10 | 12 | 4
??在SELECT子句中使用表達(dá)式來選擇要返回或計(jì)算表達(dá)式的列:
SELECT STREAM rowtime,
'An order for ' || units || ' '
|| CASE units WHEN 1 THEN 'unit' ELSE 'units' END
|| ' of product #' || productId AS description
FROM Orders;
rowtime | description
----------+---------------------------------------
10:17:00 | An order for 4 units of product #30
10:17:05 | An order for 1 unit of product #10
10:18:05 | An order for 2 units of product #20
10:18:07 | An order for 20 units of product #30
11:02:00 | An order by 6 units of product #10
11:04:00 | An order by 1 unit of product #10
11:09:30 | An order for 12 units of product #40
11:24:11 | An order by 4 units of product #10
??我們建議您始終在SELECT?條款中包含rowtime列。在每個(gè)流和流式查詢中有一個(gè)有序的時(shí)間戳,可以在稍后進(jìn)行高級計(jì)算,例如GROUP BY和JOIN。
??有幾種方法可以計(jì)算流上的聚合函數(shù)。差異是:
??窗口類型:
SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY CEIL(rowtime TO HOUR), productId;
rowtime | productId | c | units
------------+---------------+------------+-------
11:00:00 | 30 | 2 | 24
11:00:00 | 10 | 1 | 1
11:00:00 | 20 | 1 | 7
12:00:00 | 10 | 3 | 11
12:00:00 | 40 | 1 | 12
??結(jié)果是流。在11點(diǎn)整,Calcite發(fā)出自10點(diǎn)以來一直到11點(diǎn)有下訂單的?productId的小計(jì)。12點(diǎn),它會(huì)發(fā)出11:00至12:00之間的訂單。每個(gè)輸入行只貢獻(xiàn)到一個(gè)輸出行。
??Calcite是如何知道10:00:00的小計(jì)在11:00:00完成的,這樣就可以發(fā)出它們了?它知道rowtime是在增加,而且它也知道CEIL(rowtime TO HOUR)在增加。所以,一旦在11:00:00時(shí)間點(diǎn)或之后看到一行,它將永遠(yuǎn)不會(huì)看到貢獻(xiàn)到上午10:00:00的一行。
??增加或減少的列以及表達(dá)式是單調(diào)的。(單調(diào)遞增或單調(diào)遞減)
??如果列或表達(dá)式的值具有輕微的失序,并且流具有用于聲明特定值將不會(huì)再被看到的機(jī)制(例如標(biāo)點(diǎn)符號或水?。?,則該列或表達(dá)式被稱為準(zhǔn)單調(diào)。
??在GROUP BY子句中沒有單調(diào)或準(zhǔn)單調(diào)表達(dá)式的情況下,Calcite無法取得進(jìn)展,并且不允許查詢:
SELECT STREAM productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY productId;
ERROR: Streaming aggregation requires at least one monotonic expression
??單調(diào)和準(zhǔn)單調(diào)的列需要在模式中聲明。當(dāng)記錄輸入流并且由從該流中讀取數(shù)據(jù)的假定查詢時(shí),單調(diào)性被強(qiáng)制執(zhí)行。我們建議為每個(gè)流指定一個(gè)時(shí)間戳列rowtime,但也可以聲明其他列是單調(diào)的,例如orderId。
??我們將在下面的內(nèi)容討論標(biāo)點(diǎn)符號,水印,并取得進(jìn)展的其他方法?。
??前面的滾動(dòng)窗口的例子很容易寫,因?yàn)榇翱谑且粋€(gè)小時(shí)。對于不是整個(gè)時(shí)間單位的時(shí)間間隔,例如2小時(shí)或2小時(shí)17分鐘,則不能使用CEIL,表達(dá)式將變得更復(fù)雜。
??Calcite支持滾動(dòng)窗口的替代語法:
SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;
rowtime | productId | c | units
----------+-----------+---------+-------
11:00:00 | 30 | 2 | 24
11:00:00 | 10 | 1 | 1
11:00:00 | 20 | 1 | 7
12:00:00 | 10 | 3 | 11
12:00:00 | 40 | 1 | 12
??正如你所看到的,它返回與前一個(gè)查詢相同的結(jié)果。TUMBLE?函數(shù)返回一個(gè)分組鍵,這個(gè)分組鍵在給定的匯總行中將會(huì)以相同的方式結(jié)束;?TUMBLE_END函數(shù)采用相同的參數(shù)并返回該窗口的結(jié)束時(shí)間; 當(dāng)然還有一個(gè)TUMBLE_START函數(shù)。
??TUMBLE有一個(gè)可選參數(shù)來對齊窗口。在以下示例中,我們使用30分鐘間隔和0:12作為對齊時(shí)間,因此查詢在每小時(shí)過去12分鐘和42分鐘時(shí)發(fā)出匯總:
SELECT STREAM
TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
productId;
rowtime | productId | c | units
----------+-----------+---------+-------
10:42:00 | 30 | 2 | 24
10:42:00 | 10 | 1 | 1
10:42:00 | 20 | 1 | 7
11:12:00 | 10 | 2 | 7
11:12:00 | 40 | 1 | 12
11:42:00 | 10 | 1 | 4
??跳轉(zhuǎn)窗口是滾動(dòng)窗口的泛化(概括),它允許數(shù)據(jù)在窗口中保持比發(fā)出間隔更長的時(shí)間。
??查詢發(fā)出的行的時(shí)間戳11:00,包含數(shù)據(jù)從08:00至11:00(或10:59.9);以及行的時(shí)間戳12:00,包含數(shù)據(jù)從09:00至12:00。
SELECT STREAM
HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) AS rowtime,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR);
rowtime | c | units
----------+----------+-------
11:00:00 | 4 | 27
12:00:00 | 8 | 50
??在這個(gè)查詢中,因?yàn)楸A羝谑前l(fā)出期的3倍,所以每個(gè)輸入行都貢獻(xiàn)到3個(gè)輸出行。想象一下,HOP函數(shù)為傳入行生成一組Group Keys,并將其值存儲(chǔ)在每個(gè)Group Key的累加器中。例如,HOP(10:18:00, INTERVAL '1' HOUR, INTERVAL '3')產(chǎn)生3個(gè)時(shí)間間隔周期:
[08:00, 09:00)
[09:00, 10:00)
[10:00, 11:00)
??這就提出了允許不滿意內(nèi)置函數(shù)HOP和TUMBLE的用戶來自定義的分區(qū)函數(shù)的可能性。
??我們可以建立復(fù)雜的復(fù)雜表達(dá)式,如指數(shù)衰減的移動(dòng)平均線:
SELECT STREAM HOP_END(rowtime),
productId,
SUM(unitPrice * EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
/ SUM(EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
發(fā)出:
??GROUPING SETS對于流式查詢是有效的,只要每個(gè)分組集合包含單調(diào)或準(zhǔn)單調(diào)表達(dá)式。
??CUBE和ROLLUP不適用于流式查詢,因?yàn)樗鼈儗⑸芍辽僖粋€(gè)聚合所有內(nèi)容(如GROUP BY ())的分組集合?。
??與標(biāo)準(zhǔn)SQL一樣,可以使用HAVING子句來過濾由流GROUP BY發(fā)出的行:
SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
productId
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId
HAVING COUNT(*) > 2 OR SUM(units) > 10;
rowtime | productId
----------+-----------
10:00:00 | 30
11:00:00 | 10
??前述的HAVING查詢可以使用WHERE子查詢中的子句來表示:
SELECT STREAM rowtime, productId
FROM (
SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS su
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
WHERE c > 2 OR su > 10;
rowtime | productId
----------+-----------
10:00:00 | 30
11:00:00 | 10
11:00:00 | 40
??HAVING子句是在SQL早期引入的,當(dāng)需要在聚合之后執(zhí)行過濾器時(shí),(回想一下,WHERE在輸入到達(dá)GROUP BY子句之前過濾行)。
??從那時(shí)起,SQL已經(jīng)成為一種數(shù)學(xué)封閉的語言,這意味著您可以在一個(gè)表上執(zhí)行的任何操作也可以在查詢上執(zhí)行。
??SQL?的閉包屬性非常強(qiáng)大。它不僅使?HAVING陳舊過時(shí)(或至少減少到語法糖),它使視圖成為可能:
CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) AS
SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
productId,
COUNT(*),
SUM(units)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;
rowtime | productId
----------+-----------
10:00:00 | 30
11:00:00 | 10
11:00:00 | 40
??FROM子句中的子查詢有時(shí)被稱為“內(nèi)聯(lián)視圖”,但實(shí)際上它們比視圖更基礎(chǔ)。視圖只是一個(gè)方便的方法,通過給出這些分片命名并將它們存儲(chǔ)在元數(shù)據(jù)存儲(chǔ)庫中,將SQL分割成可管理的塊。
??很多人發(fā)現(xiàn)嵌套的查詢和視圖在流上比在關(guān)系上更有用。流式查詢是連續(xù)運(yùn)行的運(yùn)算符的管道,而且這些管道通常會(huì)很長。嵌套的查詢和視圖有助于表達(dá)和管理這些管道。
??順便說一下,WITH子句可以完成與子查詢或視圖相同的操作:
WITH HourlyOrderTotals (rowtime, productId, c, su) AS (
SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
productId,
COUNT(*),
SUM(units)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;
rowtime | productId
------------+-----------
10:00:00 | 30
11:00:00 | 10
11:00:00 | 40
??回顧一下HourlyOrderTotals視圖的定義。此視圖是流還是關(guān)系?
??它不包含STREAM關(guān)鍵字,所以它是一個(gè)關(guān)系。但是,這是一種可以轉(zhuǎn)換成流的關(guān)系。
??可以在關(guān)系和流式查詢中使用它:
# A relation; will query the historic Orders table.
# Returns the largest number of product #10 ever sold in one hour.
SELECT max(su)
FROM HourlyOrderTotals
WHERE productId = 10;
# A stream; will query the Orders stream.
# Returns every hour in which at least one product #10 was sold.
SELECT STREAM rowtime
FROM HourlyOrderTotals
WHERE productId = 10;
??這種方法不限于視圖和子查詢。遵循CQL [?1?]中規(guī)定的方法,流式SQL中的每個(gè)查詢都被定義為關(guān)系查詢,并最上面的SELECT使用STREAM關(guān)鍵字轉(zhuǎn)換為流。
??如果STREAM關(guān)鍵字存在于子查詢或視圖定義中,則不起作用。
??在查詢準(zhǔn)備時(shí)間,Calcite計(jì)算查詢中引用的關(guān)系是否可以轉(zhuǎn)換為流或歷史的關(guān)系。
??有時(shí)候,一個(gè)流可以提供它的一些歷史記錄(比如Apache Kafka [?2?]主題中最后24小時(shí)的數(shù)據(jù)),但不是全部。在運(yùn)行時(shí),Calcite計(jì)算出是否有足夠的歷史記錄來運(yùn)行查詢,如果沒有,則會(huì)給出錯(cuò)誤。
??一個(gè)特定的情況下,需要將流轉(zhuǎn)換為關(guān)系時(shí)會(huì)發(fā)生我所說的“餅圖問題”。想象一下,你需要寫一個(gè)帶有圖表的網(wǎng)頁,如下所示,它總結(jié)了每個(gè)產(chǎn)品在過去一小時(shí)內(nèi)的訂單數(shù)量。
??但是這個(gè)Orders流只包含幾條記錄,而不是一個(gè)小時(shí)的匯總。我們需要對流的歷史記錄運(yùn)行一個(gè)關(guān)系查詢:
SELECT productId, count(*)
FROM Orders
WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOUR
AND current_timestamp;
??如果Orders流的歷史記錄正在滾動(dòng)到Orders表中,盡管成本很高,我們可以回答查詢。更好的辦法是,如果我們可以告訴系統(tǒng)將一小時(shí)的匯總轉(zhuǎn)化為表格,在流式處理過程中不斷維護(hù)它,并自動(dòng)重寫查詢以使用表格。
??ORDER BY的故事類似于GROUP BY。語法看起來像普通的SQL,但是Calcite必須確保它能夠提供及時(shí)的結(jié)果。因此,它需要在ORDER BY鍵的前沿(leading edge)有一個(gè)單調(diào)的表達(dá)式。
SELECT STREAM CEIL(rowtime TO hour) AS rowtime, productId, orderId, units
FROM Orders
ORDER BY CEIL(rowtime TO hour) ASC, units DESC;
rowtime | productId | orderId | units
----------+-----------+---------+-------
10:00:00 | 30 | 8 | 20
10:00:00 | 30 | 5 | 4
10:00:00 | 20 | 7 | 2
10:00:00 | 10 | 6 | 1
11:00:00 | 40 | 11 | 12
11:00:00 | 10 | 9 | 6
11:00:00 | 10 | 12 | 4
11:00:00 | 10 | 10 | 1
??大多數(shù)查詢將按照插入的順序返回結(jié)果,因?yàn)橐褂昧魇剿惴?,但不?yīng)該依賴它。例如,考慮一下:
SELECT STREAM *
FROM Orders
WHERE productId = 10
UNION ALL
SELECT STREAM *
FROM Orders
WHERE productId = 30;
rowtime | productId | orderId | units
----------+-----------+---------+-------
10:17:05 | 10 | 6 | 1
10:17:00 | 30 | 5 | 4
10:18:07 | 30 | 8 | 20
11:02:00 | 10 | 9 | 6
11:04:00 | 10 | 10 | 1
11:24:11 | 10 | 12 | 4
??productId= 30?的行顯然是不符合order要求的,可能是因?yàn)镺rders流以productId分區(qū),分區(qū)后的流在不同的時(shí)間發(fā)送了他們的數(shù)據(jù)。
??如果您需要特定的順序,請?zhí)砑右粋€(gè)顯式的ORDER BY:
??Calcite可能會(huì)通過合并使用rowtime實(shí)現(xiàn)UNION ALL,這樣只是效率稍微低些。
只需要添加一個(gè)ORDER BY到最外層的查詢。如果需要在UNION ALL之后執(zhí)行GROUP BY,Calcite將會(huì)?隱式添加ORDER BY,以便使GROUP BY算法成為可能。
VALUES子句創(chuàng)建一個(gè)擁有給定行集合的內(nèi)聯(lián)表。
流式傳輸是不允許的。這組行不會(huì)發(fā)生改變,因此一個(gè)流永遠(yuǎn)不會(huì)返回任何行。
> SELECT STREAM * FROM (VALUES (1, 'abc'));
ERROR: Cannot stream VALUES
標(biāo)準(zhǔn)SQL的功能特性之一可以在SELECT子句中使用所謂的“分析函數(shù)”?。不像GROUP BY,不會(huì)折疊記錄。對于每個(gè)進(jìn)來的記錄,出來一個(gè)記錄。但是聚合函數(shù)是基于一個(gè)多行的窗口。
我們來看一個(gè)例子。
SELECT STREAM rowtime,
productId,
units,
SUM(units) OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING) unitsLastHour
這個(gè)功能特性付出很小的努力就包含了很多Power。在SELECT子句中可以有多個(gè)函數(shù),基于多個(gè)窗口規(guī)則定義。
以下示例返回在過去10分鐘內(nèi)平均訂單數(shù)量大于上周平均訂單數(shù)量的訂單。
SELECT STREAM *
FROM (
SELECT STREAM rowtime,
productId,
units,
AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
FROM Orders
WINDOW product AS (
ORDER BY rowtime
PARTITION BY productId))
為了簡潔起見,在這里我們使用一種語法,其中使用WINDOW子句部分定義窗口,然后在每個(gè)OVER子句中細(xì)化窗口。也可以定義WINDOW子句中的所有窗口,或者如果您愿意,可以定義所有內(nèi)聯(lián)窗口。
但真正的power超越語法。在幕后,這個(gè)查詢維護(hù)著兩個(gè)表,并且使用FIFO隊(duì)列添加和刪除子匯總中的值。但是,無需在查詢中引入聯(lián)接,也可以訪問這些表。
窗口化聚合語法的一些其他功能特性:
可以計(jì)算與順序有關(guān)的函數(shù),如RANK中位數(shù)。
如果我們想要一個(gè)返回每個(gè)記錄的結(jié)果的查詢,比如一個(gè)滑動(dòng)窗口,但是在一個(gè)固定的時(shí)間段重置總數(shù),就像一個(gè)翻滾的窗口?這種模式被稱為級聯(lián)窗口。這里是一個(gè)例子:
SELECT STREAM rowtime,
productId,
units,
SUM(units) OVER (PARTITION BY FLOOR(rowtime TO HOUR)) AS unitsSinceTopOfHour
它看起來類似于滑動(dòng)窗口查詢,但單調(diào)表達(dá)式出現(xiàn)在PARTITION BY窗口的子句中。由于rowtime從10:59:59到11:00:00,F(xiàn)LOOR(rowtime TO HOUR)從10:00:00到11:00:00發(fā)生改變,因此一個(gè)新的分區(qū)開始。在新的時(shí)間到達(dá)的第一行將開始新的匯總;?第二行將有一個(gè)由兩行組成的匯總,依此類推。
Calcite知道舊分區(qū)永遠(yuǎn)不會(huì)再被使用,因此從內(nèi)部存儲(chǔ)中刪除該分區(qū)的所有子匯總。
使用級聯(lián)和滑動(dòng)窗口的分析函數(shù)可以組合在同一個(gè)查詢中。
有兩種類型的連接,即stream-to-table join和stream-to-stream join。
如果表的內(nèi)容沒有改變,則流到表的連接是直接的。這個(gè)查詢以每個(gè)產(chǎn)品的列出價(jià)格豐富了訂單流:
SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
p.name, p.unitPrice
FROM Orders AS o JOIN Products AS p ON o.productId = p.productId;
rowtime | productId | orderId | units | name | unitPrice
----------+-----------+---------+-------+ -------+-----------
10:17:00 | 30 | 5 | 4 | Cheese | 17
10:17:05 | 10 | 6 | 1 | Beer | 0.25
10:18:05 | 20 | 7 | 2 | Wine | 6
10:18:07 | 30 | 8 | 20 | Cheese | 17
11:02:00 | 10 | 9 | 6 | Beer | 0.25
11:04:00 | 10 | 10 | 1 | Beer | 0.25
11:09:30 | 40 | 11 | 12 | Bread | 100
11:24:11 | 10 | 12 | 4 | Beer | 0.25
如果表格在改變,會(huì)發(fā)生什么?例如,假設(shè)product#10的單價(jià)在11點(diǎn)增加到0.35。在11:00之前下的訂單應(yīng)該是舊價(jià)格,在11:00之后下的訂單應(yīng)該反映新價(jià)格。
實(shí)現(xiàn)此目的的一種方法是創(chuàng)建一個(gè)表,使每個(gè)版本的開始和結(jié)束生效日期保持一致,ProductVersions如下所示:
SELECT STREAM *
FROM Orders AS o JOIN ProductVersions AS p
ON o.productId = p.productId
AND o.rowtime BETWEEN p.startDate AND p.endDate
rowtime | productId | orderId | units | productId1 | name | unitPrice
----------+-----------+---------+-------+ -----------+--------+-----------
10:17:00 | 30 | 5 | 4 | 30 | Cheese | 17
10:17:05 | 10 | 6 | 1 | 10 | Beer | 0.25
10:18:05 | 20 | 7 | 2 | 20 | Wine | 6
10:18:07 | 30 | 8 | 20 | 30 | Cheese | 17
11:02:00 | 10 | 9 | 6 | 10 | Beer | 0.35
11:04:00 | 10 | 10 | 1 | 10 | Beer | 0.35
11:09:30 | 40 | 11 | 12 | 40 | Bread | 100
11:24:11 | 10 | 12 | 4 | 10 | Beer | 0.35
另一種實(shí)現(xiàn)方法是使用具有臨時(shí)支持的數(shù)據(jù)庫(能夠像過去的任何時(shí)候一樣查找數(shù)據(jù)庫的內(nèi)容),并且系統(tǒng)需要知道Orders流的rowtime列對應(yīng)于Products表的事務(wù)時(shí)間戳?。
對于許多應(yīng)用程序而言,暫時(shí)支持或版本化表格的成本和努力是不值得的。查詢在重放時(shí)給出不同的結(jié)果是可以接受的:在這個(gè)例子中,在重放時(shí),product#10的所有訂單被分配后來的單價(jià)0.35。
如果連接條件以某種方式強(qiáng)迫它們彼此保持有限的距離,那么流與流的連接就是合理的。在以下查詢中,發(fā)貨日期在訂單日期的一小時(shí)內(nèi):
SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
rowtime | productId | orderId | shipTime
----------+-----------+---------+----------
10:17:00 | 30 | 5 | 10:55:00
10:17:05 | 10 | 6 | 10:20:00
11:02:00 | 10 | 9 | 11:58:00
11:24:11 | 10 | 12 | 11:44:00
請注意,相當(dāng)多的訂單不會(huì)顯示,因?yàn)樗鼈冊谝粋€(gè)小時(shí)內(nèi)沒有發(fā)貨。在系統(tǒng)接收到Order#10時(shí),時(shí)間戳為11:24:11,它已經(jīng)從其哈希表中刪除了訂單包括Order#8(時(shí)間戳10:18:07)。
正如你所看到的,把這兩個(gè)流的單調(diào)或準(zhǔn)單調(diào)列聯(lián)系在一起的“鎖定步驟”是系統(tǒng)取得進(jìn)展所必需的。如果它不能推斷出一個(gè)鎖定步驟, 它將拒絕執(zhí)行一個(gè)查詢。
這不僅是查詢對流來說有意義。運(yùn)行DML語句(INSERT,UPDATE,DELETE,UPSERT和REPLACE)對流來說同樣有意義。
DML非常有用,因?yàn)樗试S基于流實(shí)現(xiàn)物華流或表格,因此經(jīng)常使用值可以節(jié)省工作量。
考慮到流的應(yīng)用程序通常由查詢管道組成,每個(gè)查詢將輸入流轉(zhuǎn)換為輸出流。管道的組件可以是一個(gè)視圖:
CREATE VIEW LargeOrders AS
SELECT STREAM * FROM Orders WHERE units > 1000;
或者一個(gè)標(biāo)準(zhǔn)的INSERT語句:
INSERT INTO LargeOrders
SELECT STREAM * FROM Orders WHERE units > 1000;
這些看起來很相似,在這兩種情況下,管道中的下一個(gè)步驟都可以讀取LargeOrders,而不用擔(dān)心它是如何填充的。效率是有差別的:INSERT無論有多少消費(fèi)者,做的工作都是相同的。這個(gè)視圖的確與消費(fèi)者的數(shù)量成正比,特別是沒有消費(fèi)者的情況下就沒有工作。
其他形式的DML對于流也是有意義的。例如,以下常設(shè)UPSERT語句維護(hù)一個(gè)表格,以實(shí)現(xiàn)最后一小時(shí)訂單的匯總:
UPSERT INTO OrdersSummary
SELECT STREAM productId,
COUNT(*) OVER lastHour AS c
FROM Orders
WINDOW lastHour AS (
PARTITION BY productId
ORDER BY rowtime
RANGE INTERVAL '1' HOUR PRECEDING)
Punctuation [?5?]允許流式查詢?nèi)〉眠M(jìn)展,即使單調(diào)的鍵中沒有足夠的值來推送出結(jié)果。
(我更喜歡術(shù)語“rowtime bounds”,水印[?6?]是一個(gè)相關(guān)的概念,但為了這些目的,Punctuation就足夠了。)
如果某個(gè)流具有Punctuation,那么它可能不會(huì)被排序,不過仍然可以排序。因此,出于語義的目的,按照排序的流來工作就足夠了。
順便說一下,一個(gè)無序的流也是可排序的,如果按t-sorted排序?(即,每個(gè)記錄保證在其時(shí)間戳的t秒內(nèi)到達(dá))或k-sorted排序(即每個(gè)記錄保證不超過k的位置造成無序)。所以對這些流的查詢可以像帶有Punctuation的流式查詢來進(jìn)行計(jì)劃。
而且,我們經(jīng)常要聚合不是時(shí)間的且是單調(diào)的屬性。“一個(gè)團(tuán)隊(duì)在獲勝狀態(tài)和失敗狀態(tài)之間轉(zhuǎn)移的次數(shù)”就是這樣一個(gè)單調(diào)的屬性。系統(tǒng)需要自己弄清楚聚合這樣一個(gè)屬性是否安全;?Punctuation不會(huì)添加任何額外的信息。
我記得一些計(jì)劃器的元數(shù)據(jù)(成本指標(biāo)):
并非本文中的所有概念都已經(jīng)在Calcite中實(shí)現(xiàn)。其他的可能在Calcite中實(shí)現(xiàn),但不能在SamzaSQL [?3?] [?4?]?等特定的適配器中實(shí)現(xiàn)。
已實(shí)現(xiàn)
未實(shí)現(xiàn)
本文檔中提供的以下功能特性,以為Calcite支持它們,但實(shí)際上它還沒有實(shí)現(xiàn)。全面支持意味著參考實(shí)現(xiàn)支持該功能特性(包括負(fù)面情況),TCK則對其進(jìn)行測試。
本文檔做了什么
以下函數(shù)在標(biāo)準(zhǔn)SQL中不存在,但在流式SQL中定義。
標(biāo)量函數(shù):
分區(qū)函數(shù):
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。