溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

發(fā)布時(shí)間:2021-12-10 11:13:49 來源:億速云 閱讀:176 作者:小新 欄目:大數(shù)據(jù)

小編給大家分享一下Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

一、 Flink 與 Hive 集成背景

為什么要做 Flink 和 Hive 集成的功能呢?最早的初衷是我們希望挖掘 Flink 在批處理方面的能力。眾所周知,F(xiàn)link 在流計(jì)算方面已經(jīng)是成功的引擎了,使用的用戶也非常多。在 Flink 的設(shè)計(jì)理念當(dāng)中,批計(jì)算是流處理中的一個(gè)特例。也就意味著,如果 Flink 在流計(jì)算方面做好,其實(shí)它的架構(gòu)也能很好的支持批計(jì)算的場(chǎng)景。在批計(jì)算的場(chǎng)景中,SQL 是一個(gè)很重要的切入點(diǎn)。因?yàn)樽鰯?shù)據(jù)分析的同學(xué),他們更習(xí)慣使用SQL 進(jìn)行開發(fā),而不是去寫 DataStream 或者 DataSet 這樣的程序。

Hadoop 生態(tài)圈的 SQL 引擎,Hive 是一個(gè)事實(shí)上的標(biāo)準(zhǔn)。大部分的用戶環(huán)境中都會(huì)使用到了 Hive 的一些功能,來搭建數(shù)倉(cāng)。一些比較新的 SQL 的引擎,例如 Spark SQL、Impala ,它們其實(shí)都提供了與 Hive 集成的能力。為了方便的能夠?qū)由夏壳坝脩粢延械氖褂脠?chǎng)景,所以我們認(rèn)為對(duì) Flink 而言,對(duì)接 Hive 也是不可缺少的功能。

因此,我們?cè)?Flink 1.9 當(dāng)中,就開始提供了與 Hive 集成的功能。當(dāng)然在 1.9 版本里面,這個(gè)功能是作為試用版發(fā)布的。到了 Flink 1.10 版本,與 Hive 集成的功能就達(dá)到了生產(chǎn)可用。同時(shí)在 Flink 1.10 發(fā)布的時(shí)候,我們用 10TB 的 TPC-DS 測(cè)試集,對(duì) Flink 和 Hive on MapReduce 進(jìn)行了對(duì)比,對(duì)比結(jié)果如下:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

藍(lán)色的方框表示 Flink 用的時(shí)間,桔紅色的方框表示 Hive on MapReduce 用的時(shí)間。最終的結(jié)果是 Flink 對(duì)于 Hive on MapReduce 大概提升了 7 倍左右的性能。所以驗(yàn)證了 Flink SQL 可以很好的支持批計(jì)算的場(chǎng)景。
接下來介紹下 Flink 對(duì)接 Hive 的設(shè)計(jì)架構(gòu)。對(duì)接 Hive 的時(shí)候需要幾個(gè)層面,分別是:

  • 能夠訪問 Hive 的元數(shù)據(jù);

  • 讀寫 Hive 表數(shù)據(jù);

  • Production Ready ;

1. 訪問 Hive 元數(shù)據(jù)

使用過 Hive 的同學(xué)應(yīng)該都知道,Hive 的元數(shù)據(jù)是通過 Hive Metastore 來管理的。所以意味著 Flink 需要打通與 Hive Metastore 的通信。為了更好的訪問 Hive 元數(shù)據(jù),在 Flink 這邊是提出了一套全新設(shè)計(jì)的 Catalog API 。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

這個(gè)全新的接口是一個(gè)通用化的設(shè)計(jì)。它并不只是為了對(duì)接 Hive 元數(shù)據(jù),理論上是它可以對(duì)接不同外部系統(tǒng)的元數(shù)據(jù)。

而且在一個(gè) Flink Session 當(dāng)中,是可以創(chuàng)建多個(gè) Catalog ,每一個(gè) Catalog 對(duì)應(yīng)于一個(gè)外部系統(tǒng)。用戶可以在 Flink Table API 或者如果使用的是 SQL Client 的話,可以在 Yaml 文件里指定定義哪些 Catalog 。然后在 SQL Client 創(chuàng)建 TableEnvironment 的時(shí)候,就會(huì)把這些 Catalog 加載起來。TableEnvironment 通過CatalogManager 來管理這些不同的 Catalog 的實(shí)例。這樣 SQL Client 在后續(xù)的提交 SQL 語句的過程中,就可以使用這些 Catalog 去訪問外部系統(tǒng)的元數(shù)據(jù)了。

上面這張圖里列出了 2 個(gè) Catalog 的實(shí)現(xiàn)。一個(gè)是 GenericlnMemoryCatalog ,把所有的元數(shù)據(jù)都保存在 Flink Client 端的內(nèi)存里。它的行為是類似于 Catalog 接口出現(xiàn)之前 Flink 的行為。也就是所有的元數(shù)據(jù)的生命周期跟 SQL Client 的 Session 周期是一樣的。當(dāng) Session 結(jié)束,在 Session 里面創(chuàng)建的元數(shù)據(jù)也就自動(dòng)的丟失了。

另一個(gè)是對(duì)接 Hive 著重介紹的 HiveCatalog 。HiveCatalog 背后對(duì)接的是 Hive Metastore 的實(shí)例,要與 Hive Metastore 進(jìn)行通信來做元數(shù)據(jù)的讀寫。為了支持多個(gè)版本的 Hive,不同版本的 Hive Metastore 的API可能存在不兼容。所以在 HiveCatalog 和 Hive Metastore 之間又加了一個(gè) HiveShim ,通過 HiveShim 可以支持不同版本的 Hive 。

這里的 HiveCatalog 一方面可以讓 Flink 去訪問 Hive 自身有的元數(shù)據(jù),另一方面它也為 Flink 提供了持久化元數(shù)據(jù)的能力。也就是 HiveCatalog 既可以用來存儲(chǔ) Hive的元數(shù)據(jù),也可以存 Flink 使用的元數(shù)據(jù)。例如,在 Flink 中創(chuàng)建一張 Kafka 的表,那么這張表也是可以存到 HiveCatalog 里的。這樣也就是為 Flink 提供了持久化元數(shù)據(jù)的能力。在沒有 HiveCatalog 之前,是沒有持久化能力的。

2. 讀寫 Hive 表數(shù)據(jù)

有了訪問 Hive 元數(shù)據(jù)的能力后,另一個(gè)重要的方面是讀寫 Hive 表數(shù)據(jù)。Hive 的表是存在 Hadoop 的 file system 里的,這個(gè) file system 是一個(gè) HDFS ,也可能是其他文件系統(tǒng)。只要是實(shí)現(xiàn)了 Hadoop 的 file system 接口的,理論上都可以存儲(chǔ)Hive 的表。

在 Flink 當(dāng)中:

  • 讀數(shù)據(jù)時(shí)實(shí)現(xiàn)了 HiveTableSource

  • 寫數(shù)據(jù)時(shí)實(shí)現(xiàn)了 HiveTableSink

而且設(shè)計(jì)的一個(gè)原則是:希望盡可能去復(fù)用 Hive 原有的 Input/Output Format、SerDe 等,來讀寫 Hive 的數(shù)據(jù)。這樣做的好處主要是 2 點(diǎn),一個(gè)是復(fù)用可以減少開發(fā)的工作量。另一個(gè)是復(fù)用好處是盡可能與 Hive 保證寫入數(shù)據(jù)的兼容性。目標(biāo)是Flink 寫入的數(shù)據(jù),Hive 必須可以正常的讀取。反之, Hive 寫入的數(shù)據(jù),F(xiàn)link 也可以正常讀取。

3. Production Ready

在 Flink 1.10 中,對(duì)接 Hive 的功能已經(jīng)實(shí)現(xiàn)了 Production Ready 。實(shí)現(xiàn) Production Ready 主要是認(rèn)為在功能上已經(jīng)完備了。具體實(shí)現(xiàn)的功能如下:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

二、Flink 1.11 中的新特性

下面將介紹下,在 Flink 1.11 版本中,對(duì)接 Hive 的一些新特性。

1. 簡(jiǎn)化的依賴管理

首先做的是簡(jiǎn)化使用 Hive connector 的依賴管理。Hive connector 的一個(gè)痛點(diǎn)是需要添加若干個(gè) jar 包的依賴,而且使用的 Hive 版本的不同,所需添加的 jar 包就不同。例如下圖:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

第一張圖是使用的 Hive 1.0.0 版本需要添加的 jar 包。第二張圖是用 Hive 2.2.0 版本需要添加的 jar 包??梢钥闯觯还苁菑?jar 包的個(gè)數(shù)、版本等,不同 Hive 版本添加的 jar 包是不一樣的。所以如果不仔細(xì)去讀文檔的話,就很容易導(dǎo)致用戶添加的依賴錯(cuò)誤。一旦添加錯(cuò)誤,例如添加少了或者版本不對(duì),那么會(huì)報(bào)出來一些比較奇怪、難理解的錯(cuò)誤。這也是用戶在使用 Hive connector 時(shí)暴露最多的問題之一。

所以我們希望能簡(jiǎn)化依賴管理,給用戶提供更好的體驗(yàn)。具體的做法是,在 Flink 1.11 版本中開始,會(huì)提供一些預(yù)先打好的 Hive 依賴包:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

用戶可以根據(jù)自己的 Hive 版本,選擇對(duì)應(yīng)的依賴包就可以了。

如果用戶使用的 Hive 并不是開源版本的 Hive ,用戶還是可以使用 1.10 那種方式,去自己添加單個(gè) jar 包。

2. Hive Dialect 的增強(qiáng)

在 Flink 1.10 就引入了 Hive Dialect ,但是很少有人使用,因?yàn)檫@個(gè)版本的 Hive Dialect 功能比較弱。僅僅的一個(gè)功能是:是否允許創(chuàng)建分區(qū)表的開關(guān)。就是如果設(shè)置了 Hive Dialect ,那就可以在 Flink SQL 中創(chuàng)建分區(qū)表。如果沒設(shè)置,則不允許創(chuàng)建。

另一個(gè)關(guān)鍵的是它不提供 Hive 語法的兼容。如果設(shè)置了 Hive Dialect 并可以創(chuàng)建分區(qū)表,但是創(chuàng)建分區(qū)表的 DDL 并不是 Hive 的語法。

在 Flink 1.11 中著重對(duì) Hive Dialect 的功能進(jìn)行了增強(qiáng)。增強(qiáng)的目標(biāo)是:希望用戶在使用 Flink SQL Client 的時(shí)候,能夠獲得與使用 Hive CLI 或 Beeline 近似的使用體驗(yàn)。就是在使用 Flink SQL Client 中,可以去寫一些 Hive 特定的一些語法?;蛘哒f用戶在遷移至 Flink 的時(shí)候, Hive 的腳本可以完全不用修改。

為了實(shí)現(xiàn)上述目標(biāo),在 Flink 1.11 中做了如下改進(jìn):

  • 給 Dialect 做了參數(shù)化,目前參數(shù)支持 default 和 hive 兩種值。default 是Flink 自身的 Dialect ,hive 是 Hive 的 Dialect。

  • SQL Client 和 API 均可以使用。

  • 可以靈活的做動(dòng)態(tài)切換,切換是語句級(jí)別的。例如 Session 創(chuàng)建后,第一個(gè)語句想用 Flink 的 Dialect 來寫,就設(shè)置成 default 。在執(zhí)行了幾行語句后,想用 Hive 的 Dialect 來寫,就可以設(shè)置成 hive 。在切換時(shí),就不需要重啟 Session。

  • 兼容 Hive 常用 DDL 以及基礎(chǔ)的 DML。

  • 提供與 Hive CLI 或 Beeline 近似的使用體驗(yàn)。

3. 開啟 Hive Dialect

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

上圖是在 SQL Client 中開啟 Hive Dialect 的方法。在 SQL Client 中可以設(shè)置初始的 Dialect。可以在 Yaml 文件里設(shè)置,也可以在 SQL Client 起來后,進(jìn)行動(dòng)態(tài)的切換。

還可以通過 Flink Table API 的方式開啟 Hive Dialect :

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

可以看到通過 TableEnvironment 去獲取 Config 然后設(shè)置開啟。

4. Hive Dialect 支持的語法

Hive Dialect 的語法主要是在 DDL 方面進(jìn)行了增強(qiáng)。因?yàn)樵?1.10 中通過 Flink SQL寫 DDL 去操作 Hive 的元數(shù)據(jù)不是十分可用,所以要解決這個(gè)痛點(diǎn),將主要精力集中在 DDL 方向了。

目前所支持的 DDL 如下:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

5. 流式數(shù)據(jù)寫入Hive

在 Flink 1.11 中還做了流式數(shù)據(jù)場(chǎng)景,以及跟 Hive 相結(jié)合的功能,通過 Flink 與Hive 的結(jié)合,來幫助 Hive 數(shù)倉(cāng)進(jìn)行實(shí)時(shí)化的改造。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

流式數(shù)據(jù)寫入 Hive 是借助 Streaming File Sink 實(shí)現(xiàn)的,它是完全 SQL 化的,不需要用戶進(jìn)行代碼開發(fā)。流式數(shù)據(jù)寫入 Hive 也支持分區(qū)和非分區(qū)表。Hive 數(shù)倉(cāng)一般都是離線數(shù)據(jù),用戶對(duì)數(shù)據(jù)一致性要求比較高,所以支持 Exactly-Once 語義。流式數(shù)據(jù)寫 Hive 大概有 5-10 分鐘級(jí)別的延遲。如果希望延遲盡可能的低,那么產(chǎn)生的一個(gè)結(jié)果就是會(huì)生成更多的小文件。小文件對(duì) HDFS 來說是不友好的,小文件多了以后,會(huì)影響 HDFS 的性能。這種情況下可以做一些小文的合并操作。

流式數(shù)據(jù)寫入 Hive 需要有幾個(gè)配置的地方:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

對(duì)于分區(qū)表來說,要設(shè)置 Partition Commit Delay 的參數(shù)。這個(gè)參數(shù)的意義就是控制每個(gè)分區(qū)包含多長(zhǎng)時(shí)間的數(shù)據(jù),例如可設(shè)置成天、小時(shí)等。

Partition Commit Trigger 表示 Partition Commit 什么時(shí)候觸發(fā),在 1.11 版本中支持 Process-time 和 Partition-time 觸發(fā)機(jī)制。

Partition Commit Policy 表示用什么方式提交分區(qū)。對(duì)于 Hive 來說,是需要將分區(qū)提交到 metastore, 這樣分區(qū)才是可見的。metastore 策略只支持 Hive 表。還有一個(gè)是 success-file 方式,success-file 是告訴下游的作業(yè)分區(qū)的數(shù)據(jù)已經(jīng)準(zhǔn)備好了。用戶也可以自定義,自己去實(shí)現(xiàn)一個(gè)提交方式。另外 Policy 可以指定多個(gè)的,例如可以同時(shí)指定 metastore 和 success-file。

下面看下流式數(shù)據(jù)寫入Hive的實(shí)現(xiàn)原理:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

主要是兩個(gè)部分,一個(gè)是 StreamingFileWriter ,借助它實(shí)現(xiàn)數(shù)據(jù)的寫入,它會(huì)區(qū)分 Bucket,這里的 Buck 類似 Hive 的分區(qū)概念,每個(gè) Subtask 都會(huì)往不同的 Bucket去寫數(shù)據(jù)。每個(gè) Subtask 寫的 Bucket 同一個(gè)時(shí)間可能會(huì)維持 3 種文件,In-progress Files 表示正在寫的文件,Pending Files 表示文件已經(jīng)寫完了但是還沒有提交,F(xiàn)inished Files 表示文件已經(jīng)寫完并且也已經(jīng)提交了。

另一個(gè)是 StreamingFileCommitter,在 StreamingFileWriter 后執(zhí)行。它是用來提交分區(qū)的,所以對(duì)于非分區(qū)表就不需要它了。當(dāng) StreamingFileWriter 的一個(gè)分區(qū)數(shù)據(jù)準(zhǔn)備好后,StreamingFileWriter 會(huì)向 StreamingFileCommitter 發(fā)一個(gè) Commit Message,Commit Message 告訴 StreamingFileCommitter 那些數(shù)據(jù)已經(jīng)準(zhǔn)備好了的。然后進(jìn)行提交的觸發(fā) Commit Trigger,以及提交方式 Commit Policy。

下面是一個(gè)具體的例子:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

例子中創(chuàng)建了一個(gè)叫 hive_table 的分區(qū)表,它有兩個(gè)分區(qū) dt 和 hour。dt 代表的是日期的字符串,hour 代表小時(shí)的字符串。Commit trigger 設(shè)置的是 partition-time,Commit delay 設(shè)置的是1小時(shí),Commit Policy 設(shè)置的是 metastore 和success-file。

6. 流式消費(fèi) Hive

在 Flink 1.10 中讀 Hive 數(shù)據(jù)的方式是批的方式去讀的,從 1.11 版本中,提供了流式的去讀 Hive 數(shù)據(jù)。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

通過不斷的監(jiān)控 Hive 數(shù)據(jù)表有沒有新數(shù)據(jù),有的話就進(jìn)行增量數(shù)據(jù)的消費(fèi)。

如果要針對(duì)某一張 Hive 表開啟流式消費(fèi),可以在 table property 中開啟,或者也可以使用在 1.11 中新加的 dynamic options 功能,可以查詢的時(shí)候動(dòng)態(tài)的指定 Hive 表是否需要打開流式讀取。

流式消費(fèi) Hive 支持分區(qū)表和非分區(qū)表。對(duì)于非分區(qū)表會(huì)監(jiān)控表目錄下新文件添加,并增量讀取。對(duì)于分區(qū)表通過監(jiān)控分區(qū)目錄和 Metastore 的方式確認(rèn)是否有新分區(qū)添加,如果有新增分區(qū),就會(huì)把新增分區(qū)數(shù)據(jù)讀取出來。這里需要注意,讀新增分區(qū)數(shù)據(jù)是一次性的。也就是新增加分區(qū)后,會(huì)把這個(gè)分區(qū)數(shù)據(jù)一次性都讀出來,在這之后就不再監(jiān)控這個(gè)分區(qū)的數(shù)據(jù)了。所以如果需要用 Flink 流式消費(fèi) Hive 的分區(qū)表,那應(yīng)該保證分區(qū)在添加的時(shí)候它的數(shù)據(jù)是完整的。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

流式消費(fèi) Hive 數(shù)據(jù)也需要額外的指定一些參數(shù)。首先要指定消費(fèi)順序,因?yàn)閿?shù)據(jù)是增量讀取,所以需要指定要用什么順序消費(fèi)數(shù)據(jù),目前支持兩種消費(fèi)順序 create-time 和 partition-time。

用戶還可以指定消費(fèi)起點(diǎn),類似于消費(fèi) kafka 指定 offset 這樣的功能,希望從哪個(gè)時(shí)間點(diǎn)的數(shù)據(jù)開始消費(fèi)。Flink 去消費(fèi)數(shù)據(jù)的時(shí)候,就會(huì)檢查并只會(huì)讀取這個(gè)時(shí)間點(diǎn)之后的數(shù)據(jù)。

最后還可以指定監(jiān)控的間隔。因?yàn)槟壳氨O(jiān)控新數(shù)據(jù)的添加都是要掃描文件系統(tǒng)的,可能你希望監(jiān)控的不要太頻繁,太頻繁會(huì)給文件系統(tǒng)造成比較大的壓力。所以可以控制一個(gè)間隔。

最后看下流式消費(fèi)的原理。先看流式消費(fèi)非分區(qū)表:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

圖中 ContinuoousFileMonitoringFunction 會(huì)不斷監(jiān)控非分區(qū)表目錄下面的文件,會(huì)不斷的跟文件系統(tǒng)進(jìn)行交互。一旦發(fā)現(xiàn)有新的文件添加了,就會(huì)對(duì)這些文件生成Splits ,并將 Splits 傳到 ContinuoousFileReaderOperator,F(xiàn)ileReaderOperator 拿到 Splits 后就會(huì)到文件系統(tǒng)中實(shí)際的消費(fèi)這些數(shù)據(jù),然后把讀出來的數(shù)據(jù)再傳往下游處理。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

對(duì)于流式消費(fèi)分區(qū)表和非分區(qū)表區(qū)別不是很大,其中 HiveContinuousMonitoringFunction 也會(huì)去不斷的掃描文件系統(tǒng),但是它掃描的是新增分區(qū)的目錄。當(dāng)它發(fā)現(xiàn)有新增的分區(qū)目錄后,會(huì)進(jìn)一步到 metstore 中做核查,查看是否這個(gè)分區(qū)已經(jīng)提交到 metstore 中。如果已經(jīng)提交,那就可以消費(fèi)分區(qū)中的數(shù)據(jù)了。然后會(huì)把分區(qū)中的數(shù)據(jù)生成 Splits 傳給 ContinuousFileReaderOperator ,然后就可以對(duì)數(shù)據(jù)進(jìn)行消費(fèi)了。

7. 關(guān)聯(lián) Hive 維表

關(guān)于 Hive 跟流式數(shù)據(jù)結(jié)合的另一個(gè)場(chǎng)景就是:關(guān)聯(lián) Hive 維表。例如在消費(fèi)流式數(shù)據(jù)時(shí),與一張線下的 Hive 維表進(jìn)行 join。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

關(guān)聯(lián)Hive維表采用了 Flink 的 Temporal Table 的語法,就是把 Hive 的維表作為Temporal Table,然后與流式的表進(jìn)行 join。想了解更多關(guān)于 Temporal Table 的內(nèi)容,可查看 Flink 的官網(wǎng)。

關(guān)聯(lián) Hive 維表的實(shí)現(xiàn)是每個(gè) sub-task 將 Hive 表緩存在內(nèi)存中,是緩存整張的Hive 表。如果 Hive 維表大小超過 sub-task 的可用內(nèi)存,那么作業(yè)會(huì)失敗。

Hive 維表在關(guān)聯(lián)的時(shí)候,Hive 維表可能會(huì)發(fā)生更新,所以會(huì)允許用戶設(shè)置 hive 表緩存的超時(shí)時(shí)間。超過這個(gè)時(shí)間后,sub-task 會(huì)重新加載 Hive 維表。需要注意,這種場(chǎng)景不適用于 Hive 維表頻繁更新的情況,這樣會(huì)對(duì) HDFS 文件系統(tǒng)造成很大的壓力。所以適用于 Hive 維表緩慢更新的情況。緩存超時(shí)時(shí)間一般設(shè)置的比較長(zhǎng),一般是小時(shí)級(jí)別的。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

這張圖表示的是關(guān)聯(lián) Hive 維表的原理。Streaming Data 代表流式數(shù)據(jù),LookupJoinRunner 表示 Join 算子,它會(huì)拿到流式數(shù)據(jù)的 join key,并把 join key 傳給FileSystemLookupFunction。

FileSystemLookupFunction 是 一個(gè)Table function,它會(huì)去跟底層的文件系統(tǒng)交互并加載 Hive 表,然后在 Hive 表中查詢 join key,判斷哪些行數(shù)據(jù)是可以 join的。

下面是關(guān)聯(lián) Hive 維表的例子:

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

這是 Flink 官網(wǎng)的一個(gè)例子,流式表是 Orders,LatestTates 是 Hive 的維表。

三、Hive 批流一體數(shù)倉(cāng)

經(jīng)過上面的介紹可以看出,在 Flink 1.11 中,在 Hive 數(shù)倉(cāng)和批流一體的功能是進(jìn)行了著重的開發(fā)。因?yàn)?Flink 是一個(gè)流處理的引擎,希望幫用戶更好的將批和流結(jié)合,讓 Hive 數(shù)倉(cāng)實(shí)現(xiàn)實(shí)時(shí)化的改造,讓用戶更方便的挖掘數(shù)據(jù)的價(jià)值。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

在 Flink 1.11 之前,F(xiàn)link 對(duì)接 Hive 會(huì)做些批處理的計(jì)算,并且只支持離線的場(chǎng)景。離線的場(chǎng)景一個(gè)問題是延遲比較大,批作業(yè)的調(diào)度一般都會(huì)通過一些調(diào)度的框架去調(diào)度。這樣其實(shí)延遲會(huì)有累加的作用。例如第一個(gè) job 跑完,才能去跑第二個(gè) job...這樣依次執(zhí)行。所以端對(duì)端的延遲就是所有 job 的疊加。

Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析

到了 1.11 之后,支持了 Hive 的流式處理的能力,就可以對(duì) Hive 數(shù)倉(cāng)進(jìn)行一個(gè)實(shí)時(shí)化的改造。
例如 Online 的一些數(shù)據(jù),用 Flink 做 ETL,去實(shí)時(shí)的寫入 Hive。當(dāng)數(shù)據(jù)寫入 Hive之后,可以進(jìn)一步接一個(gè)新的 Flink job,來做實(shí)時(shí)的查詢或者近似實(shí)時(shí)的查詢,可以很快的返回結(jié)果。同時(shí),其他的 Flink job 還可以利用寫入 Hive 數(shù)倉(cāng)的數(shù)據(jù)作為維表,來跟其它線上的數(shù)據(jù)進(jìn)行關(guān)聯(lián)整合,來得到分析的結(jié)果。

以上是“Flink 1.11與Hive批流一體數(shù)倉(cāng)的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI