溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink中ProcessFunction類如何使用

發(fā)布時間:2021-07-14 14:15:43 來源:億速云 閱讀:147 作者:Leah 欄目:大數(shù)據(jù)

這篇文章將為大家詳細講解有關Flink中ProcessFunction類如何使用,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

Flink處理函數(shù)實戰(zhàn)系列鏈接

  1. 深入了解ProcessFunction的狀態(tài)操作(Flink-1.10);

  2. ProcessFunction;

  3. KeyedProcessFunction類;

  4. ProcessAllWindowFunction(窗口處理);

  5. CoProcessFunction(雙流處理);

關于處理函數(shù)(Process Function)

如下圖,在常規(guī)的業(yè)務開發(fā)中,SQL、Table API、DataStream API比較常用,處于Low-level的Porcession相對用得較少,從本章開始,我們一起通過實戰(zhàn)來熟悉處理函數(shù)(Process Function),看看這一系列的低級算子可以帶給我們哪些能力? Flink中ProcessFunction類如何使用

關于ProcessFunction類

處理函數(shù)有很多種,最基礎的應該ProcessFunction類,來看看它的類圖,可見有RichFunction的特性open、close,然后自己有兩個重要的方法processElement和onTimer: Flink中ProcessFunction類如何使用 常用特性如下所示:

  1. 處理單個元素;

  2. 訪問時間戳;

  3. 旁路輸出;

接下來寫兩個應用體驗上述功能;

版本信息

  1. 開發(fā)環(huán)境操作系統(tǒng):MacBook Pro 13寸, macOS Catalina 10.15.3

  2. 開發(fā)工具:IDEA ULTIMATE 2018.3

  3. JDK:1.8.0_211

  4. Maven:3.6.0

  5. Flink:1.9.2

源碼下載

如果您不想寫代碼,整個系列的源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):

名稱鏈接備注
項目主頁https://github.com/zq2599/blog_demos該項目在GitHub上的主頁
git倉庫地址(https)https://github.com/zq2599/blog_demos.git該項目源碼的倉庫地址,https協(xié)議
git倉庫地址(ssh)git@github.com:zq2599/blog_demos.git該項目源碼的倉庫地址,ssh協(xié)議

這個git項目中有多個文件夾,本章的應用在<font color="blue">flinkstudy</font>文件夾下,如下圖紅框所示: Flink中ProcessFunction類如何使用

創(chuàng)建工程

執(zhí)行以下命令創(chuàng)建一個flink-1.9.2的應用工程:

mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2

按提示輸入groupId:com.bolingcavalry,architectid:flinkdemo

第一個demo

第一個demo用來體驗以下兩個特性:

  1. 處理單個元素;

  2. 訪問時間戳;

創(chuàng)建Simple.java,內容如下:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Simple {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 并行度為1
        env.setParallelism(1);

        // 設置數(shù)據(jù)源,一共三個元素
        DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                for(int i=1; i<4; i++) {

                    String name = "name" + i;
                    Integer value = i;
                    long timeStamp = System.currentTimeMillis();

                    // 將將數(shù)據(jù)和時間戳打印出來,用來驗證數(shù)據(jù)
                    System.out.println(String.format("source,%s, %d, %d\n",
                            name,
                            value,
                            timeStamp));

                    // 發(fā)射一個元素,并且戴上了時間戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

                    // 為了讓每個元素的時間戳不一樣,每發(fā)射一次就延時10毫秒
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {

            }
        });


        // 過濾值為奇數(shù)的元素
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        // f1字段為奇數(shù)的元素不會進入下一個算子
                        if(0 == value.f1 % 2) {
                            out.collect(String.format("processElement,%s, %d, %d\n",
                                    value.f0,
                                    value.f1,
                                    ctx.timestamp()));
                        }
                    }
                });

        // 打印結果,證明每個元素的timestamp確實可以在ProcessFunction中取得
        mainDataStream.print();

        env.execute("processfunction demo : simple");
    }
}

這里對上述代碼做個介紹:

  1. 創(chuàng)建一個數(shù)據(jù)源,每個10毫秒發(fā)出一個元素,一共三個,類型是Tuple2,f0是個字符串,f1是整形,每個元素都帶時間戳;

  2. 數(shù)據(jù)源發(fā)出元素時,提前把元素的f0、f1、時間戳打印出來,和后面的數(shù)據(jù)核對是否一致;

  3. 在后面的處理中,創(chuàng)建了ProcessFunction的匿名子類,里面可以處理上游發(fā)來的每個元素,并且還能取得每個元素的時間戳(這個能力很重要),然后將f1字段為奇數(shù)的元素過濾掉;

  4. 最后將ProcessFunction處理過的數(shù)據(jù)打印出來,驗證處理結果是否符合預期;

直接執(zhí)行Simple類,結果如下,可見過濾和提取時間戳都成功了: Flink中ProcessFunction類如何使用

第二個demo

第二個demo是實現(xiàn)旁路輸出(Side Outputs),對于一個DataStream來說,可以通過旁路輸出將數(shù)據(jù)輸出到其他算子中去,而不影響原有的算子的處理,下面來演示旁路輸出:

創(chuàng)建SideOutput類:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;

public class SideOutput {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度為1
        env.setParallelism(1);

        // 定義OutputTag
        final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

        // 創(chuàng)建一個List,里面有兩個Tuple2元素
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 2));
        list.add(new Tuple2("ccc", 3));

        //通過List創(chuàng)建DataStream
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        //所有元素都進入mainDataStream,f1字段為奇數(shù)的元素進入SideOutput
        SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

                        //進入主流程的下一個算子
                        out.collect("main, name : " + value.f0 + ", value : " + value.f1);

                        //f1字段為奇數(shù)的元素進入SideOutput
                        if(1 == value.f1 % 2) {
                            ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);
                        }
                    }
                });

        // 禁止chanin,這樣可以在頁面上看清楚原始的DAG
        mainDataStream.disableChaining();

        // 取得旁路數(shù)據(jù)
        DataStream<String> sideDataStream = mainDataStream.getSideOutput(outputTag);

        mainDataStream.print();
        sideDataStream.print();

        env.execute("processfunction demo : sideoutput");
    }
}

這里對上述代碼做個介紹:

  1. 數(shù)據(jù)源是個集合,類型是Tuple2,f0字段是字符串,f1字段是整形;

  2. ProcessFunction的匿名子類中,將每個元素的f0和f1拼接成字符串,發(fā)給主流程算子,再將f1字段為奇數(shù)的元素發(fā)到旁路輸出;

  3. 數(shù)據(jù)源發(fā)出元素時,提前把元素的f0、f1、時間戳打印出來,和后面的數(shù)據(jù)核對是否一致;

  4. 將主流程和旁路輸出的元素都打印出來,驗證處理結果是否符合預期;

執(zhí)行SideOutput看結果,如下圖,main前綴的都是主流程算子,一共三條記錄,side前綴的是旁路輸出,只有f1字段為奇數(shù)的兩條記錄,符合預期: Flink中ProcessFunction類如何使用 上面的操作都是在IDEA上執(zhí)行的,還可以將flink單獨部署,再將上述工程構建成jar,提交到flink的jobmanager,可見DAG如下:

Flink中ProcessFunction類如何使用 至此,處理函數(shù)中最簡單的ProcessFunction類的學習

關于Flink中ProcessFunction類如何使用就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI