溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

FlinkSQL API怎么調(diào)用

發(fā)布時間:2021-12-23 16:21:54 來源:億速云 閱讀:267 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“FlinkSQL  API怎么調(diào)用”,在日常操作中,相信很多人在FlinkSQL  API怎么調(diào)用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”FlinkSQL  API怎么調(diào)用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

FlinkSQL出現(xiàn)的背景

        Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。

        Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計算領(lǐng)域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。

FlinkSQL  API怎么調(diào)用
        在這個背景下,毫無疑問,SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因為其具有幾個非常重要的特點:

  • SQL 屬于設(shè)定式語言,用戶只要表達(dá)清楚需求即可,不需要了解具體做法

  • SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計劃;

  • SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學(xué)習(xí)成本較低

  • SQL 非常穩(wěn)定,在數(shù)據(jù)庫 30 多年的歷史中,SQL 本身變化較少

  • 流與批的統(tǒng)一,F(xiàn)link 底層 Runtime 本身就是一個流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。

整體介紹

1 什么是 Table API 和 Flink SQL?

        Flink本身是批流統(tǒng)一的處理框架,所以Table API和SQL,就是批流統(tǒng)一的上層處理API。目前功能尚未完善,處于活躍的開發(fā)階段。

        Table API是一套內(nèi)嵌在Java和Scala語言中的查詢API,它允許我們以非常直觀的方式,組合來自一些關(guān)系運算符的查詢(比如select、filter和join)。而對于Flink SQL,就是直接可以在代碼中寫SQL,來實現(xiàn)一些查詢(Query)操作。Flink的SQL支持,基于實現(xiàn)了SQL標(biāo)準(zhǔn)的Apache Calcite(Apache開源SQL解析工具)。

        無論輸入是批輸入還是流式輸入,在這兩套API中,指定的查詢都具有相同的語義,得到相同的結(jié)果。

2 需要引入的依賴

        Table API 和 SQL 需要引入的依賴有兩個:plannerbridge

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.10.0</version></dependency>

        其中:

        flink-table-plannerplanner計劃器,是table API最主要的部分,提供了運行時環(huán)境和生成程序執(zhí)行計劃的planner;

        flink-table-api-scala-bridgebridge橋接器,主要負(fù)責(zé)table API和 DataStream/DataSet API的連接支持,按照語言分java和scala;

        這里的兩個依賴,是IDE環(huán)境下運行需要添加的;如果是生產(chǎn)環(huán)境,lib目錄下默認(rèn)已經(jīng)有了planner,就只需要有bridge就可以了。

        當(dāng)然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個SQL client,這個包含在 flink-table-common 里。
        

