溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java lambda表達(dá)式如何實(shí)現(xiàn)Flink WordCount過程

發(fā)布時間:2021-05-27 14:31:42 來源:億速云 閱讀:157 作者:小新 欄目:編程語言

小編給大家分享一下Java lambda表達(dá)式如何實(shí)現(xiàn)Flink WordCount過程,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

這篇文章主要介紹了Java lambda表達(dá)式實(shí)現(xiàn)Flink WordCount過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下

本篇我們將使用Java語言來實(shí)現(xiàn)Flink的單詞統(tǒng)計(jì)。

代碼開發(fā)

環(huán)境準(zhǔn)備

導(dǎo)入Flink 1.9 pom依賴

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.7</version>
    </dependency>
  </dependencies>

構(gòu)建Flink流處理環(huán)境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定義source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // 每秒發(fā)送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

單詞計(jì)算

// 3. 單詞統(tǒng)計(jì)
    // 3.1 將文本行切分成一個個的單詞
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分單詞
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 將單詞轉(zhuǎn)換為一個個的元組
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照單詞進(jìn)行分組
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 對每組單詞數(shù)量進(jìn)行累加
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

參考代碼

public class WordCount {
  public static void main(String[] args) throws Exception {
    // 1. 構(gòu)建Flink流式初始化環(huán)境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. 自定義source - 每秒發(fā)送一行文本
    DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // 每秒發(fā)送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

    // 3. 單詞統(tǒng)計(jì)
    // 3.1 將文本行切分成一個個的單詞
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分單詞
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 將單詞轉(zhuǎn)換為一個個的元組
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照單詞進(jìn)行分組
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 對每組單詞數(shù)量進(jìn)行累加
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

    env.execute("app");
  }
}

Flink對Java Lambda表達(dá)式支持情況

Flink支持Java API所有操作符使用Lambda表達(dá)式。但是,但Lambda表達(dá)式使用Java泛型時,就需要聲明類型信息。

我們來看下上述的這段代碼:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分單詞
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

之所以這里將所有的類型信息,因?yàn)镕link無法正確自動推斷出來Collector中帶的泛型。我們來看一下FlatMapFuntion的源代碼

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

  /**
  * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
  * it into zero, one, or more elements.
  *
  * @param value The input value.
  * @param out The collector for returning result values.
  *
  * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
  *          to fail and may trigger recovery.
  */
  void flatMap(T value, Collector<O> out) throws Exception;
}

我們發(fā)現(xiàn) flatMap的第二個參數(shù)是Collector<O>,是一個帶參數(shù)的泛型。Java編譯器編譯該代碼時會進(jìn)行參數(shù)類型擦除,所以Java編譯器會變成成:

void flatMap(T value, Collector out)

這種情況,F(xiàn)link將無法自動推斷類型信息。如果我們沒有顯示地提供類型信息,將會出現(xiàn)以下錯誤:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
  In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
  An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
  Otherwise the type has to be specified explicitly using type information.

這種情況下,必須要顯示指定類型信息,否則輸出將返回值視為Object類型,這將導(dǎo)致Flink無法正確序列化。

所以,我們需要顯示地指定Lambda表達(dá)式的參數(shù)類型信息,并通過returns方法顯示指定輸出的類型信息

我們再看一段代碼:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

為什么map后面也需要指定類型呢?

因?yàn)榇颂巑ap返回的是Tuple2類型,Tuple2是帶有泛型參數(shù),在編譯的時候同樣會被查出泛型參數(shù)信息,導(dǎo)致Flink無法正確推斷。

以上是“Java lambda表達(dá)式如何實(shí)現(xiàn)Flink WordCount過程”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI