This article walks through how to connect Flink SQL to Kafka. Many people run into questions about this in day-to-day work, so the editor has gone through the available material and put together a simple, workable approach. Hopefully it clears up any confusion; follow along and try it yourself.
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinksqldemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Encoding -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.8</scala.version>
        <kafka.version>0.10.2.1</kafka.version>
        <flink.version>1.12.0</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <!-- scope: leave as the default "compile" while debugging locally; set to "provided" when packaging for the cluster -->
        <setting.scope>compile</setting.scope>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- flink start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>-->
        <!-- flink end -->

        <!-- kafka start -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <!-- kafka end -->

        <!-- hadoop start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <!-- hadoop end -->

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>
    </dependencies>
</project>
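A note on the setting.scope property: command-line Maven properties override those declared in the pom, so one way to switch scopes without editing the file (assuming standard Maven behavior) is to run mvn clean package -Dsetting.scope=provided when building a jar for the cluster, and plain mvn clean package, or an IDE run, during local development with the default compile scope.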
Code:
package com.jd.data;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class TableApiConnectKafka04 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Create the table execution environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register a Kafka-backed table. The pom pulls in the universal
        //    connector (flink-connector-kafka), so the descriptor version must
        //    be "universal"; "0.11" would require the separate
        //    flink-connector-kafka-0.11 artifact.
        tableEnv.connect(new Kafka()
                        .version("universal")                       // connector version
                        .topic("xxx")                               // topic to consume
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092"))
                .withFormat(new Csv())                              // messages are CSV
                .withSchema(new Schema()                            // define the table schema
                        .field("a", DataTypes.STRING())
                        .field("b", DataTypes.STRING())
                        .field("c", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("xxx");

        // 3. Query the table, print its schema, and stream the rows to stdout
        Table xxx = tableEnv.from("xxx");
        xxx.printSchema();
        tableEnv.toAppendStream(xxx, Row.class).print();

        env.execute("job");
    }
}
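One caveat: the connect()/descriptor API used above was deprecated in Flink 1.11 and removed in later releases, so on Flink 1.12 the documented route is to register the table with a CREATE TABLE DDL via executeSql(). Below is a minimal sketch of an equivalent registration that could replace the tableEnv.connect(...) block; the table/topic name xxx comes from the original, while the group.id value and scan.startup.mode are illustrative assumptions.

// Sketch: DDL-based registration, equivalent to the descriptor code above.
// 'flinksqldemo' (consumer group) and 'earliest-offset' are assumed values.
tableEnv.executeSql(
        "CREATE TABLE xxx (" +
        "  a STRING," +
        "  b STRING," +
        "  c STRING" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'xxx'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'properties.group.id' = 'flinksqldemo'," +
        "  'scan.startup.mode' = 'earliest-offset'," +
        "  'format' = 'csv'" +
        ")");

Either way, once the job is running, producing comma-separated lines such as 1,2,3 to the topic should make the print sink emit one Row per message.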
This wraps up our look at how to connect Flink SQL to Kafka; hopefully it has resolved any questions. Pairing theory with practice is the best way to learn, so go give it a try. For more content on related topics, keep following the Yisu Cloud website, where the editor will keep bringing you practical articles!