溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何正確使用FlinkStreamSQL

發(fā)布時(shí)間:2021-12-23 10:48:32 來源:億速云 閱讀:325 作者:柒染 欄目:大數(shù)據(jù)

本篇文章為大家展示了如何正確使用FlinkStreamSQL,內(nèi)容簡明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

一、前期準(zhǔn)備

項(xiàng)目路徑:https://github.com/DTStack/flinkStreamSQL

官方文檔:https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/docs/quickStart.md

Git Clone 項(xiàng)目

首先,需要將項(xiàng)目從Github上導(dǎo)入到IDEA中(導(dǎo)入方法較多,這里介紹一種常用的)

從IDEA菜單欄里,Git 選項(xiàng) -> Clone -> FlinkStreamSQL 項(xiàng)目地址,點(diǎn)擊Clone即可獲取FlinkStreamSQL 源碼一份! 如何正確使用FlinkStreamSQL

項(xiàng)目下載好后,默認(rèn)分支是 1.11_release,對(duì)應(yīng)的Flink版本是Flink 1.11.x(FlinkStreamSQL 的 release版本對(duì)應(yīng)著 Flink 的 release 版本),需要其他版本的自行切換,推薦使用 1.10_release。

如何正確使用FlinkStreamSQL

項(xiàng)目編譯

項(xiàng)目下載下來后,第一次編譯之前,先將整個(gè)項(xiàng)目maven reimport 一次

如何正確使用FlinkStreamSQL

如果有缺少JAR包,在某度或某歌上搜索即可(項(xiàng)目本身并不依賴什么獨(dú)有的JAR包,畢竟是開源項(xiàng)目),或者在官方釘釘群的文件中搜索看看,會(huì)有意外發(fā)現(xiàn)。

上面操作沒有問題后,就可以開始編譯了。

編譯命令:

mvn clean package -DskipTests

打包結(jié)束后會(huì)生成對(duì)應(yīng)的插件包文件夾,1.8 版本對(duì)應(yīng)的是plugins,1.10 及 之后的版本對(duì)應(yīng)的sqlplugins

如何正確使用FlinkStreamSQL

如果有用不到的插件,可以在項(xiàng)目的root路徑下的pom中,注釋掉不需要使用的插件

【!??!注意?。?!】【?。?!注意?。?!】【?。。∽⒁猓。。 ?/p>

部分插件之間有依賴關(guān)系,所以在注釋的時(shí)候,請(qǐng)小心別把相關(guān)依賴的插件注釋掉

rdb模塊被所有關(guān)系型數(shù)據(jù)庫所依賴,包括impala 模塊(雖然它不是關(guān)系型數(shù)據(jù)庫,但是它使用了JDBC)

core模塊是所有模塊所依賴的,不能注釋??!

Launcher模塊是任務(wù)提交必備,不能注釋!!

Kafka-base模塊是kafka插件的基礎(chǔ),如果使用了kafka插件(不管什么版本),不能注釋??!

1.10 及之后的版本,新增了dirtyData模塊,是用來提供臟數(shù)據(jù)指定存儲(chǔ)功能(比如將臟數(shù)據(jù)存儲(chǔ)到指定mysql數(shù)據(jù)庫中),不能注釋??!

【?。?!注意!?。 俊荆。?!注意?。?!】【?。?!注意!??!】

任務(wù)提交

項(xiàng)目編譯完之后,就可以提交任務(wù)了。任務(wù)提交的方式有l(wèi)ocal、standalone、yarn-session、yarn-per-job模式,后續(xù)會(huì)支持application(需要等到1.12版本)

從idea提交任務(wù)

如果以下概念中,有不懂的,自行查資料了解(學(xué)會(huì)查資料,比問別人更有效率

使用的idea版本是2020.3 公開版,有不一樣的地方自行修改

這里以yarn-per-job模式為例,其他模式類似,可以看文檔自行配置任務(wù)提交參數(shù)

1.配置idea-application

如何正確使用FlinkStreamSQL

有個(gè)快捷的方法,找到LauncherMain,然后運(yùn)行,在idea自動(dòng)生成的application中修改,或者直接"Modify Run Configuration"

如何正確使用FlinkStreamSQL

這里貼下自己一直使用的任務(wù)提交參數(shù),需要的自行修改,每個(gè)參數(shù)具體什么意思,在官方參數(shù)文檔中也有詳細(xì)說明。

-name
Test
-mode
yarnPer
-sql
/dtstack/sql/test/JoinDemoFour.sql
-localSqlPluginPath
/IdeaProjects/StreamSQLOne/sqlplugins
-flinkconf
/dtstack/conf/flink
-yarnconf
/dtstack/conf/yarn
-flinkJarPath
/dtstack/flink-1.10.1/lib
-confProp
{\"metrics.latency.interval\":\"30000\",\"metrics.latency.granularity\":\"operator\",\"time.characteristic\":\"ProcessingTime\",\"disableChain\":\"true\"}
-pluginLoadMode
shipfile
-queue
b

任務(wù)SQL怎么寫?這個(gè)根據(jù)自己的插件,去看對(duì)應(yīng)的插件文檔,最基本的任務(wù)SQL框架是:

CREATE Source(源表) -> CREATE Side(維表,根據(jù)自己業(yè)務(wù)來確定是否需要) -> CREATE Sink(結(jié)果表) -> INSERT INTO Sink blablabla...(實(shí)際執(zhí)行的業(yè)務(wù)SQL,這個(gè)必須要,不然任務(wù)執(zhí)行個(gè)????)

這里也貼下日常使用的SQL,需要自行修改。

CREATE TABLE SourceOne
(
    id        int,
    name      varchar,
    age       bigint,
    phone     bigint,
    birth     timestamp,
    todayTime time,
    todayDate date,
    money     decimal,
    price     double,
    wechat    varchar,
    proName   varchar
) WITH (
      type = 'kafka11',
      bootstrapServers = 'kudu1:9092',
      zookeeperQuorum = 'kudu1:2181/kafka',
      offsetReset = 'latest',
      topic = 'tiezhu_in_one',
      enableKeyPartitions = 'false',
      topicIsPattern = 'false',
      parallelism = '1'
      );

CREATE TABLE DimOne
(
    id    int,
    age   bigint,
    name  varchar,
    birth timestamp,
    PRIMARY KEY (id, age, name),
    period for system_time
) WITH (
      type = 'mysql',
      url = 'jdbc:mysql://k3:3306/tiezhu?useSSL=false',
      userName = 'root',
      password = 'admin123',
      tableName = 'TestOne',
      parallelism = '1',
      cache = 'LRU',
      asyncCapacity = '100',
      asyncTimeout = '1000',
      errorLimit = '10',
      cacheTTLMs = '1000'
      );

CREATE VIEW ViewOne AS
SELECT DO.age       as age,
       SO.todayTime as todayTime,
       SO.todayDate as todayDate,
       SO.name      as name,
       DO.id        as id,
       DO.birth     as birth,
       SO.proName   as proName
FROM SourceOne SO
         LEFT JOIN DimOne DO
                   ON SO.id = DO.id;

CREATE TABLE DimTwo
(
    id         int,
    proName    varchar,
    createDate date,
    createTime time,
    PRIMARY KEY (id),
    period for system_time
) WITH (
      type = 'mysql',
      url = 'jdbc:mysql://k3:3306/tiezhu?useSSL=false',
      userName = 'root',
      password = 'admin123',
      tableName = 'TestDemoTwo',
      parallelism = '1',
      cache = 'LRU',
      asyncCapacity = '100',
      errorLimit = '10'

      );

CREATE View ViewTwo AS
SELECT DimTwo.proName    as proName,
       DimTwo.createDate as createDate,
       DimTwo.createTime as createTime,
       ViewOne.todayTime as todayTime,
       ViewOne.todayDate as todayDate,
       ViewOne.name      as name,
       ViewOne.birth     as birth,
       ViewOne.age       as age,
       DimTwo.id         as id
FROM ViewOne
         LEFT JOIN DimTwo DimTwo
                   ON ViewOne.id = DimTwo.id
                       and '2020-10-28' = DimTwo.createDate
                       and DimTwo.id >= 2;

CREATE TABLE SinkOne
(
    id         int,
    name       varchar,
    age        bigint,
    birth      timestamp,
    todayTime  time,
    todayDate  date,
    createTime time,
    createDate date,
    proName    varchar
) WITH (
      type = 'kafka11',
      bootstrapServers = 'kudu1:9092',
      topic = 'tiezhu_out',
      parallelism = '1',
      updateMode = 'upsert'
      );

INSERT INTO SinkOne
SELECT *
FROM ViewTwo;

如果需要遠(yuǎn)程調(diào)試,那么需要在flink-conf.yaml中增加Flink 的遠(yuǎn)程調(diào)試配置,然后在idea中配置”JVM Remote“,在代碼塊中打斷點(diǎn)(這種方法還能調(diào)試Flink 本身的代碼)

env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

只需要修改標(biāo)記的這兩個(gè)地方,如果是HA集群,需要根據(jù)日志修改(怎么看日志,怎么修改,自行查資料

如何正確使用FlinkStreamSQL

至此,任務(wù)遠(yuǎn)程提交流程就這些。

本地調(diào)試

如果嫌棄遠(yuǎn)程調(diào)試,那么可以試試FlinkStreamSQL的本地調(diào)試,LocalTest模塊(這個(gè)模塊默認(rèn)是注釋掉的,如果有需要,自行打開即可),使用方法很簡單,修改對(duì)應(yīng)的參數(shù),然后執(zhí)行RUN 即可

如何正確使用FlinkStreamSQL

但是【注意?。。 縇ocalTest模塊的pom文件中有大部分常用的插件模塊,但是如果出現(xiàn)了類似"ClassNotFoundException",大概率是pom中沒有對(duì)應(yīng)的插件模塊,同時(shí)需要注意,Kafka模塊因?yàn)橛蓄悰_突的存在,所以在LocalTest模塊中,Kafka模塊只能存在一種

如何正確使用FlinkStreamSQL

上述內(nèi)容就是如何正確使用FlinkStreamSQL,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI