溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何使用Hive Catalog

發(fā)布時(shí)間:2021-12-16 13:53:16 來(lái)源:億速云 閱讀:841 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“如何使用Hive Catalog”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“如何使用Hive Catalog”吧!

什么是Hive Catalog

我們知道,Hive使用Hive Metastore(HMS)存儲(chǔ)元數(shù)據(jù)信息,使用關(guān)系型數(shù)據(jù)庫(kù)來(lái)持久化存儲(chǔ)這些信息。所以,F(xiàn)link集成Hive需要打通Hive的metastore,去管理Flink的元數(shù)據(jù),這就是Hive Catalog的功能。

Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元數(shù)據(jù)。Hive Catalog可以將元數(shù)據(jù)進(jìn)行持久化,這樣后續(xù)的操作就可以反復(fù)使用這些表的元數(shù)據(jù),而不用每次使用時(shí)都要重新注冊(cè)。如果不去持久化catalog,那么在每個(gè)session中取處理數(shù)據(jù),都要去重復(fù)地創(chuàng)建元數(shù)據(jù)對(duì)象,這樣是非常耗時(shí)的。 

如何使用Hive Catalog

HiveCatalog是開(kāi)箱即用的,所以,一旦配置好Flink與Hive集成,就可以使用HiveCatalog。比如,我們通過(guò)FlinkSQL 的DDL語(yǔ)句創(chuàng)建一張kafka的數(shù)據(jù)源表,立刻就能查看該表的元數(shù)據(jù)信息。

HiveCatalog可以處理兩種類型的表:一種是Hive兼容的表,另一種是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式來(lái)存儲(chǔ)的,所以,對(duì)于Hive兼容表而言,我們既可以使用Flink去操作該表,又可以使用Hive去操作該表。

普通表是對(duì)Flink而言的,當(dāng)使用HiveCatalog創(chuàng)建一張普通表,僅僅是使用Hive MetaStore將其元數(shù)據(jù)進(jìn)行了持久化,所以可以通過(guò)Hive查看這些表的元數(shù)據(jù)信息(通過(guò)DESCRIBE FORMATTED命令),但是不能通過(guò)Hive去處理這些表,因?yàn)檎Z(yǔ)法不兼容。

對(duì)于是否是普通表,F(xiàn)link使用is_generic屬性進(jìn)行標(biāo)識(shí)。默認(rèn)情況下,創(chuàng)建的表是普通表,即is_generic=true,如果要?jiǎng)?chuàng)建Hive兼容表,需要在建表屬性中指定is_generic=false。

尖叫提示:

由于依賴Hive Metastore,所以必須開(kāi)啟Hive MetaStore服務(wù) 

代碼中使用Hive Catalog

   EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 使用注冊(cè)的catalog
        tableEnv.useCatalog("myhive");
   

Flink SQLCli中使用Hive Catalog

在FlinkSQL Cli中使用Hive Catalog很簡(jiǎn)單,只需要配置一下sql-cli-defaults.yaml文件即可。配置內(nèi)容如下:

catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
 

如何使用Hive Catalog

在FlinkSQL Cli中創(chuàng)建一張kafka表,該表默認(rèn)為普通表,即is_generic=true

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用戶id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類id
    `action` STRING, -- 用戶行為
    `province` INT, -- 用戶所在的省份
    `ts` BIGINT, -- 用戶行為發(fā)生的時(shí)間戳
    `proctime` AS PROCTIME(), -- 通過(guò)計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時(shí)間
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定義watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費(fèi)者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數(shù)據(jù)源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);
 

我們可以在Hive客戶端中查看該表的元數(shù)據(jù)信息

hive (default)> desc formatted  user_behavior;
Table Parameters:                
       ...
        is_generic              true                
      ...         
 

從上面的元數(shù)據(jù)信息可以看出,is_generic=true,說(shuō)明該表是一張普通表,如果在Hive中去查看該表,則會(huì)報(bào)錯(cuò)。

上面創(chuàng)建的表是普通表,該表不能使用Hive去查詢。那么,該如何創(chuàng)建一張Hive兼容表呢?我們只需要在建表的屬性中顯示指定is_generic=false即可,具體如下:

CREATE TABLE hive_compatible_tbl ( 
    `user_id` BIGINT, -- 用戶id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類id
    `action` STRING, -- 用戶行為
    `province` INT, -- 用戶所在的省份
    `ts` BIGINT -- 用戶行為發(fā)生的時(shí)間戳
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費(fèi)者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數(shù)據(jù)源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false',
    'is_generic' = 'false'
);
 

當(dāng)我們?cè)贖ive中查看該表的元數(shù)據(jù)信息時(shí),可以看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:                
        ...           
        is_generic              false               
        ...
 

我們可以使用FlinkSQL Cli或者HiveCli向該表中寫(xiě)入數(shù)據(jù),然后分別通過(guò)FlinkSQL Cli和Hive Cli去查看該表數(shù)據(jù)的變化

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;
 

再在FlinkSQL Cli中查看該表,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
                   user_id                   item_id                    action
                      2020                      1221                       buy
    
 

同樣,我們可以在FlinkSQL Cli中去向該表中寫(xiě)入數(shù)據(jù):

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;

                   user_id                   item_id                    action
                      2020                      1221                       buy
                      2020                      1222                       fav
 

尖叫提示:

對(duì)于Hive兼容的表,需要注意數(shù)據(jù)類型,具體的數(shù)據(jù)類型對(duì)應(yīng)關(guān)系以及注意點(diǎn)如下

Flink 數(shù)據(jù)類型Hive 數(shù)據(jù)類型
CHAR(p)CHAR(p)
VARCHAR(p)VARCHAR(p)
STRINGSTRING
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DECIMAL(p, s)DECIMAL(p, s)
DATEDATE
TIMESTAMP(9)TIMESTAMP
BYTESBINARY
ARRAYLIST
MAP<K, V>MAP<K, V>
ROWSTRUCT

注意

  • Hive     CHAR(p) 類型的最大長(zhǎng)度為255
  • Hive     VARCHAR(p)類型的最大長(zhǎng)度為65535
  • Hive     MAP類型的key僅支持基本類型,而Flink’s     MAP 類型的key執(zhí)行任意類型
  • Hive不支持聯(lián)合數(shù)據(jù)類型,比如STRUCT
  • Hive’s     TIMESTAMP 的精度是 9 , Hive UDFs函數(shù)只能處理 precision <= 9的     TIMESTAMP
  • Hive 不支持 Flink提供的     TIMESTAMP_WITH_TIME_ZONE,     TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及     MULTISET類型
  • Flink     INTERVAL 類型與 Hive     INTERVAL 類型不一樣

上面介紹了普通表和Hive兼容表,那么我們?cè)撊绾问褂肏ive的語(yǔ)法進(jìn)行建表呢?這個(gè)時(shí)候就需要使用Hive Dialect。 

什么是Hive Dialect

從Flink1.11.0開(kāi)始,只要開(kāi)啟了Hive dialect配置,用戶就可以使用HiveQL語(yǔ)法,這樣我們就可以在Flink中使用Hive的語(yǔ)法使用一些DDL和DML操作。

Flink目前支持兩種SQL方言(SQL dialects),分別為:default和hive。默認(rèn)的SQL方言是default,如果要使用Hive的語(yǔ)法,需要將SQL方言切換到hive。 

如何使用Hive Dialect 

在SQL Cli中使用Hive dialect

使用hive dialect只需要配置一個(gè)參數(shù)即可,該參數(shù)名稱為:table.sql-dialect。我們就可以在sql-client-defaults.yaml配置文件中進(jìn)行配置,也可以在具體的會(huì)話窗口中進(jìn)行設(shè)定,對(duì)于SQL dialect的切換,不需要進(jìn)行重啟session。

execution:
  planner: blink
  type: batch
  result-mode: table

configuration:
  table.sql-dialect: hive
 

如果我們需要在SQL Cli中進(jìn)行切換hive dialect,可以使用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
Flink SQL> set table.sql-dialect=default; -- 使用default dialect
 

尖叫提示:

一旦切換到了hive dialect,就只能使用Hive的語(yǔ)法建表,如果嘗試使用Flink的語(yǔ)法建表,則會(huì)報(bào)錯(cuò) 

在Table API中配合dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 使用hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 使用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
   

操作示例

Flink SQL> set table.sql-dialect=hive;
-- 使用Hive語(yǔ)法創(chuàng)建一張表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
  `id` int COMMENT 'id',
  `name` string COMMENT '名稱',
  `age` int COMMENT '年齡' 
)
COMMENT 'hive dialect表測(cè)試'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
 

進(jìn)入Hive客戶端去查看該表的元數(shù)據(jù)信息

desc formatted hive_dialect_tbl;
col_name        data_type       comment
# col_name              data_type               comment             
                 
id                      int                                         
name                    string                                      
age                     int                                         
                 
# Detailed Table Information             
Database:               default                  
Owner:                  null                     
CreateTime:             Mon Dec 21 17:23:48 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl        
Table Type:             MANAGED_TABLE            
Table Parameters:                
        comment                 hive dialect表測(cè)試     
        is_generic              false               
        transient_lastDdlTime   1608542628          
                 
# Storage Information            
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        field.delim             ,                   
        serialization.format    ,                   
 

很明顯,該表是一張Hive兼容表,即is_generic=false。

使用FlinkSQLCli向該表中寫(xiě)入一條數(shù)據(jù):

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;
 

我們也可以在Hive的Cli中去操作該表

hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;
 

以下是使用Hive方言的一些注意事項(xiàng)。

  • Hive dialect只能用于操作Hive表,不能用于普通表。Hive方言應(yīng)與HiveCatalog一起使用。
  • 雖然所有Hive版本都支持相同的語(yǔ)法,但是是否有特定功能仍然取決于使用的Hive版本。例如,僅在Hive-2.4.0或更高版本中支持更新數(shù)據(jù)庫(kù)位置。
  • Hive和Calcite具有不同的保留關(guān)鍵字。例如,     default在Calcite中是保留關(guān)鍵字,在Hive中是非保留關(guān)鍵字。所以,在使用Hive dialect時(shí),必須使用反引號(hào)(`)引用此類關(guān)鍵字,才能將其用作標(biāo)識(shí)符。
  • 在Hive中不能查詢?cè)贔link中創(chuàng)建的視圖。

當(dāng)然,一旦開(kāi)啟了Hive dialect,我們就可以按照Hive的操作方式在Flink中去處理Hive的數(shù)據(jù)了,具體的操作與Hive一致,本文不再贅述。

感謝各位的閱讀,以上就是“如何使用Hive Catalog”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)如何使用Hive Catalog這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI