溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink的SessionWindow怎么用

發(fā)布時(shí)間:2021-12-31 10:24:42 來源:億速云 閱讀:180 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Flink的SessionWindow怎么用”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink的SessionWindow怎么用”吧!

sessionWindows會(huì)話窗口:按不活躍時(shí)間切成不同分區(qū)窗口,并進(jìn)行窗口計(jì)算

示例環(huán)境

java.version: 1.8.xflink.version: 1.11.1

示例數(shù)據(jù)源 (項(xiàng)目碼云下載)

Flink 系例 之 搭建開發(fā)環(huán)境與數(shù)據(jù)

SessionWindow.java

import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;

/**
 * @Description sessionWindows會(huì)話窗口:按不活躍時(shí)間切成不同分區(qū)窗口,并進(jìn)行窗口計(jì)算
 */
public class SessionWindow {

    /**
     * 遍歷集合,返回會(huì)話滑動(dòng)窗口下按不活躍時(shí)間切分后的,每個(gè)窗口下性別分區(qū)里最大年齡數(shù)據(jù)記錄
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設(shè)置流處理時(shí)間事件,對(duì)于會(huì)話窗口必需設(shè)置此時(shí)間類型,有三種類型:
        //1.ProcessingTime:以operator處理的時(shí)間為準(zhǔn),它使用的是機(jī)器的系統(tǒng)時(shí)間來作為data stream的時(shí)間
        //2.IngestionTime:以數(shù)據(jù)進(jìn)入flink streaming data flow的時(shí)間為準(zhǔn)
        //3.EventTime:以數(shù)據(jù)自帶的時(shí)間戳字段為準(zhǔn),應(yīng)用程序需要指定如何從record中抽取時(shí)間戳字段
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(4);
        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple3<String, String, Integer>> dataStream = inStream.keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k ->k.f1)
                //按會(huì)話窗口滾動(dòng),當(dāng)2秒之內(nèi)沒有指定分區(qū)數(shù)據(jù)流,則計(jì)算一次
                //會(huì)話窗口是根據(jù)在指定時(shí)間之后沒有活躍的數(shù)據(jù)接入,則認(rèn)為窗口結(jié)束,進(jìn)行窗口計(jì)算
                .window(EventTimeSessionWindows.withGap(Time.seconds(2)))
                .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t1, Tuple3<String, String, Integer> t2) throws Exception {
                        //返回年齡最大的
                        return t1.f2 > t2.f2 ? t1: t2;
                    }
                });
        dataStream.print();
        env.execute("flink EventTimeSessionWindows job");
    }

    /**
     * 模擬數(shù)據(jù)持續(xù)輸出
     */
    public static class MyRichSourceFunction extends RichParallelSourceFunction<Tuple3<String, String, Integer>> {
        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            for (Tuple3 tuple3 : tuple3List){
                ctx.collect(tuple3);
                //1秒鐘輸出一個(gè)
                Thread.sleep(2 * 1000);
            }
        }
        @Override
        public void cancel() {
            try{
                super.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

打印結(jié)果

2> (張三,man,20)
4> (李四,girl,24)
2> (王五,man,29)
4> (劉六,girl,32)
2> (吳八,man,30)

感謝各位的閱讀,以上就是“Flink的SessionWindow怎么用”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Flink的SessionWindow怎么用這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI