This article explains how to use Flink Table API and SQL with Elasticsearch. The walkthrough is short and example-driven, so follow along step by step to see how the pieces fit together.
Use Table & SQL with the Flink Elasticsearch connector to write data into an Elasticsearch index.
Example environment
java.version: 1.8.x
flink.version: 1.11.1
elasticsearch: 6.x
Example data source (download from the project's Gitee repository)
Flink series: setting up the development environment and data
Example module (pom.xml)
Flink series: Table API & SQL example module
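The module's pom.xml is referenced but not reproduced here. As a rough sketch (artifact IDs assumed from the standard Flink 1.11 distribution, Scala 2.11 build assumed; verify against your own setup), the dependencies for this example might look like:

```xml
<!-- Assumed dependency set for Flink 1.11.1 + Elasticsearch 6 (Scala 2.11 build assumed) -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.11.1</version>
    </dependency>
</dependencies>
```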
InsertToEs.java
package com.flink.examples.elasticsearch;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description Use Table & SQL with the Flink Elasticsearch connector to write data into an Elasticsearch index
 */
public class InsertToEs {

    /**
     * Apache Flink offers two relational APIs for unified stream and batch processing: the Table API and SQL.
     * Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html
     */
    //See the property configuration class: ElasticsearchValidator
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            "  user_id STRING,\n" +
            "  user_name STRING,\n" +
            "  uv BIGINT,\n" +
            "  pv BIGINT,\n" +
            "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector.type' = 'elasticsearch',\n" +
            "  'connector.version' = '6',\n" +
            "  'connector.property-version' = '1',\n" +
            "  'connector.hosts' = 'http://192.168.110.35:9200',\n" +
            "  'connector.index' = 'users',\n" +
            "  'connector.document-type' = 'doc',\n" +
            "  'format.type' = 'json',\n" +
            "  'update-mode' = 'append' -- append|upsert\n" +
            ")";

    public static void main(String[] args) {
        //Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Default stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //Build EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        //Register the Elasticsearch sink table
        tEnv.executeSql(table_sql);

        //The Elasticsearch connector currently supports sink only, not source: you cannot SELECT from an
        //Elasticsearch table, so data can only be submitted via INSERT.
        String sql = "insert into my_users (user_id,user_name,uv,pv) values('10003','tom',31,10)";

        //First way: execute the statement directly
        //TableResult tableResult = tEnv.executeSql(sql);

        //Second way: declare a statement set to execute the SQL
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(sql);
        TableResult tableResult = stmtSet.execute();
        tableResult.print();
    }
}
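The DDL in the example uses 'update-mode' = 'append', but the comment in the code ("append|upsert") points at the alternative: because the table declares PRIMARY KEY (user_id) NOT ENFORCED, the connector can also run in upsert mode, where a second insert with the same user_id overwrites the existing document instead of adding a new one. A sketch of that variant (only the update-mode line changes; all other options as in the example above):

```sql
-- Upsert variant of the my_users DDL; same connector options as the example
CREATE TABLE my_users (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.property-version' = '1',
  'connector.hosts' = 'http://192.168.110.35:9200',
  'connector.index' = 'users',
  'connector.document-type' = 'doc',
  'format.type' = 'json',
  'update-mode' = 'upsert'  -- overwrite the document keyed by user_id instead of appending
)
```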
Printed result
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
|                                        -1 |
+-------------------------------------------+
1 row in set

The -1 here is expected: for a streaming insert job, Flink reports the affected row count as -1 (unknown) because the job runs asynchronously.
Thanks for reading. That covers how to use Flink Table API and SQL with Elasticsearch; the best way to consolidate the ideas above is to run the example against your own cluster and verify the results yourself.