3 兩種planner(old & blink)的區(qū)別

        1、批流統(tǒng)一:Blink將批處理作業(yè),視為流式處理的特殊情況。所以,blink不支持表和DataSet之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為DataSet應(yīng)用程序,而是跟流處理一樣,轉(zhuǎn)換為DataStream程序來處理。

        2、因為批流統(tǒng)一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替。

        3、Blink planner只支持全新的目錄,不支持已棄用的ExternalCatalog。

        4、舊 planner 和 Blink planner 的FilterableTableSource實現(xiàn)不兼容。舊的planner會把PlannerExpressions下推到filterableTableSource中,而blink planner則會把Expressions下推。

        5、基于字符串的鍵值配置選項僅適用于Blink planner。

        6、PlannerConfig在兩個planner中的實現(xiàn)不同。

        7、Blink planner會將多個sink優(yōu)化在一個DAG中(僅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而舊 planner 的優(yōu)化總是將每一個sink放在一個新的DAG中,其中所有DAG彼此獨立。

        8、舊的planner不支持目錄統(tǒng)計,而Blink planner支持。

API 調(diào)用

1 基本程序結(jié)構(gòu)

        Table API 和 SQL 的程序結(jié)構(gòu),與流式處理的程序結(jié)構(gòu)類似;也可以近似地認(rèn)為有這么幾步:首先創(chuàng)建執(zhí)行環(huán)境,然后定義source、transform和sink。

        具體操作流程如下:

val tableEnv = ...     // 創(chuàng)建表的執(zhí)行環(huán)境// 創(chuàng)建一張表,用于讀取數(shù)據(jù)tableEnv.connect(...).createTemporaryTable("inputTable")// 注冊一張表,用于把計算結(jié)果輸出tableEnv.connect(...).createTemporaryTable("outputTable")// 通過 Table API 查詢算子,得到一張結(jié)果表val result = tableEnv.from("inputTable").select(...)// 通過 SQL查詢語句,得到一張結(jié)果表val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")// 將結(jié)果表寫入輸出表中result.insertInto("outputTable")

2 創(chuàng)建表環(huán)境

        創(chuàng)建表環(huán)境最簡單的方式,就是基于流處理執(zhí)行環(huán)境,調(diào)create方法直接創(chuàng)建:

val tableEnv = StreamTableEnvironment.create(env)

        表環(huán)境(TableEnvironment)是flink中集成 Table API & SQL 的核心概念。它負(fù)責(zé):

  • 注冊catalog

  • 在內(nèi)部 catalog 中注冊表

  • 執(zhí)行 SQL 查詢

  • 注冊用戶自定義函數(shù)

  • 將 DataStream 或 DataSet 轉(zhuǎn)換為表

  • 保存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

        在創(chuàng)建TableEnv的時候,可以多傳入一個EnvironmentSettings 或者 TableConfig 參數(shù),可以用來配置 TableEnvironment 的一些特性。

        比如,配置老版本的流式查詢(Flink-Streaming-Query):

val settings = EnvironmentSettings.newInstance()
  .useOldPlanner()      // 使用老版本planner
  .inStreamingMode()    // 流處理模式
  .build()val tableEnv = StreamTableEnvironment.create(env, settings)

        基于老版本的批處理環(huán)境(Flink-Batch-Query):

val batchEnv = ExecutionEnvironment.getExecutionEnvironmentval batchTableEnv = BatchTableEnvironment.create(batchEnv)

        基于 blink 版本的流處理環(huán)境(Blink-Streaming-Query):

val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

        基于blink版本的批處理環(huán)境(Blink-Batch-Query):

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val bbTableEnv = TableEnvironment.create(bbSettings)

3 在Catalog中注冊表

3.1 表(Table)的概念

        TableEnvironment 可以注冊目錄 Catalog ,并可以基于Catalog注冊表。它會維護(hù)一個 Catalog-Table 表之間的map。

        表(Table)是由一個“標(biāo)識符”來指定的,由3部分組成:Catalog名、數(shù)據(jù)庫(database)名和對象名(表名)。如果沒有指定目錄或數(shù)據(jù)庫,就使用當(dāng)前的默認(rèn)值。

        表可以是常規(guī)的(Table,表),或者虛擬的(View,視圖)。常規(guī)表(Table)一般可以用來描述外部數(shù)據(jù),比如文件、數(shù)據(jù)庫表或消息隊列的數(shù)據(jù),也可以直接從 DataStream轉(zhuǎn)換而來。視圖可以從現(xiàn)有的表中創(chuàng)建,通常是 table API 或者SQL查詢的一個結(jié)果。

3.2 連接到文件系統(tǒng)(Csv格式)

        連接外部系統(tǒng)在Catalog中注冊表,直接調(diào)用 tableEnv.connect() 就可以,里面參數(shù)要傳入一個 ConnectorDescriptor ,也就是connector描述器。對于文件系統(tǒng)的 connector 而言,flink內(nèi)部已經(jīng)提供了,就叫做FileSystem()

        代碼如下:

tableEnv.connect( new FileSystem().path("sensor.txt"))  // 定義表數(shù)據(jù)來源,外部連接
  .withFormat(new OldCsv())    // 定義從外部系統(tǒng)讀取數(shù)據(jù)之后的格式化方法
  .withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())
  )    // 定義表結(jié)構(gòu)
  .createTemporaryTable("inputTable")    // 創(chuàng)建臨時表

        這是舊版本的csv格式描述器。由于它是非標(biāo)的,跟外部系統(tǒng)對接并不通用,所以將被棄用,以后會被一個符合RFC-4180標(biāo)準(zhǔn)的新format描述器取代。新的描述器就叫Csv(),但flink沒有直接提供,需要引入依賴flink-csv

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.0</version></dependency>

        代碼非常類似,只需要把 withFormat 里的 OldCsv 改成Csv就可以了。

3.3 連接到Kafka

        kafka的連接器 flink-kafka-connector 中,1.10 版本的已經(jīng)提供了 Table API 的支持。我們可以在 connect方法中直接傳入一個叫做Kafka的類,這就是kafka連接器的描述器ConnectorDescriptor。

tableEnv.connect(
  new Kafka().version("0.11") // 定義kafka的版本.topic("sensor") // 定義主題.property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092"))
  .withFormat(new Csv())
  .withSchema(new Schema()
  .field("id", DataTypes.STRING())
  .field("timestamp", DataTypes.BIGINT())
  .field("temperature", DataTypes.DOUBLE()))
  .createTemporaryTable("kafkaInputTable")

        當(dāng)然也可以連接到 ElasticSearch、MySql、HBase、Hive等外部系統(tǒng),實現(xiàn)方式基本上是類似的。感興趣的 小伙伴可以自行去研究,這里就不詳細(xì)贅述了。

4 表的查詢

        通過上面的學(xué)習(xí),我們已經(jīng)利用外部系統(tǒng)的連接器connector,我們可以讀寫數(shù)據(jù),并在環(huán)境的Catalog中注冊表。接下來就可以對表做查詢轉(zhuǎn)換了。

        Flink給我們提供了兩種查詢方式:Table API和 SQL。

4.1 Table API的調(diào)用

        Table API是集成在Scala和Java語言內(nèi)的查詢API。與SQL不同,Table API的查詢不會用字符串表示,而是在宿主語言中一步一步調(diào)用完成的。

        Table API基于代表一張“表”的Table類,并提供一整套操作處理的方法API。這些方法會返回一個新的Table對象,這個對象就表示對輸入表應(yīng)用轉(zhuǎn)換操作的結(jié)果。有些關(guān)系型轉(zhuǎn)換操作,可以由多個方法調(diào)用組成,構(gòu)成鏈?zhǔn)秸{(diào)用結(jié)構(gòu)。例如table.select(…).filter(…),其中 select(…)表示選擇表中指定的字段,filter(…)表示篩選條件。

        代碼中的實現(xiàn)如下:

val sensorTable: Table = tableEnv.from("inputTable")val resultTable: Table = senorTable.select("id, temperature").filter("id ='sensor_1'")
4.2 SQL查詢

        Flink的SQL集成,基于的是ApacheCalcite,它實現(xiàn)了SQL標(biāo)準(zhǔn)。在Flink中,用常規(guī)字符串來定義SQL查詢語句。SQL 查詢的結(jié)果,是一個新的 Table。

        代碼實現(xiàn)如下:

val resultSqlTable: Table = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'")

        或者:

val resultSqlTable: Table = tableEnv.sqlQuery(
  """
    |select id, temperature
    |from inputTable
    |where id = 'sensor_1'
  """.stripMargin)

        當(dāng)然,也可以加上聚合操作,比如我們統(tǒng)計每個sensor溫度數(shù)據(jù)出現(xiàn)的個數(shù),做個count統(tǒng)計:

val aggResultTable = sensorTable.groupBy('id).select('id, 'id.count as 'count)

        SQL的實現(xiàn):

val aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id")

        這里Table API里指定的字段,前面加了一個單引號’,這是Table API中定義的Expression類型的寫法,可以很方便地表示一個表中的字段。

        字段可以直接全部用雙引號引起來,也可以用半邊單引號+字段名的方式。以后的代碼中,一般都用后一種形式。

4.5 將DataStream 轉(zhuǎn)換成表

        Flink允許我們把Table和DataStream做轉(zhuǎn)換:我們可以基于一個DataStream,先流式地讀取數(shù)據(jù)源,然后map成樣例類,再把它轉(zhuǎn)成Table。Table的列字段(column fields),就是樣例類里的字段,這樣就不用再麻煩地定義schema了。

4.5.1 代碼表達(dá)

        代碼中實現(xiàn)非常簡單,直接用 tableEnv.fromDataStream() 就可以了。默認(rèn)轉(zhuǎn)換后的 Table schema 和 DataStream 中的字段定義一一對應(yīng),也可以單獨指定出來。

        這就允許我們更換字段的順序、重命名,或者只選取某些字段出來,相當(dāng)于做了一次map操作(或者Table API的 select操作)。

        代碼具體如下:

val inputStream: DataStream[String] = env.readTextFile("sensor.txt")val dataStream: DataStream[SensorReading] = inputStream  .map(data => {
   
   
   val dataArray = data.split(",")SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
  })val sensorTable: Table = tableEnv.fromDataStreama(datStream)val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts)
4.5.2 數(shù)據(jù)類型與 Table schema的對應(yīng)

        在上節(jié)的例子中,DataStream 中的數(shù)據(jù)類型,與表的 Schema 之間的對應(yīng)關(guān)系,是按照樣例類中的字段名來對應(yīng)的(name-based mapping),所以還可以用as做重命名。

        另外一種對應(yīng)方式是,直接按照字段的位置來對應(yīng)(position-based mapping),對應(yīng)的過程中,就可以直接指定新的字段名了。

        基于名稱的對應(yīng):

val sensorTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature)

        基于位置的對應(yīng):

val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)

        Flink的 DataStream 和 DataSet API 支持多種類型。

        組合類型,比如元組(內(nèi)置Scala和Java元組)、POJO、Scala case類和Flink的Row類型等,允許具有多個字段的嵌套數(shù)據(jù)結(jié)構(gòu),這些字段可以在Table的表達(dá)式中訪問。其他類型,則被視為原子類型。

        元組類型和原子類型,一般用位置對應(yīng)會好一些;如果非要用名稱對應(yīng),也是可以的:元組類型,默認(rèn)的名稱是 “_1”, “_2”;而原子類型,默認(rèn)名稱是 ”f0”。

4.6 創(chuàng)建臨時視圖(Temporary View)

        創(chuàng)建臨時視圖的第一種方式,就是直接從DataStream轉(zhuǎn)換而來。同樣,可以直接對應(yīng)字段轉(zhuǎn)換;也可以在轉(zhuǎn)換的時候,指定相應(yīng)的字段。

        代碼如下:

tableEnv.createTemporaryView("sensorView", dataStream)tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature, 'timestamp as 'ts)

        另外,當(dāng)然還可以基于Table創(chuàng)建視圖:

tableEnv.createTemporaryView("sensorView", sensorTable)

        View和Table的Schema完全相同。事實上,在Table API中,可以認(rèn)為View 和 Table 是等價的。

4.7 輸出表

        表的輸出,是通過將數(shù)據(jù)寫入 TableSink 來實現(xiàn)的。TableSink 是一個通用接口,可以支持不同的文件格式、存儲數(shù)據(jù)庫和消息隊列。

        具體實現(xiàn),輸出表最直接的方法,就是通過 Table.insertInto() 方法將一個 Table 寫入注冊過的 TableSink 中

4.7.1 輸出到文件

        代碼如下:

// 注冊輸出表tableEnv.connect(
  new FileSystem().path("…\\resources\\out.txt")) // 定義到文件系統(tǒng)的連接
  .withFormat(new Csv()) // 定義格式化方法,Csv格式
  .withSchema(new Schema()
  .field("id", DataTypes.STRING())
  .field("temp", DataTypes.DOUBLE())) // 定義表結(jié)構(gòu)
  .createTemporaryTable("outputTable") // 創(chuàng)建臨時表resultSqlTable.insertInto("outputTable")
4.7.2 更新模式(Update Mode)

        在流處理過程中,表的處理并不像傳統(tǒng)定義的那樣簡單。

        對于流式查詢(Streaming Queries),需要聲明如何在(動態(tài))表和外部連接器之間執(zhí)行轉(zhuǎn)換。與外部系統(tǒng)交換的消息類型,由更新模式(update mode)指定。

        Flink Table API中的更新模式有以下三種:

  • 追加模式(Append Mode)

        在追加模式下,表(動態(tài)表)和外部連接器只交換插入(Insert)消息。

  • 撤回模式(Retract Mode)

        在撤回模式下,表和外部連接器交換的是:添加(Add)和撤回(Retract)消息。

        其中:

  • 插入(Insert)會被編碼為添加消息;

  • 刪除(Delete)則編碼為撤回消息;

  • 更新(Update)則會編碼為,已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。

        在此模式下,不能定義key,這一點跟upsert模式完全不同。

  • Upsert(更新插入)模式

        在Upsert模式下,動態(tài)表和外部連接器交換Upsert和Delete消息。

        這個模式需要一個唯一的key,通過這個key可以傳遞更新消息。為了正確應(yīng)用消息,外部連接器需要知道這個唯一key的屬性。

  • 插入(Insert)和更新(Update)都被編碼為Upsert消息;

  • 刪除(Delete)編碼為Delete信息

        這種模式和 Retract 模式的主要區(qū)別在于,Update操作是用單個消息編碼的,所以效率會更高。

4.7.3 輸出到Kafka

        除了輸出到文件,也可以輸出到Kafka。我們可以結(jié)合前面Kafka作為輸入數(shù)據(jù),構(gòu)建數(shù)據(jù)管道,kafka進(jìn),kafka出。

        代碼如下:

// 輸出到 kafkatableEnv.connect(
  new Kafka().version("0.11").topic("sinkTest").property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092"))
  .withFormat( new Csv() )
  .withSchema( new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.DOUBLE())
  )
  .createTemporaryTable("kafkaOutputTable")resultTable.insertInto("kafkaOutputTable")
4.7.4 輸出到ElasticSearch

        ElasticSearch的connector可以在upsert(update+insert,更新插入)模式下操作,這樣就可以使用Query定義的鍵(key)與外部系統(tǒng)交換UPSERT/DELETE消息。

        另外,對于“僅追加”(append-only)的查詢,connector還可以在 append 模式下操作,這樣就可以與外部系統(tǒng)只交換 insert 消息。

        es目前支持的數(shù)據(jù)格式,只有Json,而 flink 本身并沒有對應(yīng)的支持,所以還需要引入依賴:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.10.0</version></dependency>

        代碼實現(xiàn)如下:

// 輸出到estableEnv.connect(
  new Elasticsearch().version("6").host("localhost", 9200, "http").index("sensor").documentType("temp"))
  .inUpsertMode()           // 指定是 Upsert 模式
  .withFormat(new Json())
  .withSchema( new Schema().field("id", DataTypes.STRING()).field("count", DataTypes.BIGINT())
  )
  .createTemporaryTable("esOutputTable")aggResultTable.insertInto("esOutputTable")
4.7.5 輸出到MySql

        Flink專門為Table API的jdbc連接提供了flink-jdbc連接器,我們需要先引入依賴:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version></dependency>

        jdbc連接的代碼實現(xiàn)比較特殊,因為沒有對應(yīng)的java/scala類實現(xiàn) ConnectorDescriptor,所以不能直接 tableEnv.connect()。不過Flink SQL留下了執(zhí)行DDL的接口:tableEnv.sqlUpdate()

        對于jdbc的創(chuàng)建表操作,天生就適合直接寫DDL來實現(xiàn),所以我們的代碼可以這樣寫:

// 輸出到 Mysqlval sinkDDL: String =
  """
    |create table jdbcOutputTable (
    |  id varchar(20) not null,
    |  cnt bigint not null
    |) with (
    |  'connector.type' = 'jdbc',
    |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
    |  'connector.table' = 'sensor_count',
    |  'connector.driver' = 'com.mysql.jdbc.Driver',
    |  'connector.username' = 'root',
    |  'connector.password' = '123456'
    |)
  """.stripMargin

tableEnv.sqlUpdate(sinkDDL)aggResultSqlTable.insertInto("jdbcOutputTable")
4.7.6 將表轉(zhuǎn)換成DataStream

        表可以轉(zhuǎn)換為DataStream或DataSet。這樣,自定義流處理或批處理程序就可以繼續(xù)在 Table API或SQL查詢的結(jié)果上運行了。

        將表轉(zhuǎn)換為DataStream或DataSet時,需要指定生成的數(shù)據(jù)類型,即要將表的每一行轉(zhuǎn)換成的數(shù)據(jù)類型。通常,最方便的轉(zhuǎn)換類型就是Row。當(dāng)然,因為結(jié)果的所有字段類型都是明確的,我們也經(jīng)常會用元組類型來表示。

        表作為流式查詢的結(jié)果,是動態(tài)更新的。所以,將這種動態(tài)查詢轉(zhuǎn)換成的數(shù)據(jù)流,同樣需要對表的更新操作進(jìn)行編碼,進(jìn)而有不同的轉(zhuǎn)換模式。

        Table API 中表到 DataStream 有兩種模式:

  • 追加模式(Append Mode)

        用于表只會被插入(Insert)操作更改的場景

  • 撤回模式(Retract Mode)

        用于任何場景。有些類似于更新模式中Retract模式,它只有 Insert 和 Delete 兩類操作。

        得到的數(shù)據(jù)會增加一個Boolean類型的標(biāo)識位(返回的第一個字段),用它來表示到底是新增的數(shù)據(jù)(Insert),還是被刪除的數(shù)據(jù)(老數(shù)據(jù), Delete)。

        代碼實現(xiàn)如下:

val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv.toRetractStream[(String, Long)](aggResultTable)resultStream.print("result")aggResultStream.print("aggResult")

        所以,沒有經(jīng)過groupby之類聚合操作,可以直接用 toAppendStream 來轉(zhuǎn)換;而如果經(jīng)過了聚合,有更新操作,一般就必須用 toRetractDstream。

4.7.7 Query的解釋和執(zhí)行

        Table API提供了一種機(jī)制來解釋(Explain)計算表的邏輯和優(yōu)化查詢計劃。這是通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。

        explain方法會返回一個字符串,描述三個計劃:

  • 未優(yōu)化的邏輯查詢計劃

  • 優(yōu)化后的邏輯查詢計劃

  • 實際執(zhí)行計劃

        我們可以在代碼中查看執(zhí)行計劃:

val explaination: String = tableEnv.explain(resultTable)println(explaination)

        Query的解釋和執(zhí)行過程,老planner和 blink planner 大體是一致的,又有所不同。整體來講,Query都會表示成一個邏輯查詢計劃,然后分兩步解釋:

  1. 優(yōu)化查詢計劃

  2. 解釋成 DataStream 或者 DataSet程序

        而 Blink 版本是批流統(tǒng)一的,所以所有的Query,只會被解釋成DataStream程序;另外在批處理環(huán)境 TableEnvironment 下,Blink版本要到 tableEnv.execute() 執(zhí)行調(diào)用才開始解釋。

到此,關(guān)于“FlinkSQL  API怎么調(diào)用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI