溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

flink例子-讀取數(shù)據(jù)庫

發(fā)布時(shí)間:2020-07-24 01:11:08 來源:網(wǎng)絡(luò) 閱讀:2427 作者:大海之中 欄目:云計(jì)算
private final static Logger logger = LoggerFactory.getLogger(GetData.class);

    public static void main(String[] arg) throws Exception {

        TypeInformation[] fieldTypes = new TypeInformation[] {

                BasicTypeInfo.STRING_TYPE_INFO

        };

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()

                .setDrivername("com.mysql.jdbc.Driver")

                .setDBUrl("jdbc:mysql://ip:3306/tablename?characterEncoding=utf8")

                .setUsername("*")

                .setPassword("*")

                .setQuery("select name from words")

                .setRowTypeInfo(rowTypeInfo)

                .finish();

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSource s = env.createInput(jdbcInputFormat); // datasource

    BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT());

    tableEnv.registerDataSet("t2", s);

    tableEnv.sqlQuery("select * from t2").printSchema();

    Table query = tableEnv.sqlQuery("select * from t2");

    DataSet result = tableEnv.toDataSet(query, Row.class);

    result.print();

    System.out.println(s.count());

}

通過插件將所需的類打到一個(gè)jar中

<plugin>
                            <artifactId>maven-assembly-plugin</artifactId>
                            <configuration>
                                    <appendAssemblyId>false</appendAssemblyId>
                                    <descriptorRefs>
                                            <descriptorRef>jar-with-dependencies</descriptorRef>
                                    </descriptorRefs>
                                    <archive>
                                            <manifest>
                                                    <!-- 此處指定main方法入口的class -->
                                                    <mainClass>*</mainClass>
                                            </manifest>
                                    </archive>
                            </configuration>
                            <executions>
                                    <execution>
                                            <id>make-assembly</id>
                                            <phase>package</phase>
                                            <goals>
                                                    <goal>assembly</goal>
                                            </goals>
                                    </execution>
                            </executions>
                    </plugin>

然后執(zhí)行

./bin/flink run  /flink-1.8.0/collector-api-0.1.jar
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI