溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》
  • 首頁 > 
  • 教程 > 
  • 數(shù)據(jù)庫 > 
  • 通過Flink實(shí)現(xiàn)個(gè)推海量消息數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)

通過Flink實(shí)現(xiàn)個(gè)推海量消息數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)

發(fā)布時(shí)間:2020-07-05 22:15:41 來源:網(wǎng)絡(luò) 閱讀:443 作者:個(gè)推 欄目:數(shù)據(jù)庫

背景

消息報(bào)表主要用于統(tǒng)計(jì)消息任務(wù)的下發(fā)情況。比如,單條推送消息下發(fā)APP用戶總量有多少,成功推送到手機(jī)的數(shù)量有多少,又有多少APP用戶點(diǎn)擊了彈窗通知并打開APP等。通過消息報(bào)表,我們可以很直觀地看到消息推送的流轉(zhuǎn)情況、消息下發(fā)到達(dá)成功率、用戶對消息的點(diǎn)擊情況等。

個(gè)推在提供消息推送服務(wù)時(shí),為了更好地了解每天的推送情況,會(huì)從不同的維度進(jìn)行數(shù)據(jù)統(tǒng)計(jì),生成消息報(bào)表。個(gè)推每天下發(fā)的消息推送數(shù)巨大,可以達(dá)到數(shù)百億級(jí)別,原本我們采用的離線統(tǒng)計(jì)系統(tǒng)已不能滿足業(yè)務(wù)需求。隨著業(yè)務(wù)能力的不斷提升,我們選擇了Flink作為數(shù)據(jù)處理引擎,以滿足對海量消息推送數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)。

本文將主要闡述選擇Flink的原因、Flink的重要特性以及優(yōu)化后的實(shí)時(shí)計(jì)算方法。

離線計(jì)算平臺(tái)架構(gòu)

在消息報(bào)表系統(tǒng)的初期,我們采用的是離線計(jì)算的方式,主要采用spark作為計(jì)算引擎,原始數(shù)據(jù)存放在HDFS中,聚合數(shù)據(jù)存放在Solr、Hbase和Mysql中:

通過Flink實(shí)現(xiàn)個(gè)推海量消息數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)

查詢的時(shí)候,先根據(jù)篩選條件,查詢的維度主要有三個(gè):

  1. appId
  2. 下發(fā)時(shí)間
  3. taskGroupName

根據(jù)不同維度可以查詢到taskId的列表,然后根據(jù)task查詢hbase獲取相應(yīng)的結(jié)果,獲取下發(fā)、展示和點(diǎn)擊相應(yīng)的指標(biāo)數(shù)據(jù)。在我們考慮將其改造為實(shí)時(shí)統(tǒng)計(jì)時(shí),會(huì)存在著一系列的難點(diǎn):

  1. 原始數(shù)據(jù)體量巨大,每天數(shù)據(jù)量達(dá)到幾百億規(guī)模,需要支持高吞吐量;
  2. 需要支持實(shí)時(shí)的查詢;
  3. 需要對多份數(shù)據(jù)進(jìn)行關(guān)聯(lián);
  4. 需要保證數(shù)據(jù)的完整性和數(shù)據(jù)的準(zhǔn)確性。

Why Flink

Flink是什么

Flink 是一個(gè)針對流數(shù)據(jù)和批數(shù)據(jù)的分布式處理引擎。它主要是由 Java 代碼實(shí)現(xiàn)。目前主要還是依靠開源社區(qū)的貢獻(xiàn)而發(fā)展。

對 Flink 而言,其所要處理的主要場景就是流數(shù)據(jù)。Flink 的前身是柏林理工大學(xué)一個(gè)研究性項(xiàng)目, 在 2014 被 Apache 孵化器所接受,然后迅速地成為了 ASF(Apache Software Foundation)的頂級(jí)項(xiàng)目之一。

方案對比

為了實(shí)現(xiàn)個(gè)推消息報(bào)表的實(shí)時(shí)統(tǒng)計(jì),我們之前考慮使用spark streaming作為我們的實(shí)時(shí)計(jì)算引擎,但是我們在考慮了spark streaming、storm和flink的一些差異點(diǎn)后,還是決定使用Flink作為計(jì)算引擎:
通過Flink實(shí)現(xiàn)個(gè)推海量消息數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)

針對上面的業(yè)務(wù)痛點(diǎn),F(xiàn)link能夠滿足以下需要:

  1. Flink以管道推送數(shù)據(jù)的方式,可以讓Flink實(shí)現(xiàn)高吞吐量。

  2. Flink是真正意義上的流式處理,延時(shí)更低,能夠滿足我們消息報(bào)表統(tǒng)計(jì)的實(shí)時(shí)性要求。

  3. Flink可以依靠強(qiáng)大的窗口功能,實(shí)現(xiàn)數(shù)據(jù)的增量聚合;同時(shí),可以在窗口內(nèi)進(jìn)行數(shù)據(jù)的join操作。

  4. 我們的消息報(bào)表涉及到金額結(jié)算,因此對于不允許存在誤差,F(xiàn)link依賴自身的exact once機(jī)制,保證了我們數(shù)據(jù)不會(huì)重復(fù)消費(fèi)和漏消費(fèi)。

Flink的重要特性

下面我們來具體說說Flink中一些重要的特性,以及實(shí)現(xiàn)它的原理:

1)低延時(shí)、高吞吐

Flink速度之所以這么快,主要是在于它的流處理模型。

Flink 采用 Dataflow 模型,和 Lambda 模式不同。Dataflow 是純粹的節(jié)點(diǎn)組成的一個(gè)圖,圖中的節(jié)點(diǎn)可以執(zhí)行批計(jì)算,也可以是流計(jì)算,也可以是機(jī)器學(xué)習(xí)算法。流數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng),被節(jié)點(diǎn)上的處理函數(shù)實(shí)時(shí) apply 處理,節(jié)點(diǎn)之間是用 netty 連接起來,兩個(gè) netty 之間 keepalive,網(wǎng)絡(luò) buffer 是自然反壓的關(guān)鍵。

經(jīng)過邏輯優(yōu)化和物理優(yōu)化,Dataflow 的邏輯關(guān)系和運(yùn)行時(shí)的物理拓?fù)湎嗖畈淮?。這是純粹的流式設(shè)計(jì),時(shí)延和吞吐理論上是最優(yōu)的。

簡單來說,當(dāng)一條數(shù)據(jù)被處理完成后,序列化到緩存中,然后立刻通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn),由下一個(gè)節(jié)點(diǎn)繼續(xù)處理。

2)Checkpoint

Flink是通過分布式快照來實(shí)現(xiàn)checkpoint,能夠支持Exactly-Once語義。

分布式快照是基于Chandy和Lamport在1985年設(shè)計(jì)的一種算法,用于生成分布式系統(tǒng)當(dāng)前狀態(tài)的一致性快照,不會(huì)丟失信息且不會(huì)記錄重復(fù)項(xiàng)。

Flink使用的是Chandy Lamport算法的一個(gè)變種,定期生成正在運(yùn)行的流拓?fù)涞臓顟B(tài)快照,并將這些快照存儲(chǔ)到持久存儲(chǔ)中(例如:存儲(chǔ)到HDFS或內(nèi)存中文件系統(tǒng))。檢查點(diǎn)的存儲(chǔ)頻率是可配置的。
通過Flink實(shí)現(xiàn)個(gè)推海量消息數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)

3)backpressure

back pressure出現(xiàn)的原因是為了應(yīng)對短期數(shù)據(jù)尖峰。

舊版本Spark Streaming的back pressure通過限制最大消費(fèi)速度實(shí)現(xiàn),對于基于Receiver 形式,我們可以通過配置spark.streaming. receiver.maxRate參數(shù)來限制每個(gè) receiver 每秒最大可以接收的記錄的數(shù)據(jù)。

對于 Direct Approach 的數(shù)據(jù)接收,我們可以通過配置spark.streaming. kafka.maxRatePerPartition 參數(shù)來限制每次作業(yè)中每個(gè) Kafka 分區(qū)最多讀取的記錄條數(shù)。

但這樣是非常不方便的,在實(shí)際上線前,還需要對集群進(jìn)行壓測,來決定參數(shù)的大小。

Flink運(yùn)行時(shí)的構(gòu)造部件是operators以及streams。每一個(gè)operator消費(fèi)一個(gè)中間/過渡狀態(tài)的流,對它們進(jìn)行轉(zhuǎn)換,然后生產(chǎn)一個(gè)新的流。

描述這種機(jī)制最好的類比是:Flink使用有效的分布式阻塞隊(duì)列來作為有界的緩沖區(qū)。如同Java里通用的阻塞隊(duì)列跟處理線程進(jìn)行連接一樣,一旦隊(duì)列達(dá)到容量上限,一個(gè)相對較慢的接受者將拖慢發(fā)送者。

消息報(bào)表的實(shí)時(shí)計(jì)算

優(yōu)化之后,架構(gòu)升級(jí)成如下:

通過Flink實(shí)現(xiàn)個(gè)推海量消息數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)

可以看出,我們做了以下幾點(diǎn)優(yōu)化:

  1. Flink替換了之前的spark,進(jìn)行消息報(bào)表的實(shí)時(shí)計(jì)算;
  2. ES替換了之前的Solr。

對于Flink進(jìn)行實(shí)時(shí)計(jì)算,我們的關(guān)注點(diǎn)主要有以下4個(gè)方面:

  1. ExactlyOnce保證了數(shù)據(jù)只會(huì)被消費(fèi)一次
  2. 狀態(tài)管理的能力
  3. 強(qiáng)大的時(shí)間窗口
  4. 流批一體

為了實(shí)現(xiàn)我們實(shí)時(shí)統(tǒng)計(jì)報(bào)表的需求,主要依靠Flink的增量聚合功能。

首先,我們設(shè)置了Event Time作為時(shí)間窗口的類型,保證了只會(huì)計(jì)算當(dāng)天的數(shù)據(jù);同時(shí),我們每隔一分鐘增量統(tǒng)計(jì)當(dāng)日的消息報(bào)表,因此分配1分鐘的時(shí)間窗口。

然后我們使用.aggregate (AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數(shù)據(jù),減少 state 的存儲(chǔ)壓力。之后,我們將增量聚合后的數(shù)據(jù)寫入到ES和Hbase中。

流程如下所示:

通過Flink實(shí)現(xiàn)個(gè)推海量消息數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì)

同時(shí),在查詢的時(shí)候,我們通過taskID、日期等維度進(jìn)行查詢,先從ES中獲取taskID的集合,之后通過taskID查詢hbase,得出統(tǒng)計(jì)結(jié)果。

總結(jié)

通過使用Flink,我們實(shí)現(xiàn)了對消息推送數(shù)據(jù)的實(shí)時(shí)統(tǒng)計(jì),能夠?qū)崟r(shí)查看消息下發(fā)、展示、點(diǎn)擊等數(shù)據(jù)指標(biāo),同時(shí),借助FLink強(qiáng)大的狀態(tài)管理功能,服務(wù)的穩(wěn)定性也得到了一定的保障。未來,個(gè)推也將持續(xù)優(yōu)化消息推送服務(wù),并將Flink引入到其他的業(yè)務(wù)線中,以滿足一些實(shí)時(shí)性要求高的業(yè)務(wù)場景需求。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI