您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Flink怎么實(shí)時(shí)計(jì)算topN,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
1. 用到的知識(shí)點(diǎn)
Flink創(chuàng)建kafka數(shù)據(jù)源;
基于 EventTime 處理,如何指定 Watermark;
Flink中的Window,滾動(dòng)(tumbling)窗口與滑動(dòng)(sliding)窗口;
State狀態(tài)的使用;
ProcessFunction 實(shí)現(xiàn) TopN 功能;
2. 案例介紹
通過(guò)用戶(hù)訪(fǎng)問(wèn)日志,計(jì)算最近一段時(shí)間平臺(tái)最活躍的幾位用戶(hù)topN。
創(chuàng)建kafka生產(chǎn)者,發(fā)送測(cè)試數(shù)據(jù)到kafka;
消費(fèi)kafka數(shù)據(jù),使用滑動(dòng)(sliding)窗口,每隔一段時(shí)間更新一次排名;
3. 數(shù)據(jù)源
這里使用kafka api發(fā)送測(cè)試數(shù)據(jù)到kafka,代碼如下:
@Data @NoArgsConstructor @AllArgsConstructor @ToString public class User { private long id; private String username; private String password; private long timestamp; } Map<String, String> config = Configuration.initConfig("commons.xml"); @Test public void sendData() throws InterruptedException { int cnt = 0; while (cnt < 200){ User user = new User(); user.setId(cnt); user.setUsername("username" + new Random().nextInt((cnt % 5) + 2)); user.setPassword("password" + cnt); user.setTimestamp(System.currentTimeMillis()); Future<RecordMetadata> future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user)); while (!future.isDone()){ Thread.sleep(100); } try { RecordMetadata recordMetadata = future.get(); System.out.println(recordMetadata.offset()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("發(fā)送消息:" + cnt + "******" + user.toString()); cnt = cnt + 1; } }
這里通過(guò)隨機(jī)數(shù)來(lái)擾亂username,便于使用戶(hù)名大小不一,讓結(jié)果更加明顯。KafkaUtil是自己寫(xiě)的一個(gè)kafka工具類(lèi),代碼很簡(jiǎn)單,主要是平時(shí)做測(cè)試方便。
4. 主要程序
創(chuàng)建一個(gè)main程序,開(kāi)始編寫(xiě)代碼。
創(chuàng)建flink環(huán)境,關(guān)聯(lián)kafka數(shù)據(jù)源。
Map<String, String> config = Configuration.initConfig("commons.xml"); Properties kafkaProps = new Properties(); kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper")); kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport")); kafkaProps.setProperty("group.id", config.get("kafka-groupid")); StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
EventTime 與 Watermark
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
設(shè)置屬性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),表示按照數(shù)據(jù)時(shí)間字段來(lái)處理,默認(rèn)是TimeCharacteristic.ProcessingTime
/** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
這個(gè)屬性必須設(shè)置,否則后面,可能窗口結(jié)束無(wú)法觸發(fā),導(dǎo)致結(jié)果無(wú)法輸出。取值有三種:
ProcessingTime:事件被處理的時(shí)間。也就是由flink集群機(jī)器的系統(tǒng)時(shí)間來(lái)決定。
EventTime:事件發(fā)生的時(shí)間。一般就是數(shù)據(jù)本身攜帶的時(shí)間。
IngestionTime:攝入時(shí)間,數(shù)據(jù)進(jìn)入flink流的時(shí)間,跟ProcessingTime還是有區(qū)別的;
指定好使用數(shù)據(jù)的實(shí)際時(shí)間來(lái)處理,接下來(lái)需要指定flink程序如何get到數(shù)據(jù)的時(shí)間字段,這里使用調(diào)用DataStream的assignTimestampsAndWatermarks方法,抽取時(shí)間和設(shè)置watermark。
senv.addSource( new FlinkKafkaConsumer010<>( config.get("kafka-topic"), new SimpleStringSchema(), kafkaProps ) ).map(x ->{ return JSON.parseObject(x, User.class); }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<User>(Time.milliseconds(1000)) { @Override public long extractTimestamp(User element) { return element.getTimestamp(); } })
前面給出的代碼中可以看出,由于發(fā)送到kafka的時(shí)候,將User對(duì)象轉(zhuǎn)換為json字符串了,這里使用的是fastjson,接收過(guò)來(lái)可以轉(zhuǎn)化為JsonObject來(lái)處理,我這里還是將其轉(zhuǎn)化為User對(duì)象JSON.parseObject(x, User.class),便于處理。
這里考慮到數(shù)據(jù)可能亂序,使用了可以處理亂序的抽象類(lèi)BoundedOutOfOrdernessTimestampExtractor,并且實(shí)現(xiàn)了唯一的一個(gè)沒(méi)有實(shí)現(xiàn)的方法extractTimestamp,亂序數(shù)據(jù),會(huì)導(dǎo)致數(shù)據(jù)延遲,在構(gòu)造方法中傳入了一個(gè)Time.milliseconds(1000),表明數(shù)據(jù)可以延遲一秒鐘。比如說(shuō),如果窗口長(zhǎng)度是10s,0~10s的數(shù)據(jù)會(huì)在11s的時(shí)候計(jì)算,此時(shí)watermark是10,才會(huì)觸發(fā)計(jì)算,也就是說(shuō)引入watermark處理亂序數(shù)據(jù),最多可以容忍0~t這個(gè)窗口的數(shù)據(jù),最晚在t+1時(shí)刻到來(lái)。
具體關(guān)于watermark的講解可以參考這篇文章
https://blog.csdn.net/qq_39657909/article/details/106081543
窗口統(tǒng)計(jì)
業(yè)務(wù)需求上,通常可能是一個(gè)小時(shí),或者過(guò)去15分鐘的數(shù)據(jù),5分鐘更新一次排名,這里為了演示效果,窗口長(zhǎng)度取10s,每次滑動(dòng)(slide)5s,即5秒鐘更新一次過(guò)去10s的排名數(shù)據(jù)。
.keyBy("username") .timeWindow(Time.seconds(10), Time.seconds(5)) .aggregate(new CountAgg(), new WindowResultFunction())
我們使用.keyBy("username")對(duì)用戶(hù)進(jìn)行分組,使用.timeWindow(Time size, Time slide)對(duì)每個(gè)用戶(hù)做滑動(dòng)窗口(10s窗口,5s滑動(dòng)一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數(shù)據(jù),減少 state 的存儲(chǔ)壓力。較之.apply(WindowFunction wf)會(huì)將窗口中的數(shù)據(jù)都存儲(chǔ)下來(lái),最后一起計(jì)算要高效地多。aggregate()方法的第一個(gè)參數(shù)用于
這里的CountAgg實(shí)現(xiàn)了AggregateFunction接口,功能是統(tǒng)計(jì)窗口中的條數(shù),即遇到一條數(shù)據(jù)就加一。
public class CountAgg implements AggregateFunction<User, Long, Long>{ @Override public Long createAccumulator() { return 0L; } @Override public Long add(User value, Long accumulator) { return accumulator + 1; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return a + b; } }
.aggregate(AggregateFunction af, WindowFunction wf) 的第二個(gè)參數(shù)WindowFunction將每個(gè) key每個(gè)窗口聚合后的結(jié)果帶上其他信息進(jìn)行輸出。我們這里實(shí)現(xiàn)的WindowResultFunction將用戶(hù)名,窗口,訪(fǎng)問(wèn)量封裝成了UserViewCount進(jìn)行輸出。
private static class WindowResultFunction implements WindowFunction<Long, UserViewCount, Tuple, TimeWindow> { @Override public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<UserViewCount> out) throws Exception { Long count = input.iterator().next(); out.collect(new UserViewCount(((Tuple1<String>)key).f0, window.getEnd(), count)); } } @Data @NoArgsConstructor @AllArgsConstructor @ToString public static class UserViewCount { private String userName; private long windowEnd; private long viewCount; }
TopN計(jì)算最活躍用戶(hù)
為了統(tǒng)計(jì)每個(gè)窗口下活躍的用戶(hù),我們需要再次按窗口進(jìn)行分組,這里根據(jù)UserViewCount中的windowEnd進(jìn)行keyBy()操作。然后使用 ProcessFunction 實(shí)現(xiàn)一個(gè)自定義的 TopN 函數(shù) TopNHotItems 來(lái)計(jì)算點(diǎn)擊量排名前3名的用戶(hù),并將排名結(jié)果格式化成字符串,便于后續(xù)輸出。
.keyBy("windowEnd") .process(new TopNHotUsers(3)) .print();
ProcessFunction 是 Flink 提供的一個(gè) low-level API,用于實(shí)現(xiàn)更高級(jí)的功能。它主要提供了定時(shí)器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來(lái)判斷何時(shí)收齊了某個(gè) window 下所有用戶(hù)的訪(fǎng)問(wèn)數(shù)據(jù)。由于 Watermark 的進(jìn)度是全局的,在 processElement 方法中,每當(dāng)收到一條數(shù)據(jù)(ItemViewCount),我們就注冊(cè)一個(gè) windowEnd+1 的定時(shí)器(Flink 框架會(huì)自動(dòng)忽略同一時(shí)間的重復(fù)注冊(cè))。windowEnd+1 的定時(shí)器被觸發(fā)時(shí),意味著收到了windowEnd+1的 Watermark,即收齊了該windowEnd下的所有用戶(hù)窗口統(tǒng)計(jì)值。我們?cè)?onTimer() 中處理將收集的所有商品及點(diǎn)擊量進(jìn)行排序,選出 TopN,并將排名信息格式化成字符串后進(jìn)行輸出。
這里我們還使用了 ListState
private static class TopNHotUsers extends KeyedProcessFunction<Tuple, UserViewCount, String> { private int topSize; private ListState<UserViewCount> userViewCountListState; public TopNHotUsers(int topSize) { this.topSize = topSize; } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); List<UserViewCount> userViewCounts = new ArrayList<>(); for(UserViewCount userViewCount : userViewCountListState.get()) { userViewCounts.add(userViewCount); } userViewCountListState.clear(); userViewCounts.sort(new Comparator<UserViewCount>() { @Override public int compare(UserViewCount o1, UserViewCount o2) { return (int)(o2.viewCount - o1.viewCount); } }); // 將排名信息格式化成 String, 便于打印 StringBuilder result = new StringBuilder(); result.append("====================================\n"); result.append("時(shí)間: ").append(new Timestamp(timestamp-1)).append("\n"); for (int i = 0; i < topSize; i++) { UserViewCount currentItem = userViewCounts.get(i); // No1: 商品ID=12224 瀏覽量=2413 result.append("No").append(i).append(":") .append(" 用戶(hù)名=").append(currentItem.userName) .append(" 瀏覽量=").append(currentItem.viewCount) .append("\n"); } result.append("====================================\n\n"); Thread.sleep(1000); out.collect(result.toString()); } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { super.open(parameters); ListStateDescriptor<UserViewCount> userViewCountListStateDescriptor = new ListStateDescriptor<>( "user-state", UserViewCount.class ); userViewCountListState = getRuntimeContext().getListState(userViewCountListStateDescriptor); } @Override public void processElement(UserViewCount value, Context ctx, Collector<String> out) throws Exception { userViewCountListState.add(value); ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000); } }
結(jié)果輸出
可以看到,每隔5秒鐘更新輸出一次數(shù)據(jù)。
關(guān)于Flink怎么實(shí)時(shí)計(jì)算topN就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。