溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Apache下Flink transformation的用法

發(fā)布時間:2021-09-01 19:33:37 來源:億速云 閱讀:128 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“Apache下Flink transformation的用法”,在日常操作中,相信很多人在Apache下Flink transformation的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Apache下Flink transformation的用法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

Map Function

Scala

新建一個Object

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment

  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))
  }

}

這里的數(shù)據(jù)源是一個1到10的list集合。Map的原理是:假設data數(shù)據(jù)集中有N個元素,將每一個元素進行轉(zhuǎn)化:

data.map { x => x.toInt }

好比:y=f(x)

    // 對data中的每一個元素都去做一個+1操作
    data.map((x:Int) => x + 1 ).print()

然后對每一個元素都做一個+1操作。

簡單寫法:

如果這個里面只有一個元素,就可以直接寫成下面形式:

data.map((x) => x + 1).print()

更簡潔的寫法:

data.map(x => x + 1).print()

更簡潔的方法:

data.map(_ + 1).print()

輸出結(jié)果:

2
3
4
5
6
7
8
9
10
11

Java

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        mapFunction(executionEnvironment);
    }

    public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {
        List<String> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i + "");
        }
        DataSource<String> data = executionEnvironment.fromCollection(list);
        data.map(new MapFunction<String, Integer>() {
            public Integer map(String input) {
                return Integer.parseInt(input) + 1;
            }
        }).print();
    }

因為我們定義的List是一個String的泛型,因此MapFunction的泛型是<String, Integer>,第一個參數(shù)表示輸入的類型,第二個參數(shù)表示輸出是一個Integer類型。

Filter Function

將每個元素執(zhí)行+1操作,并取出大于5的元素。

Scala

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

filter只會返回滿足條件的記錄。

Java

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        DataSource<Integer> data = env.fromCollection(list);
        data.map(new MapFunction<Integer, Integer>() {
            public Integer map(Integer input) {
                return input + 1;
            }
        }).filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer input) throws Exception {
                return input > 5;
            }
        }).print();
    }

MapPartition Function

map function 與 MapPartition function有什么區(qū)別?

需求:DataSource 中有100個元素,把結(jié)果存儲在數(shù)據(jù)庫中

如果使用map function ,那么實現(xiàn)方法如下:

  // DataSource 中有100個元素,把結(jié)果存儲在數(shù)據(jù)庫中
  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("Student" + i)
    }
    val data = env.fromCollection(students)
    data.map(x=>{
      // 每一個元素要存儲到數(shù)據(jù)庫中去,肯定需要先獲取到connection
      val connection = DBUtils.getConnection()
      println(connection + " ... ")
      // TODO .... 保存數(shù)據(jù)到DB
      DBUtils.returnConnection(connection)
    }).print()
  }

打印結(jié)果,將會打印100個獲取DBUtils.getConnection()的請求。如果數(shù)據(jù)量增多,顯然不停的獲取連接是不現(xiàn)實的。

因此MapPartition就應運而生了,轉(zhuǎn)換一個分區(qū)里面的數(shù)據(jù),也就是說一個分區(qū)中的數(shù)據(jù)調(diào)用一次。

因此要首先設置分區(qū):

val data = env.fromCollection(students).setParallelism(4)

設置4個分區(qū),也就是并行度,然后使用mapPartition來處理:

data.mapPartition(x => {
      val connection = DBUtils.getConnection()
      println(connection + " ... ")
      // TODO .... 保存數(shù)據(jù)到DB
      DBUtils.returnConnection(connection)
      x
    }).print()

那么就會的到4次連接請求,每一個分區(qū)獲取一個connection。

Java

public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> list = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            list.add("student:" + i);
        }
        DataSource<String> data = env.fromCollection(list);
        /*data.map(new MapFunction<String, String>() {
            @Override
            public String map(String input) throws Exception {
                String connection = DBUtils.getConnection();
                System.out.println("connection = [" + connection + "]");
                DBUtils.returnConnection(connection);
                return input;
            }
        }).print();*/
        data.mapPartition(new MapPartitionFunction<String, Object>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<Object> out) throws Exception {
                String connection = DBUtils.getConnection();
                System.out.println("connection = [" + connection + "]");
                DBUtils.returnConnection(connection);
            }
        }).print();
    }

first   groupBy sortGroup

Scala

first表示獲取前幾個,groupBy表示分組,sortGroup表示分組內(nèi)排序

def firstFunction(env:ExecutionEnvironment): Unit = {
    val info = ListBuffer[(Int, String)]()
    info.append((1, "hadoop"))
    info.append((1, "spark"))
    info.append((1, "flink"))
    info.append((2, "java"))
    info.append((2, "springboot"))
    info.append((3, "linux"))
    info.append((4, "vue"))
    val data = env.fromCollection(info)
    data.first(3).print()
    //輸出:(1,hadoop)
    //(1,spark)
    //(1,flink)
    data.groupBy(0).first(2).print()//根據(jù)第一個字段分組,每個分組獲取前兩個數(shù)據(jù)
    //(3,linux)
    //(1,hadoop)
    //(1,spark)
    //(2,java)
    //(2,springboot)
    //(4,vue)
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print() //根據(jù)第一個字段分組,然后在分組內(nèi)根據(jù)第二個字段升序排序,并取出前兩個數(shù)據(jù)
    //輸出(3,linux)
    //(1,flink)
    //(1,hadoop)
    //(2,java)
    //(2,springboot)
    //(4,vue)
  }

Java

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = new ArrayList<>();
        info.add(new Tuple2<>(1, "hadoop"));
        info.add(new Tuple2<>(1, "spark"));
        info.add(new Tuple2<>(1, "flink"));
        info.add(new Tuple2<>(2, "java"));
        info.add(new Tuple2<>(2, "springboot"));
        info.add(new Tuple2<>(3, "linux"));
        info.add(new Tuple2<>(4, "vue"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.first(3).print();
        data.groupBy(0).first(2).print();
        data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
    }

FlatMap Function

獲取一個元素,然后產(chǎn)生0個、1個或多個元素

Scala

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(String)]()
    info.append("hadoop,spark");
    info.append("hadoop,flink");
    info.append("flink,flink");
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).print()
  }

輸出:

hadoop
spark
hadoop
flink
flink
flink

FlatMap將每個元素都用逗號分割,然后變成多個。

經(jīng)典例子:

data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()

將每個元素用逗號分割,然后每個元素做map,然后根據(jù)第一個字段分組,然后根據(jù)第二個字段求和。

輸出結(jié)果如下:

(hadoop,2)
(flink,3)
(spark,1)

Java

同樣實現(xiàn)一個經(jīng)典案例wordcount

public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = new ArrayList<>();
        info.add("hadoop,spark");
        info.add("hadoop,flink");
        info.add("flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String input, Collector<String> out) throws Exception {
                String[] splits = input.split(",");
                for(String split: splits) {
                    //發(fā)送出去
                    out.collect(split);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value,1);
            }
        }).groupBy(0).sum(1).print();
    }

Distinct

去重操作

Scala

  def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(String)]()
    info.append("hadoop,spark");
    info.append("hadoop,flink");
    info.append("flink,flink");
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  }

這樣就將每一個元素都做了去重操作。輸出如下:

hadoop
flink
spark

Java

    public static void distinctFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = new ArrayList<>();
        info.add("hadoop,spark");
        info.add("hadoop,flink");
        info.add("flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String input, Collector<String> out) throws Exception {
                String[] splits = input.split(",");
                for(String split: splits) {
                    //發(fā)送出去
                    out.collect(split);
                }
            }
        }).distinct().print();
    }

Join

Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.

result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)

 表示第一個tuple input1中的第0個字段,與第二個tuple input2中的第一個字段進行join。

  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]() //編號 名字
    info1.append((1, "hadoop"))
    info1.append((2, "spark"))
    info1.append((3, "flink"))
    info1.append((4, "java"))

    val info2 = ListBuffer[(Int, String)]() //編號 城市
    info2.append((1, "北京"))
    info2.append((2, "上海"))
    info2.append((3, "深圳"))
    info2.append((5, "廣州"))

    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.join(data2).where(0).equalTo(0).apply((first, second)=>{
      (first._1, first._2, second._2)
    }).print()
  }

輸出結(jié)果如下:

(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)

Java

    public static void joinFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info1 = new ArrayList<>(); //編號 名字
        info1.add(new Tuple2<>(1, "hadoop"));
        info1.add(new Tuple2<>(2, "spark"));
        info1.add(new Tuple2<>(3, "flink"));
        info1.add(new Tuple2<>(4, "java"));

        List<Tuple2<Integer, String>> info2 = new ArrayList<>(); //編號 城市
        info2.add(new Tuple2<>(1, "北京"));
        info2.add(new Tuple2<>(2, "上海"));
        info2.add(new Tuple2<>(3, "深圳"));
        info2.add(new Tuple2<>(5, "廣州"));
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
        data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                return new Tuple3<Integer, String, String>(first.f0, first.f1,second.f1);
            }
        }).print();
    }

Tuple2<Integer, String>, Tuple2<Integer, String>表示兩個輸入的集合,Tuple3<Integer, String, String>>表示輸出的Tuple3

OuterJoin

上面講的join是內(nèi)連接,這個OuterJoin是外連接,包括左外連接,右外連接,全連接在兩個數(shù)據(jù)集上。

def outJoinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]() //編號 名字
    info1.append((1, "hadoop"))
    info1.append((2, "spark"))
    info1.append((3, "flink"))
    info1.append((4, "java"))

    val info2 = ListBuffer[(Int, String)]() //編號 城市
    info2.append((1, "北京"))
    info2.append((2, "上海"))
    info2.append((3, "深圳"))
    info2.append((5, "廣州"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "-")
      }else {
        (first._1, first._2, second._2)
      }

    }).print() //左外連接 把左邊的所有數(shù)據(jù)展示出來
  }

左外連接,當左邊的數(shù)據(jù)在右邊沒有對應的數(shù)據(jù)時,需要進行處理,否則會出現(xiàn)空指針異常。輸出如下:

(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)

右外連接:

    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "-", second._2)
      }else {
        (first._1, first._2, second._2)
      }

    }).print()

右外連接,輸出:

(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)

全連接:

    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "-", second._2)
      }else if (second == null){
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)
(4,java,-)

Java

左外連接:

    public static void outjoinFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info1 = new ArrayList<>(); //編號 名字
        info1.add(new Tuple2<>(1, "hadoop"));
        info1.add(new Tuple2<>(2, "spark"));
        info1.add(new Tuple2<>(3, "flink"));
        info1.add(new Tuple2<>(4, "java"));

        List<Tuple2<Integer, String>> info2 = new ArrayList<>(); //編號 城市
        info2.add(new Tuple2<>(1, "北京"));
        info2.add(new Tuple2<>(2, "上海"));
        info2.add(new Tuple2<>(3, "深圳"));
        info2.add(new Tuple2<>(5, "廣州"));
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
        data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if(second == null) {
                    return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
                }
                return new Tuple3<Integer, String, String>(first.f0, first.f1,second.f1);
            }
        }).print();
    }

右外連接:

        data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (first == null) {
                    return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
                }
                return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
            }
        }).print();

全連接:

        data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (first == null) {
                    return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
                } else if (second == null) {
                    return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
                }
                return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
            }
        }).print();

cross function

Scala

笛卡爾積,左邊與右邊交叉處理

  def crossFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List("喬峰", "慕容復")
    val info2 = List(3,1,0)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.cross(data2).print()
  }

輸出:

(喬峰,3)
(喬峰,1)
(喬峰,0)
(慕容復,3)
(慕容復,1)
(慕容復,0)

Java

public static void crossFunction(ExecutionEnvironment env) throws Exception {
        List<String> info1 = new ArrayList<>();
        info1.add("喬峰");
        info1.add("慕容復");
        List<String> info2 = new ArrayList<>();
        info2.add("3");
        info2.add("1");
        info2.add("0");
        DataSource<String> data1 = env.fromCollection(info1);
        DataSource<String> data2 = env.fromCollection(info2);
        data1.cross(data2).print();
    }

到此,關(guān)于“Apache下Flink transformation的用法”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI