溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用Flink TableAPI和SQL /Elasticsearch

發(fā)布時間:2021-12-31 10:18:04 來源:億速云 閱讀:189 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“怎么使用Flink TableAPI和SQL /Elasticsearch”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么使用Flink TableAPI和SQL /Elasticsearch”吧!

使用Tbale&SQL與Flink Elasticsearch Connector 連接器將數(shù)據(jù)寫入Elasticsearch引擎的索引

示例環(huán)境

java.version: 1.8.x
flink.version: 1.11.1
elasticsearch:6.x

示例數(shù)據(jù)源 (項目碼云下載)

Flink 系例 之 搭建開發(fā)環(huán)境與數(shù)據(jù)

示例模塊 (pom.xml)

Flink 系例 之 TableAPI & SQL 與 示例模塊

InsertToEs.java

package com.flink.examples.elasticsearch;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description 使用Tbale&SQL與Flink Elasticsearch連接器將數(shù)據(jù)寫入Elasticsearch引擎的索引
 */
public class InsertToEs {

    /**
     * Apache Flink 有兩種關系型 API 來做流批統(tǒng)一處理:Table API 和 SQL。
     * 參考官方:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html
    */

    //參見屬性配置類:ElasticsearchValidator
    static String table_sql = "CREATE TABLE my_users (\n" +
            "  user_id STRING,\n" +
            "  user_name STRING,\n" +
            "  uv BIGINT,\n" +
            "  pv BIGINT,\n" +
            "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector.type' = 'elasticsearch',\n" +
            "  'connector.version' = '6',\n" +
            "  'connector.property-version' = '1', \n" +
            "  'connector.hosts' = 'http://192.168.110.35:9200',\n" +
            "  'connector.index' = 'users',\n" +
            "  'connector.document-type' = 'doc',\n" +
            "  'format.type' = 'json',\n" +
            "  'update-mode'='append' -- append|upsert\n" +
            ")";

    public static void main(String[] args) {
        //構建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //默認流時間方式
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //構建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //構建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        //注冊kafka數(shù)據(jù)維表
        tEnv.executeSql(table_sql);
        //Elasticsearch connector 目前只支持了 sink,不支持 source 。不能SELECT elasticsearch table,因此只能通過insert的方式提交數(shù)據(jù);
        String sql = "insert into my_users (user_id,user_name,uv,pv) values('10003','tom',31,10)";
//        TableResult tableResult = tEnv.executeSql(sql);

        //第二種方式:聲明一個操作集合來執(zhí)行sql
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(sql);
        TableResult tableResult = stmtSet.execute();

        tableResult.print();
    }
}

打印結果

+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
|                                        -1 |
+-------------------------------------------+
1 row in set

感謝各位的閱讀,以上就是“怎么使用Flink TableAPI和SQL /Elasticsearch”的內容了,經(jīng)過本文的學習后,相信大家對怎么使用Flink TableAPI和SQL /Elasticsearch這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI