溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink開發(fā)如何批處理應(yīng)用程序

發(fā)布時間:2021-10-20 16:23:45 來源:億速云 閱讀:154 作者:柒染 欄目:大數(shù)據(jù)

Flink開發(fā)如何批處理應(yīng)用程序,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

需求

詞頻統(tǒng)計,即給一個文件,統(tǒng)計文件中每個單詞出現(xiàn)的次數(shù),分隔符是\t。這個文件內(nèi)容如下:

hello    world    welcome
hello    welcome

統(tǒng)計結(jié)果直接打印在控制臺。生產(chǎn)環(huán)境下一般Sink到目的地。

使用Flink + java實現(xiàn)需求

環(huán)境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

創(chuàng)建項目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train version:1.0 這樣就創(chuàng)建了一個項目,使用Idea導(dǎo)入這個項目,項目結(jié)構(gòu)如下:

Flink開發(fā)如何批處理應(yīng)用程序

里面有兩個自動為我們準(zhǔn)備好的java類。

開發(fā)步驟

第一步:創(chuàng)建批處理上下文環(huán)境

// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

第二步:讀取數(shù)據(jù)

env.readTextFile(textPath);

第三步:transform operations,例如 filter()  flatMap()  join()  coGroup(),這是開發(fā)的核心所在,一般就是業(yè)務(wù)邏輯

第四步:execute program

具體操作

第一步:讀取數(shù)據(jù)

hello	welcome

第二步:每一行的數(shù)據(jù)按照指定的分隔符拆分

hello
welcome

第三步:為每一個單詞賦上次數(shù)為1

(hello,1)
(welcome,1)

第四步:合并操作

代碼實現(xiàn)

/**
 * 使用Java API來開發(fā)Flink的批處理應(yīng)用程序
 */
public class BatchWCJavaApp {
    public static void main(String[] args) throws Exception {
        String input = "E:/test/input/test.txt";
        // step1: 獲取運行環(huán)境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // step2: 讀取數(shù)據(jù)
        DataSource<String> text = env.readTextFile(input);
        // step3: transform
        // FlatMapFunction<String, Tuple2<String, Integer>表示進(jìn)來一個String, 轉(zhuǎn)換成一個<String, Integer>類型
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            /**
             *
             * @param value 就是一行一行的字符串
             * @param out 轉(zhuǎn)換成(單詞,次數(shù))
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split("\t");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1).print();
    }
}

運行結(jié)果

(world,1)
(hello,2)
(welcome,2)

使用Flink + scala實現(xiàn)需求

環(huán)境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

創(chuàng)建項目,跟使用java方式是一樣的

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train-scala version:1.0 這樣就創(chuàng)建了一個項目,使用Idea導(dǎo)入這個項目:

Flink開發(fā)如何批處理應(yīng)用程序

接下來的開發(fā)步驟與使用java實現(xiàn)的開發(fā)步驟是一樣的:這里給出

代碼實現(xiàn)

import org.apache.flink.api.scala.ExecutionEnvironment


/**
  * 使用Scala開發(fā)Flink的批處理應(yīng)用程序
  */
object BatchWCScalaApp {
  def main(args: Array[String]): Unit = {
    val input = "E:/test/input/test.txt"
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(input)
    // 引入隱式轉(zhuǎn)換
    import org.apache.flink.api.scala._

    text.flatMap(_.toLowerCase.split("\t"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}

Java與Scala實現(xiàn)方式對比

算子與簡潔性

也就是transform部分雖然原理是一樣的,但是實現(xiàn)的方式不一樣,scala更加簡潔

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI