您好,登錄后才能下訂單哦!
這篇文章主要講解了“Flink中keyBy有哪些方式指定key”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink中keyBy有哪些方式指定key”吧!
不管是stream還是batch處理,都有一個keyBy(stream)和groupBy(batch)操作。那么該如何指定key?
Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.
一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定義一個key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允許數(shù)據(jù)按照key進行分組。
DataSet
DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
DataStream
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
類似于mysql中的join操作:select a.* , b.* from a join b on a.id=b.id
這里的keyBy就是a.id=b.id
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
可以傳字段的位置
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
可以傳字段位置的組合
這對于簡單的使用時沒問題的。但是對于內(nèi)嵌的Tuple,如下所示:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
如果使用keyBy(0),那么他就會使用整個Tuple2<Integer, Float>作為key,(因為Tuple2<Integer, Float>是Tuple3<Tuple2<Integer, Float>,String,Long>的0號位置)。如果想要指定key到Tuple2<Integer, Float>內(nèi)部中,可以使用下面的方式。
我們可以使用基于字符串字段表達式來引用內(nèi)嵌字段去定義key。
之前我們的算子寫法是這樣的:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
其中的new FlatMapFunction<String, Tuple2<String, Integer>>表示輸入是一個String,輸出是一個Tuple2<String, Integer>。這里我們重新定義一個內(nèi)部類:
public static class WC { private String word; private int count; public WC() { } public WC(String word, int count) { this.word = word; this.count = count; } @Override public String toString() { return "WC{" + "word='" + word + '\'' + ", count=" + count + '}'; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } }
修改算子的寫法:
text.flatMap(new FlatMapFunction<String, WC>() { @Override public void flatMap(String value, Collector<WC> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for (String token : tokens) { if (token.length() > 0) { out.collect(new WC(token, 1)); } } } }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);
將原來的輸出Tuple2<String, Integer>,修改為輸出WC類型;將原來的keyBy(0)修改為keyBy("word");將原來的sum(1)修改為sum("count")
因此,在這個例子中我們有一個POJO類,有兩個字段分別是"word"和"count",可以傳遞字段名到keyBy("")中。
語法:
字段名一定要與POJO類中的字段名一致。一定要提供默認的構造函數(shù),和get與set方法。
使用Tuple時,0表示第一個字段
可以使用嵌套方式,舉例如下:
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
"count",指向的是WC中的字段count
"complex",指向的是復雜數(shù)據(jù)類型,會遞歸選擇所有ComplexNestedClass的字段
"complex.word.f2",指向的是Tuple3中的最后一個字段。
"complex.hadoopCitizen",指向的是Hadoop IntWritable
type
scala寫法:
object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隱式轉(zhuǎn)換 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map(x => WC(x,1)) .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); } case class WC(word: String, count: Int) }
.keyBy(new KeySelector<WC, Object>() { @Override public Object getKey(WC value) throws Exception { return value.word; } })
感謝各位的閱讀,以上就是“Flink中keyBy有哪些方式指定key”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Flink中keyBy有哪些方式指定key這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。