您好,登錄后才能下訂單哦!
Flink-SQL的擴(kuò)展實(shí)現(xiàn)是怎樣的,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
小編所述均基于flink 1.5.4。
我們?yōu)槭裁磾U(kuò)展Flink-SQL?
由于Flink 本身SQL語法并不提供在對接輸入源和輸出目的的SQL語法。數(shù)據(jù)開發(fā)在使用的過程中需要根據(jù)其提供的Api接口編寫Source和 Sink, 異常繁瑣,不僅需要了解FLink 各類Operator的API,還需要對各個(gè)組件的相關(guān)調(diào)用方式有了解(比如kafka,redis,mongo,hbase等),并且在需要關(guān)聯(lián)到外部數(shù)據(jù)源的時(shí)候沒有提供SQL相關(guān)的實(shí)現(xiàn)方式,因此數(shù)據(jù)開發(fā)直接使用Flink編寫SQL作為實(shí)時(shí)的數(shù)據(jù)分析時(shí)需要較大的額外工作量。
我們的目的是在使用Flink-SQL的時(shí)候只需要關(guān)心做什么,而不需要關(guān)心怎么做。不需要過多的關(guān)心程序的實(shí)現(xiàn),專注于業(yè)務(wù)邏輯。
接下來,我們一起來看下Flink-SQL的擴(kuò)展實(shí)現(xiàn)吧!
01擴(kuò)展了哪些flink相關(guān)sql
(1)創(chuàng)建源表語句
(2)創(chuàng)建輸出表語句
(3)創(chuàng)建自定義函數(shù)
(4)維表關(guān)聯(lián)
02各個(gè)模塊是如何翻譯到flink的實(shí)現(xiàn)
( 1 ) 如何將創(chuàng)建源表的sql語句轉(zhuǎn)換為flink的operator;
Flink中表的都會映射到Table這個(gè)類。然后調(diào)用注冊方法將Table注冊到environment。
StreamTableEnvironment.registerTable(tableName, table);
當(dāng)前我們只支持kafka數(shù)據(jù)源。Flink本身有讀取kafka 的實(shí)現(xiàn)類, FlinkKafkaConsumer09,所以只需要根據(jù)指定參數(shù)實(shí)例化出該對象。并調(diào)用注冊方法注冊即可。
另外需要注意在flink sql經(jīng)常會需要用到rowtime, proctime, 所以我們在注冊表結(jié)構(gòu)的時(shí)候額外添加rowtime,proctime。
當(dāng)需要用到rowtime的使用需要額外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定義watermark主要做兩個(gè)事情:1:如何從Row中獲取時(shí)間字段。 2:設(shè)定最大延遲時(shí)間。
( 2 ) 如何將創(chuàng)建的輸出表sql語句轉(zhuǎn)換為flink的operator;
Flink輸出Operator的基類是OutputFormat, 我們這里繼承的是RichOutputFormat, 該抽象類繼承OutputFormat,額外實(shí)現(xiàn)了獲取運(yùn)行環(huán)境的方法getRuntimeContext(), 方便于我們之后自定義metric等操作。
我們以輸出到mysql插件mysql-sink為例,分兩部分:
將create table 解析出表名稱,字段信息,mysql連接信息。
該部分使用正則表達(dá)式的方式將create table 語句轉(zhuǎn)換為內(nèi)部的一個(gè)實(shí)現(xiàn)類。該類存儲了表名稱,字段信息,插件類型,插件連接信息。
繼承RichOutputFormat將數(shù)據(jù)寫到對應(yīng)的外部數(shù)據(jù)源。
主要是實(shí)現(xiàn)writeRecord方法,在mysql插件中其實(shí)就是調(diào)用jdbc 實(shí)現(xiàn)插入或者更新方法。
( 3) 如何將自定義函數(shù)語句轉(zhuǎn)換為flink的operator;
Flink對udf提供兩種類型的實(shí)現(xiàn)方式:
(1)繼承ScalarFunction
(2)繼承TableFunction
需要做的將用戶提供的jar添加到URLClassLoader, 并加載指定的class (實(shí)現(xiàn)上述接口的類路徑),然后調(diào)用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注冊。之后即可使用改定義的udf;
( 4 ) 維表功能是如何實(shí)現(xiàn)的?
流計(jì)算中一個(gè)常見的需求就是為數(shù)據(jù)流補(bǔ)齊字段。因?yàn)閿?shù)據(jù)采集端采集到的數(shù)據(jù)往往比較有限,在做數(shù)據(jù)分析之前,就要先將所需的維度信息補(bǔ)全,但是當(dāng)前flink并未提供join外部數(shù)據(jù)源的SQL功能。
實(shí)現(xiàn)該功能需要注意的幾個(gè)問題:
(1)維表的數(shù)據(jù)是不斷變化的
在實(shí)現(xiàn)的時(shí)候需要支持定時(shí)更新內(nèi)存中的緩存的外部數(shù)據(jù)源,比如使用LRU等策略。
(2)IO吞吐問題
如果每接收到一條數(shù)據(jù)就串行到外部數(shù)據(jù)源去獲取對應(yīng)的關(guān)聯(lián)記錄的話,網(wǎng)絡(luò)延遲將會是系統(tǒng)最大的瓶頸。這里我們選擇阿里貢獻(xiàn)給flink社區(qū)的算子RichAsyncFunction。該算子使用異步的方式從外部數(shù)據(jù)源獲取數(shù)據(jù),大大減少了花費(fèi)在網(wǎng)絡(luò)請求上的時(shí)間。
(3)如何將sql 中包含的維表解析到flink operator
為了從sql中解析出指定的維表和過濾條件, 使用正則明顯不是一個(gè)合適的辦法。需要匹配各種可能性。將是一個(gè)無窮無盡的過程。查看flink本身對sql的解析。它使用了calcite做為sql解析的工作。將sql解析出一個(gè)語法樹,通過迭代的方式,搜索到對應(yīng)的維表;然后將維表和非維表結(jié)構(gòu)分開。
通過上述步驟可以通過SQL完成常用的從kafka源表,join外部數(shù)據(jù)源,寫入到指定的外部目的結(jié)構(gòu)中。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。