您好,登錄后才能下訂單哦!
Flink開發(fā)如何批處理應(yīng)用程序,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
詞頻統(tǒng)計,即給一個文件,統(tǒng)計文件中每個單詞出現(xiàn)的次數(shù),分隔符是\t。這個文件內(nèi)容如下:
hello world welcome hello welcome
統(tǒng)計結(jié)果直接打印在控制臺。生產(chǎn)環(huán)境下一般Sink到目的地。
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local
groupId: com.vincent artifactId: springboot-flink-train version:1.0 這樣就創(chuàng)建了一個項目,使用Idea導(dǎo)入這個項目,項目結(jié)構(gòu)如下:
里面有兩個自動為我們準(zhǔn)備好的java類。
第一步:創(chuàng)建批處理上下文環(huán)境
// set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
第二步:讀取數(shù)據(jù)
env.readTextFile(textPath);
第三步:transform operations,例如 filter() flatMap() join() coGroup(),這是開發(fā)的核心所在,一般就是業(yè)務(wù)邏輯
第四步:execute program
第一步:讀取數(shù)據(jù)
hello welcome
第二步:每一行的數(shù)據(jù)按照指定的分隔符拆分
hello welcome
第三步:為每一個單詞賦上次數(shù)為1
(hello,1) (welcome,1)
第四步:合并操作
/** * 使用Java API來開發(fā)Flink的批處理應(yīng)用程序 */ public class BatchWCJavaApp { public static void main(String[] args) throws Exception { String input = "E:/test/input/test.txt"; // step1: 獲取運行環(huán)境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // step2: 讀取數(shù)據(jù) DataSource<String> text = env.readTextFile(input); // step3: transform // FlatMapFunction<String, Tuple2<String, Integer>表示進(jìn)來一個String, 轉(zhuǎn)換成一個<String, Integer>類型 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { /** * * @param value 就是一行一行的字符串 * @param out 轉(zhuǎn)換成(單詞,次數(shù)) * @throws Exception */ @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split("\t"); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).groupBy(0).sum(1).print(); } }
(world,1) (hello,2) (welcome,2)
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local
groupId: com.vincent artifactId: springboot-flink-train-scala version:1.0 這樣就創(chuàng)建了一個項目,使用Idea導(dǎo)入這個項目:
接下來的開發(fā)步驟與使用java實現(xiàn)的開發(fā)步驟是一樣的:這里給出
import org.apache.flink.api.scala.ExecutionEnvironment /** * 使用Scala開發(fā)Flink的批處理應(yīng)用程序 */ object BatchWCScalaApp { def main(args: Array[String]): Unit = { val input = "E:/test/input/test.txt" val env = ExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile(input) // 引入隱式轉(zhuǎn)換 import org.apache.flink.api.scala._ text.flatMap(_.toLowerCase.split("\t")) .filter(_.nonEmpty) .map((_, 1)) .groupBy(0) .sum(1) .print() } }
也就是transform部分雖然原理是一樣的,但是實現(xiàn)的方式不一樣,scala更加簡潔
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。