溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink中Look up維表怎么使用

發(fā)布時間:2021-12-31 10:41:45 來源:億速云 閱讀:202 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“flink中Look up維表怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flink中Look up維表怎么使用”吧!

背景

在流式計算中,維表是一個很常見的概念,一般用于sql的join中,對流式數(shù)據(jù)進行數(shù)據(jù)補全,比如我們的source stream是來自日志的訂單數(shù)據(jù),但是日志中我們只是記錄了訂單商品的id,并沒有其他的信息,但是我們把數(shù)據(jù)存入數(shù)倉進行數(shù)據(jù)分析的時候,卻需要商品名稱、價格等等其他的信息,這種問題我們可以在進行流處理的時候通過查詢維表的方式對數(shù)據(jù)進行數(shù)據(jù)補全。

維表一般存儲在外部存儲中,比如mysql、hbase、redis等等,今天我們以mysql為例,講講flink中維表的使用。

LookupableTableSource

在flink中提供了一個LookupableTableSource,可以用于實現(xiàn)維表,也就是我們可以通過某幾個key列去查詢外部存儲來獲取相關的信息用于補全stream的數(shù)據(jù)。

public interface LookupableTableSource<T> extends TableSource<T> {

TableFunction<T> getLookupFunction(String[] lookupKeys);

AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

boolean isAsyncEnabled();
}

我們看到,LookupableTableSource有三個方法

  • getLookupFunction:用于同步查詢維表的數(shù)據(jù),返回一個TableFunction,所以本質(zhì)上還是通過用戶自定義 UDTF來實現(xiàn)的。

  • getAsyncLookupFunction:用于異步查詢維表的數(shù)據(jù),該方法返回一個對象

  • isAsyncEnabled:默認情況下是同步查詢,如果要開啟異步查詢,這個方法需要返回true

在flink里,我們看到實現(xiàn)了這個接口的主要有四個類,JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource,今天我們主要以jdbc為例講講如何進行維表查詢。

實例講解

接下來我們講一個小例子,首先定義一下stream source,我們使用flink 1.11提供的datagen來生成數(shù)據(jù)。

我們來模擬生成用戶的數(shù)據(jù),這里只生成的用戶的id,范圍在1-100之間。

CREATE TABLE datagen (
userid int,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='100',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
)

datagen具體的使用方法可以參考:

聊聊flink 1.11 中的隨機數(shù)據(jù)生成器-DataGen connector

然后再創(chuàng)建一個mysql維表信息:

CREATE TABLE dim_mysql (
 id int,
 name STRING,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'userinfo',
  'username' = 'root',
  'password' = 'root'
)

我們這個mysql表中樣例數(shù)據(jù)如下:

flink中Look up維表怎么使用

最后執(zhí)行sql查詢,流表關聯(lián)維表:

SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  ON datagen.userid = dim_mysql.id

結(jié)果示例如下:

3> 53,2020-09-03T07:19:34.565,null,null
3> 73,2020-09-03T07:19:34.566,null,null
1> 14,2020-09-03T07:19:34.566,14,aaddda
2> 11,2020-09-03T07:19:34.566,null,null
4> 8,2020-09-03T07:19:34.566,8,name8
1> 61,2020-09-03T07:19:34.567,null,null
3> 12,2020-09-03T07:19:34.567,12,aaa
2> 99,2020-09-03T07:19:34.567,null,null
4> 37,2020-09-03T07:19:34.568,null,null
2> 13,2020-09-03T07:19:34.569,13,aaddda
3> 6,2020-09-03T07:19:34.568,6,name6

我們看到對于維表中存在的數(shù)據(jù),已經(jīng)關聯(lián)出來了,對于維表中沒有的數(shù)據(jù),顯示為null

源碼解析

JdbcTableSource

以jdbc為例,我們來看看flink底層是怎么做的。

JdbcTableSource#isAsyncEnabled方法返回的是false,也就是不支持異步的查詢,所以進入JdbcTableSource#getLookupFunction方法。

	@Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return JdbcLookupFunction.builder()
.setOptions(options)
.setLookupOptions(lookupOptions)
.setFieldTypes(rowTypeInfo.getFieldTypes())
.setFieldNames(rowTypeInfo.getFieldNames())
.setKeyNames(lookupKeys)
.build();
}

最終是構(gòu)造了一個JdbcLookupFunction對象,

  • options是連接jdbc的一些參數(shù),比如user、pass、url等。

  • lookupOptions是一些有關維表的參數(shù),主要是緩存的大小、超時時間等。

  • lookupKeys也就是要去關聯(lián)查詢維表的字段。

JdbcLookupFunction

所以我們來看看JdbcLookupFunction類,這個JdbcLookupFunction是一個TableFunction的子類,具體的TableFunction的使用可以參考這個文章:

Flink實戰(zhàn)教程-自定義函數(shù)之TableFunction

一個TableFunction最核心的就是eval方法,在這個方法里,做的主要的工作就是通過傳進來的多個keys拼接成sql去來查詢數(shù)據(jù),首先查詢的是緩存,緩存有數(shù)據(jù)就直接返回,緩存沒有的話再去查詢數(shù)據(jù)庫,然后再將查詢的結(jié)果返回并放入緩存,下次查詢的時候直接查詢緩存。

為什么要加一個緩存呢?默認情況下是不開啟緩存的,每來一個查詢,都會給維表發(fā)送一個請求去查詢,如果數(shù)據(jù)量比較大的話,勢必會給存儲維表的系統(tǒng)造成一定的壓力,所以flink提供了一個LRU緩存,查詢維表的時候,先查詢緩存,緩存沒有再去查詢外部系統(tǒng),但是如果有一個數(shù)據(jù)查詢頻率比較高,一直被命中,就無法獲取新數(shù)據(jù)了。所以緩存還要加一個超時時間,過了這個時間,把這個數(shù)據(jù)強制刪除,去外部系統(tǒng)查詢新的數(shù)據(jù)。

具體的怎么開啟緩存呢?我們看下JdbcLookupFunction#open方法

	@Override
public void open(FunctionContext context) throws Exception {
try {
establishConnectionAndStatement();
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}

也就是說cacheMaxSize和cacheExpireMs需要同時設置,就會構(gòu)造一個緩存對象cache來緩存數(shù)據(jù).這兩個參數(shù)對應的DDL的屬性就是lookup.cache.max-rows和lookup.cache.ttl

對于具體的緩存的大小和超時時間的設置,用戶需要根據(jù)自身的情況來自己定義,在數(shù)據(jù)的準確性和系統(tǒng)的吞吐量之間做一個權衡。

到此,相信大家對“flink中Look up維表怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI