溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的

發(fā)布時(shí)間:2021-11-15 17:00:26 來源:億速云 閱讀:216 作者:柒染 欄目:云計(jì)算

Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

小編所述均基于flink 1.5.4。

我們?yōu)槭裁磾U(kuò)展Flink-SQL?

由于Flink 本身SQL語法并不提供在對接輸入源和輸出目的的SQL語法。數(shù)據(jù)開發(fā)在使用的過程中需要根據(jù)其提供的Api接口編寫Source和 Sink, 異常繁瑣,不僅需要了解FLink 各類Operator的API,還需要對各個(gè)組件的相關(guān)調(diào)用方式有了解(比如kafka,redis,mongo,hbase等),并且在需要關(guān)聯(lián)到外部數(shù)據(jù)源的時(shí)候沒有提供SQL相關(guān)的實(shí)現(xiàn)方式,因此數(shù)據(jù)開發(fā)直接使用Flink編寫SQL作為實(shí)時(shí)的數(shù)據(jù)分析時(shí)需要較大的額外工作量。

我們的目的是在使用Flink-SQL的時(shí)候只需要關(guān)心做什么,而不需要關(guān)心怎么做。不需要過多的關(guān)心程序的實(shí)現(xiàn),專注于業(yè)務(wù)邏輯。

接下來,我們一起來看下Flink-SQL的擴(kuò)展實(shí)現(xiàn)吧!

01擴(kuò)展了哪些flink相關(guān)sql

(1)創(chuàng)建源表語句

Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的

(2)創(chuàng)建輸出表語句

Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的

(3)創(chuàng)建自定義函數(shù)

Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的

(4)維表關(guān)聯(lián)

Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的

02各個(gè)模塊是如何翻譯到flink的實(shí)現(xiàn)

( 1 ) 如何將創(chuàng)建源表的sql語句轉(zhuǎn)換為flink的operator;

Flink中表的都會映射到Table這個(gè)類。然后調(diào)用注冊方法將Table注冊到environment。

StreamTableEnvironment.registerTable(tableName, table);

當(dāng)前我們只支持kafka數(shù)據(jù)源。Flink本身有讀取kafka 的實(shí)現(xiàn)類, FlinkKafkaConsumer09,所以只需要根據(jù)指定參數(shù)實(shí)例化出該對象。并調(diào)用注冊方法注冊即可。

另外需要注意在flink sql經(jīng)常會需要用到rowtime, proctime, 所以我們在注冊表結(jié)構(gòu)的時(shí)候額外添加rowtime,proctime。

當(dāng)需要用到rowtime的使用需要額外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定義watermark主要做兩個(gè)事情:1:如何從Row中獲取時(shí)間字段。 2:設(shè)定最大延遲時(shí)間。

( 2 ) 如何將創(chuàng)建的輸出表sql語句轉(zhuǎn)換為flink的operator;

Flink輸出Operator的基類是OutputFormat, 我們這里繼承的是RichOutputFormat, 該抽象類繼承OutputFormat,額外實(shí)現(xiàn)了獲取運(yùn)行環(huán)境的方法getRuntimeContext(), 方便于我們之后自定義metric等操作。

我們以輸出到mysql插件mysql-sink為例,分兩部分:

  • 將create table 解析出表名稱,字段信息,mysql連接信息。

該部分使用正則表達(dá)式的方式將create table 語句轉(zhuǎn)換為內(nèi)部的一個(gè)實(shí)現(xiàn)類。該類存儲了表名稱,字段信息,插件類型,插件連接信息。

  • 繼承RichOutputFormat將數(shù)據(jù)寫到對應(yīng)的外部數(shù)據(jù)源。

主要是實(shí)現(xiàn)writeRecord方法,在mysql插件中其實(shí)就是調(diào)用jdbc 實(shí)現(xiàn)插入或者更新方法。

( 3) 如何將自定義函數(shù)語句轉(zhuǎn)換為flink的operator;

Flink對udf提供兩種類型的實(shí)現(xiàn)方式:

(1)繼承ScalarFunction

(2)繼承TableFunction

需要做的將用戶提供的jar添加到URLClassLoader, 并加載指定的class (實(shí)現(xiàn)上述接口的類路徑),然后調(diào)用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注冊。之后即可使用改定義的udf;

( 4 ) 維表功能是如何實(shí)現(xiàn)的?

流計(jì)算中一個(gè)常見的需求就是為數(shù)據(jù)流補(bǔ)齊字段。因?yàn)閿?shù)據(jù)采集端采集到的數(shù)據(jù)往往比較有限,在做數(shù)據(jù)分析之前,就要先將所需的維度信息補(bǔ)全,但是當(dāng)前flink并未提供join外部數(shù)據(jù)源的SQL功能。

實(shí)現(xiàn)該功能需要注意的幾個(gè)問題:

(1)維表的數(shù)據(jù)是不斷變化的

在實(shí)現(xiàn)的時(shí)候需要支持定時(shí)更新內(nèi)存中的緩存的外部數(shù)據(jù)源,比如使用LRU等策略。

(2)IO吞吐問題

如果每接收到一條數(shù)據(jù)就串行到外部數(shù)據(jù)源去獲取對應(yīng)的關(guān)聯(lián)記錄的話,網(wǎng)絡(luò)延遲將會是系統(tǒng)最大的瓶頸。這里我們選擇阿里貢獻(xiàn)給flink社區(qū)的算子RichAsyncFunction。該算子使用異步的方式從外部數(shù)據(jù)源獲取數(shù)據(jù),大大減少了花費(fèi)在網(wǎng)絡(luò)請求上的時(shí)間。

(3)如何將sql 中包含的維表解析到flink operator   

為了從sql中解析出指定的維表和過濾條件, 使用正則明顯不是一個(gè)合適的辦法。需要匹配各種可能性。將是一個(gè)無窮無盡的過程。查看flink本身對sql的解析。它使用了calcite做為sql解析的工作。將sql解析出一個(gè)語法樹,通過迭代的方式,搜索到對應(yīng)的維表;然后將維表和非維表結(jié)構(gòu)分開。

Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的

通過上述步驟可以通過SQL完成常用的從kafka源表,join外部數(shù)據(jù)源,寫入到指定的外部目的結(jié)構(gòu)中。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI