您好,登錄后才能下訂單哦!
private final static Logger logger = LoggerFactory.getLogger(GetData.class);
public static void main(String[] arg) throws Exception {
TypeInformation[] fieldTypes = new TypeInformation[] {
BasicTypeInfo.STRING_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://ip:3306/tablename?characterEncoding=utf8")
.setUsername("*")
.setPassword("*")
.setQuery("select name from words")
.setRowTypeInfo(rowTypeInfo)
.finish();
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource s = env.createInput(jdbcInputFormat); // datasource
BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT());
tableEnv.registerDataSet("t2", s);
tableEnv.sqlQuery("select * from t2").printSchema();
Table query = tableEnv.sqlQuery("select * from t2");
DataSet result = tableEnv.toDataSet(query, Row.class);
result.print();
System.out.println(s.count());
}
通過插件將所需的類打到一個(gè)jar中
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 此處指定main方法入口的class -->
<mainClass>*</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
然后執(zhí)行
./bin/flink run /flink-1.8.0/collector-api-0.1.jar
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。