溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink中keyBy有哪些方式指定key

發(fā)布時間:2021-11-16 16:48:08 來源:億速云 閱讀:612 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Flink中keyBy有哪些方式指定key”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink中keyBy有哪些方式指定key”吧!

keyBy 如何指定key

不管是stream還是batch處理,都有一個keyBy(stream)和groupBy(batch)操作。那么該如何指定key?

Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.

 一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定義一個key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允許數(shù)據(jù)按照key進行分組。

DataSet

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

DataStream

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

類似于mysql中的join操作:select a.* , b.* from a join b on a.id=b.id

這里的keyBy就是a.id=b.id

有哪幾種方式定義Key?

方式一:Tuple

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

可以傳字段的位置

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

可以傳字段位置的組合

這對于簡單的使用時沒問題的。但是對于內(nèi)嵌的Tuple,如下所示:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

如果使用keyBy(0),那么他就會使用整個Tuple2<Integer, Float>作為key,(因為Tuple2<Integer, Float>是Tuple3<Tuple2<Integer, Float>,String,Long>的0號位置)。如果想要指定key到Tuple2<Integer, Float>內(nèi)部中,可以使用下面的方式。

方式二:字段表達式

我們可以使用基于字符串字段表達式來引用內(nèi)嵌字段去定義key。

之前我們的算子寫法是這樣的:

text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

其中的new FlatMapFunction<String, Tuple2<String, Integer>>表示輸入是一個String,輸出是一個Tuple2<String, Integer>。這里我們重新定義一個內(nèi)部類:

public static class WC {
        private String word;
        private int count;

        public WC() {
        }

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }
    }

修改算子的寫法:

        text.flatMap(new FlatMapFunction<String, WC>() {
            @Override
            public void flatMap(String value, Collector<WC> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new WC(token, 1));
                    }
                }
            }
        }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);

將原來的輸出Tuple2<String, Integer>,修改為輸出WC類型;將原來的keyBy(0)修改為keyBy("word");將原來的sum(1)修改為sum("count")

因此,在這個例子中我們有一個POJO類,有兩個字段分別是"word"和"count",可以傳遞字段名到keyBy("")中。

語法:

  • 字段名一定要與POJO類中的字段名一致。一定要提供默認的構造函數(shù),和get與set方法。

  • 使用Tuple時,0表示第一個字段

  • 可以使用嵌套方式,舉例如下:

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}
  • "count",指向的是WC中的字段count

  • "complex",指向的是復雜數(shù)據(jù)類型,會遞歸選擇所有ComplexNestedClass的字段

  • "complex.word.f2",指向的是Tuple3中的最后一個字段。

  • "complex.hadoopCitizen",指向的是Hadoop IntWritable type

scala寫法:

object StreamingWCScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 引入隱式轉(zhuǎn)換
    import org.apache.flink.api.scala._

    val text = env.socketTextStream("192.168.152.45", 9999)
    text.flatMap(_.split(","))
        .map(x => WC(x,1))
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .sum("count")
        .print()
        .setParallelism(1)

    env.execute("StreamingWCScalaApp");
  }
  case class WC(word: String, count: Int)
}

 方式三:key選擇器函數(shù)

.keyBy(new KeySelector<WC, Object>() {
            @Override
            public Object getKey(WC value) throws Exception {
                return value.word;
            }
        })

感謝各位的閱讀,以上就是“Flink中keyBy有哪些方式指定key”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Flink中keyBy有哪些方式指定key這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI