您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)如何使用SQL讀取Kafka并寫(xiě)入MySQL,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
筆者一開(kāi)始是想用 SQL Client 來(lái)貫穿整個(gè)演示環(huán)節(jié),但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語(yǔ)句。所以筆者就只好自己寫(xiě)了個(gè)簡(jiǎn)單的提交腳本。后來(lái)想想,也挺好的,可以讓聽(tīng)眾同時(shí)了解如何通過(guò) SQL 的方式,和編程的方式使用 Flink SQL。
SqlSubmit 的主要任務(wù)是執(zhí)行和提交一個(gè) SQL 文件,實(shí)現(xiàn)非常簡(jiǎn)單,就是通過(guò)正則表達(dá)式匹配每個(gè)語(yǔ)句塊。如果是 CREATE TABLE 或 INSERT INTO 開(kāi)頭,則會(huì)調(diào)用 tEnv.sqlUpdate(...)。如果是 SET 開(kāi)頭,則會(huì)將配置設(shè)置到 TableConfig 上。其核心代碼主要如下所示:
EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build();// 創(chuàng)建一個(gè)使用 Blink Planner 的 TableEnvironment, 并工作在流模式TableEnvironment tEnv = TableEnvironment.create(settings);// 讀取 SQL 文件List<String> sql = Files.readAllLines(path);// 通過(guò)正則表達(dá)式匹配前綴,來(lái)區(qū)分不同的 SQL 語(yǔ)句List<SqlCommandCall> calls = SqlCommandParser.parse(sql);// 根據(jù)不同的 SQL 語(yǔ)句,調(diào)用 TableEnvironment 執(zhí)行for (SqlCommandCall call : calls) { switch (call.command) { case SET: String key = call.operands[0]; String value = call.operands[1]; // 設(shè)置參數(shù) tEnv.getConfig().getConfiguration().setString(key, value); break; case CREATE_TABLE: String ddl = call.operands[0]; tEnv.sqlUpdate(ddl); break; case INSERT_INTO: String dml = call.operands[0]; tEnv.sqlUpdate(dml); break; default: throw new RuntimeException("Unsupported command: " + call.command); } }// 提交作業(yè)tEnv.execute("SQL Job");
在 flink-sql-submit 項(xiàng)目中,我們準(zhǔn)備了一份測(cè)試數(shù)據(jù)集(來(lái)自阿里云天池公開(kāi)數(shù)據(jù)集,特別鳴謝),位于 src/main/resources/user_behavior.log。數(shù)據(jù)以 JSON 格式編碼,大概長(zhǎng)這個(gè)樣子:
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
為了模擬真實(shí)的 Kafka 數(shù)據(jù)源,筆者還特地寫(xiě)了一個(gè) source-generator.sh 腳本(感興趣的可以看下源碼),會(huì)自動(dòng)讀取 user_behavior.log 的數(shù)據(jù)并以默認(rèn)每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。
有了數(shù)據(jù)源后,我們就可以用 DDL 去創(chuàng)建并連接這個(gè) Kafka 中的 topic(詳見(jiàn) src/main/resources/q1.sql)。
CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開(kāi)始讀取 'connector.properties.0.key' = 'zookeeper.connect', -- 連接信息 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', -- 數(shù)據(jù)源格式為 json 'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規(guī)則 )
注:可能有用戶會(huì)覺(jué)得其中的 connector.properties.0.key 等參數(shù)比較奇怪,社區(qū)計(jì)劃將在下一個(gè)版本中改進(jìn)并簡(jiǎn)化 connector 的參數(shù)配置。
連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如
CREATE TABLE pvuv_sink ( dt VARCHAR, pv BIGINT, uv BIGINT ) WITH ( 'connector.type' = 'jdbc', -- 使用 jdbc connector 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url 'connector.table' = 'pvuv_sink', -- 表名 'connector.username' = 'root', -- 用戶名 'connector.password' = '123456', -- 密碼 'connector.write.flush.max-rows' = '1' -- 默認(rèn)5000條,為了演示改為1條 )
假設(shè)我們的需求是計(jì)算每小時(shí)全網(wǎng)的用戶訪問(wèn)量,和獨(dú)立用戶數(shù)。很多用戶可能會(huì)想到使用滾動(dòng)窗口來(lái)計(jì)算。但這里我們介紹另一種方式。即 Group Aggregation 的方式。
INSERT INTO pvuv_sink SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')
它使用 DATE_FORMAT 這個(gè)內(nèi)置函數(shù),將日志時(shí)間歸一化成“年月日小時(shí)”的字符串格式,并根據(jù)這個(gè)字符串進(jìn)行分組,即根據(jù)每小時(shí)分組,然后通過(guò) COUNT(*) 計(jì)算用戶訪問(wèn)量(PV),通過(guò) COUNT(DISTINCT user_id) 計(jì)算獨(dú)立用戶數(shù)(UV)。這種方式的執(zhí)行模式是每收到一條數(shù)據(jù),便會(huì)進(jìn)行基于之前計(jì)算的值做增量計(jì)算(如+1),然后將最新結(jié)果輸出。所以實(shí)時(shí)性很高,但輸出量也大。
我們將這個(gè)查詢的結(jié)果,通過(guò) INSERT INTO 語(yǔ)句,寫(xiě)到了之前定義的 pvuv_sink MySQL 表中。
注:在深圳 Meetup 中,我們有對(duì)這種查詢的性能調(diào)優(yōu)做了深度的介紹。
本實(shí)戰(zhàn)演示環(huán)節(jié)需要安裝一些必須的服務(wù),包括:
Flink 本地集群:用來(lái)運(yùn)行 Flink SQL 任務(wù)。
Kafka 本地集群:用來(lái)作為數(shù)據(jù)源。
MySQL 數(shù)據(jù)庫(kù):用來(lái)作為結(jié)果表。
Flink 本地集群安裝
1.下載 Flink 1.9.0 安裝包并解壓: https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
2.下載以下依賴 jar 包,并拷貝到 flink-1.9.0/lib/ 目錄下。因?yàn)槲覀冞\(yùn)行時(shí)需要依賴各個(gè) connector 實(shí)現(xiàn)。
flink-sql-connector-kafka_2.11-1.9.0.jar
http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar
flink-json-1.9.0-sql-jar.jar
http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar
flink-jdbc_2.11-1.9.0.jar
http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar
mysql-connector-java-5.1.48.jar
https://dev.mysql.com/downloads/connector/j/5.1.html
3.將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因?yàn)槲覀兊难菔救蝿?wù)可能會(huì)消耗多于1個(gè)的 slot。
4.在 flink-1.9.0 目錄下執(zhí)行 ./bin/start-cluster.sh,啟動(dòng)集群。
運(yùn)行成功的話,可以在 http://localhost:8081 訪問(wèn)到 Flink Web UI。
另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項(xiàng)目的 env.sh 中,用于后面提交 SQL 任務(wù),如我的路徑是
FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0
下載 Kafka 2.2.0 安裝包并解壓: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
將安裝路徑填到 flink-sql-submit 項(xiàng)目的 env.sh 中,如我的路徑是
KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
在 flink-sql-submit 目錄下運(yùn)行 ./start-kafka.sh 啟動(dòng) Kafka 集群。
在命令行執(zhí)行 jps,如果看到 Kafka 進(jìn)程和 QuorumPeerMain 進(jìn)程即表明啟動(dòng)成功。
可以在官方頁(yè)面下載 MySQL 并安裝:
https://dev.mysql.com/downloads/mysql/
如果有 Docker 環(huán)境的話,也可以直接通過(guò) Docker 安裝
https://hub.docker.com/_/mysql
$ docker pull mysql $ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql
然后在 MySQL 中創(chuàng)建一個(gè) flink-test
的數(shù)據(jù)庫(kù),并按照上文的 schema 創(chuàng)建 pvuv_sink
表。
在 flink-sql-submit
目錄下運(yùn)行 ./source-generator.sh
,會(huì)自動(dòng)創(chuàng)建 user_behavior topic
,并實(shí)時(shí)往里灌入數(shù)據(jù)
在 flink-sql-submit
目錄下運(yùn)行 ./run.sh q1
, 提交成功后,可以在 Web UI 中看到拓?fù)洹?br/>
在 MySQL 客戶端,我們也可以實(shí)時(shí)地看到每個(gè)小時(shí)的 pv uv 值在不斷地變化
看完上述內(nèi)容,你們對(duì)如何使用SQL讀取Kafka并寫(xiě)入MySQL有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。