溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink 1.11新特性之SQL Hive Streaming的示例分析

發(fā)布時間:2021-12-10 09:13:59 來源:億速云 閱讀:129 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹了Flink 1.11新特性之SQL Hive Streaming的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。



添加相關(guān)依賴


測試集群上的 Hive 版本為 1.1.0,Hadoop 版本為 2.6.0,Kafka 版本為 1.0.1。

<properties>  <scala.bin.version>2.11</scala.bin.version>  <flink.version>1.11.0</flink.version>  <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>  <hive.version>1.1.0</hive.version></properties>
<dependencies>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-clients_${scala.bin.version}</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-common</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-json</artifactId>    <version>${flink.version}</version>  </dependency>  <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-shaded-hadoop-2-uber</artifactId>    <version>${flink-shaded-hadoop.version}</version>  </dependency>  <dependency>    <groupId>org.apache.hive</groupId>    <artifactId>hive-exec</artifactId>    <version>${hive.version}</version>  </dependency>

另外,別忘了找到 hdfs-site.xml 和 hive-site.xml,并將其加入項目。

創(chuàng)建執(zhí)行環(huán)境


Flink 1.11 的 Table/SQL API 中,F(xiàn)ileSystem Connector 是靠增強版 StreamingFileSink 組件實現(xiàn),在源碼中名為 StreamingFileWriter。我們知道,只有在 Checkpoint 成功時,StreamingFileSink 寫入的文件才會由 Pending 狀態(tài)變成 Finished 狀態(tài),從而能夠安全地被下游讀取。所以,我們一定要打開 Checkpointing,并設(shè)定合理的間隔。

Flink 1.11新特性之SQL Hive Streaming的示例分析

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentstreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)streamEnv.setParallelism(3)
val tableEnvSettings = EnvironmentSettings.newInstance()    .useBlinkPlanner()    .inStreamingMode()    .build()val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

   
   
  

           

注冊 HiveCatalog


val catalogName = "my_catalog"val catalog = new HiveCatalog(  catalogName,              // catalog name  "default",                // default database  "/Users/lmagic/develop",  // Hive config (hive-site.xml) directory  "1.1.0"                   // Hive version)tableEnv.registerCatalog(catalogName, catalog)tableEnv.useCatalog(catalogName)
     


創(chuàng)建 Kafka 流表


Kafka Topic 中存儲的是 JSON 格式的埋點日志,建表時用計算列生成事件時間與水印。1.11 版本 SQL Kafka Connector 的參數(shù)相比 1.10 版本有一定簡化。

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")
tableEnv.executeSql(  """    |CREATE TABLE stream_tmp.analytics_access_log_kafka (    |  ts BIGINT,    |  userId BIGINT,    |  eventType STRING,    |  fromType STRING,    |  columnType STRING,    |  siteId BIGINT,    |  grouponId BIGINT,    |  partnerId BIGINT,    |  merchandiseId BIGINT,    |  procTime AS PROCTIME(),    |  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),    |  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND    |) WITH (    |  'connector' = 'kafka',    |  'topic' = 'ods_analytics_access_log',    |  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092'    |  'properties.group.id' = 'flink_hive_integration_exp_1',    |  'scan.startup.mode' = 'latest-offset',    |  'format' = 'json',    |  'json.fail-on-missing-field' = 'false',    |  'json.ignore-parse-errors' = 'true'    |)  """.stripMargin)

前面已經(jīng)注冊了 HiveCatalog,故在 Hive 中可以觀察到創(chuàng)建的 Kafka 流表的元數(shù)據(jù)(注意該表并沒有事實上的列)。

hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;OK# col_name              data_type               comment

# Detailed Table InformationDatabase:               stream_tmpOwner:                  nullCreateTime:             Wed Jul 15 18:25:09 CST 2020LastAccessTime:         UNKNOWNProtect Mode:           NoneRetention:              0Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafkaTable Type:             MANAGED_TABLETable Parameters:    flink.connector         kafka    flink.format            json    flink.json.fail-on-missing-field    false    flink.json.ignore-parse-errors  true    flink.properties.bootstrap.servers  kafka110:9092,kafka111:9092,kafka112:9092    flink.properties.group.id   flink_hive_integration_exp_1    flink.scan.startup.mode latest-offset    flink.schema.0.data-type    BIGINT    flink.schema.0.name     ts    flink.schema.1.data-type    BIGINT    flink.schema.1.name     userId    flink.schema.10.data-type   TIMESTAMP(3)    flink.schema.10.expr    TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))    flink.schema.10.name    eventTime    flink.schema.2.data-type    VARCHAR(2147483647)    flink.schema.2.name     eventType    # 略......    flink.schema.9.data-type    TIMESTAMP(3) NOT NULL    flink.schema.9.expr     PROCTIME()    flink.schema.9.name     procTime    flink.schema.watermark.0.rowtime    eventTime    flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)    flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '15' SECOND    flink.topic             ods_analytics_access_log    is_generic              true    transient_lastDdlTime   1594808709
# Storage InformationSerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDeInputFormat:            org.apache.hadoop.mapred.TextInputFormatOutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormatCompressed:             NoNum Buckets:            -1Bucket Columns:         []Sort Columns:           []Storage Desc Params:    serialization.format    1Time taken: 1.797 seconds, Fetched: 61 row(s)

創(chuàng)建 Hive 表


Flink SQL 提供了兼容 HiveQL 風(fēng)格的 DDL,指定 SqlDialect.HIVE 即可( DML 兼容還在開發(fā)中)。

為了方便觀察結(jié)果,以下的表采用了天/小時/分鐘的三級分區(qū),實際應(yīng)用中可以不用這樣細(xì)的粒度(10分鐘甚至1小時的分區(qū)可能更合適)。

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")
tableEnv.executeSql(  """    |CREATE TABLE hive_tmp.analytics_access_log_hive (    |  ts BIGINT,    |  user_id BIGINT,    |  event_type STRING,    |  from_type STRING,    |  column_type STRING,    |  site_id BIGINT,    |  groupon_id BIGINT,    |  partner_id BIGINT,    |  merchandise_id BIGINT    |) PARTITIONED BY (    |  ts_date STRING,    |  ts_hour STRING,    |  ts_minute STRING    |) STORED AS PARQUET    |TBLPROPERTIES (    |  'sink.partition-commit.trigger' = 'partition-time',    |  'sink.partition-commit.delay' = '1 min',    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'    |)  """.stripMargin)

Hive 表的參數(shù)復(fù)用了 SQL FileSystem Connector 的相關(guān)參數(shù),與分區(qū)提交(Partition Commit)密切相關(guān)。僅就上面出現(xiàn)的4個參數(shù)簡單解釋一下。

  • sink.partition-commit.trigger :觸發(fā)分區(qū)提交的時間特征。默認(rèn)為 processing-time,即處理時間,很顯然在有延遲的情況下,可能會造成數(shù)據(jù)分區(qū)錯亂。所以這里使用 partition-time,即按照分區(qū)時間戳(即分區(qū)內(nèi)數(shù)據(jù)對應(yīng)的事件時間)來提交。
  • partition.time-extractor.timestamp-pattern :分區(qū)時間戳的抽取格式。需要寫成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相應(yīng)的分區(qū)字段做占位符替換。顯然,Hive 表的分區(qū)字段值來自流表中定義好的事件時間,后面會看到。
  • sink.partition-commit.delay :觸發(fā)分區(qū)提交的延遲。在時間特征設(shè)為 partition-time 的情況下,當(dāng)水印時間戳大于分區(qū)創(chuàng)建時間加上此延遲時,分區(qū)才會真正提交。此值最好與分區(qū)粒度相同,例如若 Hive 表按1小時分區(qū),此參數(shù)可設(shè)為 1 h,若按 10 分鐘分區(qū),可設(shè)為 10 min。
  • sink.partition-commit.policy.kind :分區(qū)提交策略,可以理解為使分區(qū)對下游可見的附加操作。 metastore 表示更新 Hive Metastore 中的表元數(shù)據(jù), success-file 則表示在分區(qū)內(nèi)創(chuàng)建 _SUCCESS 標(biāo)記文件。

當(dāng)然,SQL FileSystem Connector 的功能并不限于此,還有很大自定義的空間(如可以自定義分區(qū)提交策略以合并小文件等)。具體可參見官方文檔。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sink

流式寫入 Hive


注意將流表中的事件時間轉(zhuǎn)化為 Hive 的分區(qū)。

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)tableEnv.executeSql(  """    |INSERT INTO hive_tmp.analytics_access_log_hive    |SELECT    |  ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId,    |  DATE_FORMAT(eventTime,'yyyy-MM-dd'),    |  DATE_FORMAT(eventTime,'HH'),    |  DATE_FORMAT(eventTime,'mm')    |FROM stream_tmp.analytics_access_log_kafka    |WHERE merchandiseId > 0  """.stripMargin)

來觀察一下流式 Sink 的結(jié)果吧。

Flink 1.11新特性之SQL Hive Streaming的示例分析


上文設(shè)定的 Checkpoint Interval 是 20 秒,可以看到,上圖中的數(shù)據(jù)文件恰好是以 20 秒的間隔寫入的。由于并行度為 3,所以每次寫入會生成 3 個文件。分區(qū)內(nèi)所有數(shù)據(jù)寫入完畢后,會同時生成 _SUCCESS 文件。如果是正在寫入的分區(qū),則會看到 .inprogress 文件。

通過 Hive 查詢一下,確定數(shù)據(jù)的時間無誤。

  
    
  
  
  
hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT)))    > FROM hive_tmp.analytics_access_log_hive    > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';OK2020-07-15 23:23:00 2020-07-15 23:23:59Time taken: 1.115 seconds, Fetched: 1 row(s)
           

           

流式讀取 Hive


要將 Hive 表作為流式 Source,需要啟用 Dynamic Table Options,并通過 Table Hints 來指定 Hive 數(shù)據(jù)流的參數(shù)。以下是簡單地通過 Hive 計算商品 PV 的例子。

  
    
  
  
  
tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
val result = tableEnv.sqlQuery(  """     |SELECT merchandise_id,count(1) AS pv     |FROM hive_tmp.analytics_access_log_hive     |/*+ OPTIONS(     |  'streaming-source.enable' = 'true',     |  'streaming-source.monitor-interval' = '1 min',     |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'     |) */     |WHERE event_type = 'shtOpenGoodsDetail'     |AND ts_date >= '2020-07-15'     |GROUP BY merchandise_id     |ORDER BY pv DESC LIMIT 10   """.stripMargin)
result.toRetractStream[Row].print().setParallelism(1)streamEnv.execute()
           

           
三個 Table Hint 參數(shù)的含義解釋如下。

  • streaming-source.enable:設(shè)為 true,表示該 Hive 表可以作為 Source。
  • streaming-source.monitor-interval:感知 Hive 表新增數(shù)據(jù)的周期,以上設(shè)為 1 分鐘。對于分區(qū)表而言,則是監(jiān)控新分區(qū)的生成,以增量讀取數(shù)據(jù)。
  • streaming-source.consume-start-offset:開始消費的時間戳,同樣需要寫成 yyyy-MM-dd HH:mm:ss 的形式。

更加具體的說明仍然可參見官方文檔。

https://links.jianshu.com/go?to=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.11%2Fdev%2Ftable%2Fhive%2Fhive_streaming.html%23streaming-reading  
最后,由于 SQL 語句中有 ORDER BY 和 LIMIT 邏輯,所以需要調(diào)用 toRetractStream() 方法轉(zhuǎn)化為回撤流,即可輸出結(jié)果。

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Flink 1.11新特性之SQL Hive Streaming的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI