溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

flink sql怎么實(shí)時(shí)計(jì)算當(dāng)天pv寫入mysql

發(fā)布時(shí)間:2021-09-16 12:41:15 來源:億速云 閱讀:280 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要講解了“flink sql怎么實(shí)時(shí)計(jì)算當(dāng)天pv寫入mysql”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“flink sql怎么實(shí)時(shí)計(jì)算當(dāng)天pv寫入mysql”吧!

首先我們還是使用datagen生成測試數(shù)據(jù),隨機(jī)生成一些用戶id

     String sourceSql = "CREATE TABLE datagen (\n" +
               " userid int,\n" +
               " proctime as PROCTIME()\n" +
               ") WITH (\n" +
               " 'connector' = 'datagen',\n" +
               " 'rows-per-second'='100',\n" +
               " 'fields.userid.kind'='random',\n" +
               " 'fields.userid.min'='1',\n" +
               " 'fields.userid.max'='100'\n" +
               ")";

定義mysql的sink,這里mysql是作為了一個(gè)upsert的sink,所以必須要一個(gè)主鍵,在mysql建表的時(shí)候我們指定了當(dāng)天的日期作為主鍵,mysql ddl如下

CREATE TABLE `pv` (
 `day_str` varchar(100) NOT NULL,
 `pv` bigint(10) DEFAULT NULL,
 PRIMARY KEY (`day_str`)
)

Flink中的ddl要和mysql中對(duì)的上,也要指定主鍵。

    String mysqlsql = "CREATE TABLE pv (\n" +
               "  day_str STRING,\n" +
               "  pv bigINT,\n" +
               "  PRIMARY KEY (day_str) NOT ENFORCED\n" +
               ") WITH (\n" +
               "   'connector' = 'jdbc',\n" +
               "   'username' = 'root',\n" +
               "   'password' = 'root',\n" +
               "   'url' = 'jdbc:mysql://localhost:3306/test',\n" +
               "   'table-name' = 'pv'\n" +
               ")";

接下來我們寫一個(gè)簡單的查詢:

     tEnv.executeSql("insert into pv SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd') as day_str, count(*) \n" +
               "FROM datagen \n" +
               "GROUP BY DATE_FORMAT(proctime, 'yyyy-MM-dd')");

可能對(duì)于以前一直做批處理的同學(xué)來說會(huì)感到疑惑,對(duì)于流式處理來說,group by將會(huì)返回一個(gè)可撤回流(RetractStream),轉(zhuǎn)化成datastream,將會(huì)得到一個(gè)Tuple2<Boolean, T>對(duì)象,這個(gè)對(duì)象第一個(gè)字段如果是false表示數(shù)據(jù)要撤回,true表示數(shù)據(jù)是我們新添加的,第二個(gè)字段是實(shí)際的數(shù)據(jù)。在這里,我們將這個(gè)實(shí)時(shí)更新的結(jié)果寫入到了mysql。這樣mysql表,每天就會(huì)只有一個(gè)數(shù)據(jù),系統(tǒng)會(huì)不斷地更新pv字段。

flink sql怎么實(shí)時(shí)計(jì)算當(dāng)天pv寫入mysql

類似的需求我們還可以使用flink的窗口來實(shí)現(xiàn),定義一個(gè)窗口周期是一天的窗口,然后自定義一個(gè)觸發(fā)器,比如每秒鐘觸發(fā)一次,然后將結(jié)果輸出寫入第三方sink。

感謝各位的閱讀,以上就是“flink sql怎么實(shí)時(shí)計(jì)算當(dāng)天pv寫入mysql”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)flink sql怎么實(shí)時(shí)計(jì)算當(dāng)天pv寫入mysql這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI