溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

發(fā)布時(shí)間:2020-08-08 10:56:38 來(lái)源:ITPUB博客 閱讀:311 作者:芊寶寶最可愛(ài) 欄目:MySQL數(shù)據(jù)庫(kù)
導(dǎo)讀:當(dāng)今生活節(jié)奏日益加快,企業(yè)面對(duì)不斷增加的海量信息,其信息篩選和處理效率低下的困擾與日俱增。由于用戶營(yíng)銷(xiāo)不夠細(xì)化,企業(yè) App 中許多不合時(shí)宜或不合偏好的消息推送很大程度上影響了用戶體驗(yàn),甚至引發(fā)了用戶流失。在此背景下,友信金服公司推行全域的數(shù)據(jù)體系戰(zhàn)略,通過(guò)打通和整合集團(tuán)各個(gè)業(yè)務(wù)線數(shù)據(jù),利用大數(shù)據(jù)、人工智能等技術(shù)構(gòu)建統(tǒng)一的數(shù)據(jù)資產(chǎn),如 ID-Mapping、用戶標(biāo)簽等。友信金服用戶畫(huà)像項(xiàng)目正是以此為背景成立,旨在實(shí)現(xiàn)“數(shù)據(jù)驅(qū)動(dòng)業(yè)務(wù)與運(yùn)營(yíng)”的集團(tuán)戰(zhàn)略。目前該系統(tǒng)支持日處理數(shù)據(jù)量超 10 億,接入上百種合規(guī)數(shù)據(jù)源。

一、技術(shù)選型

傳統(tǒng)基于 Hadoop 生態(tài)的離線數(shù)據(jù)存儲(chǔ)計(jì)算方案已在業(yè)界大規(guī)模應(yīng)用,但受制于離線計(jì)算的高時(shí)延性,越來(lái)越多的數(shù)據(jù)應(yīng)用場(chǎng)景已從離線轉(zhuǎn)為實(shí)時(shí)。這里引用一張表格對(duì)目前主流的實(shí)時(shí)計(jì)算框架做個(gè)對(duì)比。

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

Apache Storm 的容錯(cuò)機(jī)制需要對(duì)每條數(shù)據(jù)進(jìn)行應(yīng)答(ACK),因此其吞吐量備受影響,在數(shù)據(jù)大吞吐量的場(chǎng)景下會(huì)有問(wèn)題,因此不適用此項(xiàng)目的需求。

Apache Spark 總體生態(tài)更為完善,且在機(jī)器學(xué)習(xí)的集成和應(yīng)用性暫時(shí)領(lǐng)先,但 Spark 底層還是采用微批(Micro Batching)處理的形式。

Apache Flink 在流式計(jì)算上有明顯優(yōu)勢(shì):首先其流式計(jì)算屬于真正意義上的單條處理,即每一條數(shù)據(jù)都會(huì)觸發(fā)計(jì)算。在這一點(diǎn)上明顯與 Spark 的微批流式處理方式不同。其次,F(xiàn)link 的容錯(cuò)機(jī)制較為輕量,對(duì)吞吐量影響較小,使得 Flink 可以達(dá)到很高的吞吐量。最后 Flink 還擁有易用性高,部署簡(jiǎn)單等優(yōu)勢(shì)。相比之下我們最終決定采用基于 Flink 的架構(gòu)方案。

二、用戶畫(huà)像業(yè)務(wù)架構(gòu)

用戶畫(huà)像系統(tǒng)目前為集團(tuán)線上業(yè)務(wù)提供用戶實(shí)時(shí)標(biāo)簽數(shù)據(jù)服務(wù)。為此我們的服務(wù)需要打通多種數(shù)據(jù)源,對(duì)海量的數(shù)字信息進(jìn)行實(shí)時(shí)不間斷的數(shù)據(jù)清洗、聚類(lèi)、分析,從而將它們抽象成標(biāo)簽,并最終為應(yīng)用方提供高質(zhì)量的標(biāo)簽服務(wù)。在此背景下,我們?cè)O(shè)計(jì)用戶畫(huà)像系統(tǒng)的整體架構(gòu)如下圖所示:

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

整體架構(gòu)分為五層:

  1. 接入層:接入原始數(shù)據(jù)并對(duì)其進(jìn)行處理,如 Kafka、Hive、文件等。
  2. 計(jì)算層:選用 Flink 作為實(shí)時(shí)計(jì)算框架,對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行清洗,關(guān)聯(lián)等操作。
  3. 存儲(chǔ)層:對(duì)清洗完成的數(shù)據(jù)進(jìn)行數(shù)據(jù)存儲(chǔ),我們對(duì)此進(jìn)行了實(shí)時(shí)用戶畫(huà)像的模型分層與構(gòu)建,將不同應(yīng)用場(chǎng)景的數(shù)據(jù)分別存儲(chǔ)在如 Phoenix,HBase,HDFS,Kafka 等。
  4. 服務(wù)層:對(duì)外提供統(tǒng)一的數(shù)據(jù)查詢服務(wù),支持從底層明細(xì)數(shù)據(jù)到聚合層數(shù)據(jù)的多維計(jì)算服務(wù)。
  5. 應(yīng)用層:以統(tǒng)一查詢服務(wù)對(duì)各個(gè)業(yè)務(wù)線數(shù)據(jù)場(chǎng)景進(jìn)行支撐。目前業(yè)務(wù)主要包含用戶興趣分、用戶質(zhì)量分、用戶的事實(shí)信息等數(shù)據(jù)。

三、用戶畫(huà)像數(shù)據(jù)處理流程

在整體架構(gòu)設(shè)計(jì)方案設(shè)計(jì)完成之后,我們針對(duì)數(shù)據(jù)也設(shè)計(jì)了詳盡的處理方案。在數(shù)據(jù)處理階段,鑒于 Kafka 高吞吐量、高穩(wěn)定性的特點(diǎn),我們的用戶畫(huà)像系統(tǒng)統(tǒng)一采用 Kafka 作為分布式發(fā)布訂閱消息系統(tǒng)。數(shù)據(jù)清洗階段利用 Flink 來(lái)實(shí)現(xiàn)用戶唯一性識(shí)別、行為數(shù)據(jù)的清洗等,去除冗余數(shù)據(jù)。這一過(guò)程支持交互計(jì)算和多種復(fù)雜算法,并支持?jǐn)?shù)據(jù)實(shí)時(shí) / 離線計(jì)算。目前我們數(shù)據(jù)處理流程迭代了兩版,具體方案如下:

1.0 版數(shù)據(jù)處理流程

數(shù)據(jù)接入、計(jì)算、存儲(chǔ)三層處理流程

整體數(shù)據(jù)來(lái)源包含兩種:

  1. 歷史數(shù)據(jù):從外部數(shù)據(jù)源接入的海量歷史業(yè)務(wù)數(shù)據(jù)。接入后經(jīng)過(guò) ETL 處理,進(jìn)入用戶畫(huà)像底層數(shù)據(jù)表。
  2. 實(shí)時(shí)數(shù)據(jù):從外部數(shù)據(jù)源接入的實(shí)時(shí)業(yè)務(wù)數(shù)據(jù),如用戶行為埋點(diǎn)數(shù)據(jù),風(fēng)控?cái)?shù)據(jù)等。

根據(jù)不同業(yè)務(wù)的指標(biāo)需求我們直接從集團(tuán)數(shù)據(jù)倉(cāng)庫(kù)抽取數(shù)據(jù)并落入 Kafka,或者直接從業(yè)務(wù)端以 CDC(Capture Data Change)的方式寫(xiě)入 Kafka。在計(jì)算層,數(shù)據(jù)被導(dǎo)入到 Flink 中,通過(guò) DataStream 生成 ID-Mapping、用戶標(biāo)簽碎片等數(shù)據(jù),然后將生成數(shù)據(jù)存入 JanusGraph(JanusGraph 是以 HBase 作為后端存儲(chǔ)的圖數(shù)據(jù)庫(kù)介質(zhì))與 Kafka,并由 Flink 消費(fèi)落入 Kafka 的用戶標(biāo)簽碎片數(shù)據(jù),進(jìn)行聚合生成最新的用戶標(biāo)簽碎片(用戶標(biāo)簽碎片是由用戶畫(huà)像系統(tǒng)獲取來(lái)自多種渠道的碎片化數(shù)據(jù)塊處理后生成的)。

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

數(shù)據(jù)服務(wù)層處理流程

服務(wù)層將存儲(chǔ)層存儲(chǔ)的用戶標(biāo)簽碎片數(shù)據(jù),通過(guò) JanusGraph Spark On Yarn 模式,執(zhí)行 TinkerPop OLAP 計(jì)算生成全量用戶 Yids 列表文件。Yid 是用戶畫(huà)像系統(tǒng)中定義的集團(tuán)級(jí)用戶 ID 標(biāo)識(shí)。結(jié)合 Yids 列表文件,在 Flink 中批量讀取 HBase 聚合成完整用戶畫(huà)像數(shù)據(jù),生成 HDFS 文件,再通過(guò) Flink 批量操作新生成的數(shù)據(jù)生成用戶評(píng)分預(yù)測(cè)標(biāo)簽,將用戶評(píng)分預(yù)測(cè)標(biāo)簽落入 Phoenix,之后數(shù)據(jù)便可通過(guò)統(tǒng)一數(shù)據(jù)服務(wù)接口進(jìn)行獲取。下圖完整地展示了這一流程。

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

ID-Mapping 數(shù)據(jù)結(jié)構(gòu)

為了實(shí)現(xiàn)用戶標(biāo)簽的整合,用戶 ID 之間的強(qiáng)打通,我們將用戶 ID 標(biāo)識(shí)看成圖的頂點(diǎn)、ID pair 關(guān)系看作圖的邊,比如已經(jīng)識(shí)別瀏覽器 Cookie 的用戶使用手機(jī)號(hào)登陸了公司網(wǎng)站就形成了對(duì)應(yīng)關(guān)系。這樣所有用戶 ID 標(biāo)識(shí)就構(gòu)成了一張大圖,其中每個(gè)小的連通子圖 / 連通分支就是一個(gè)用戶的全部標(biāo)識(shí) ID 信息。

ID-Mapping 數(shù)據(jù)由圖結(jié)構(gòu)模型構(gòu)建,圖節(jié)點(diǎn)包含 UserKey、Device、IdCard、Phone 等類(lèi)型,分別表示用戶的業(yè)務(wù) ID、設(shè)備 ID、身份證以及電話等信息。節(jié)點(diǎn)之間邊的生成規(guī)則是通過(guò)解析數(shù)據(jù)流中包含的節(jié)點(diǎn)信息,以一定的優(yōu)先級(jí)順序進(jìn)行節(jié)點(diǎn)之間的連接,從而生成節(jié)點(diǎn)之間的邊。比如,識(shí)別了用戶手機(jī)系統(tǒng)的 Android_ID,之后用戶使用郵箱登陸了公司 App,在系統(tǒng)中找到了業(yè)務(wù)線 UID 就形成了和關(guān)系的 ID pair,然后系統(tǒng)根據(jù)節(jié)點(diǎn)類(lèi)型進(jìn)行優(yōu)先級(jí)排序,生成 Android_ID、mail、UID 的關(guān)系圖。數(shù)據(jù)圖結(jié)構(gòu)模型如下圖所示:

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

Gephi

1.0 版本數(shù)據(jù)處理流程性能瓶頸

1.0 版本數(shù)據(jù)處理流程在系統(tǒng)初期較好地滿足了我們的日常需求,但隨著數(shù)據(jù)量的增長(zhǎng),該方案遇到了一些性能瓶頸:

  1. 首先,這版的數(shù)據(jù)處理使用了自研的 Java 程序來(lái)實(shí)現(xiàn)。隨著數(shù)據(jù)量上漲,自研 JAVA 程序由于數(shù)據(jù)量暴增導(dǎo)致 JVM 內(nèi)存大小不可控,同時(shí)它的維護(hù)成本很高,因此我們決定在新版本中將處理邏輯全部遷移至 Flink 中。
  2. 其次,在生成用戶標(biāo)簽過(guò)程中,ID-Mapping 出現(xiàn)很多大的連通子圖(如下圖所示)。這通常是因?yàn)橛脩舻男袨閿?shù)據(jù)比較隨機(jī)離散,導(dǎo)致部分節(jié)點(diǎn)間連接混亂。這不僅增加了數(shù)據(jù)的維護(hù)難度,也導(dǎo)致部分?jǐn)?shù)據(jù)被“污染”。另外這類(lèi)異常大的子圖會(huì)嚴(yán)重降低 JanusGraph 與 HBase 的查詢性能。

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

Gephi

  1. 最后,該版方案中數(shù)據(jù)經(jīng) Protocol Buffer(PB)序列化之后存入 HBase,這會(huì)導(dǎo)致合并 / 更新用戶畫(huà)像標(biāo)簽碎片的次數(shù)過(guò)多,使得一個(gè)標(biāo)簽需要讀取多次 JanusGraph 與 HBase,這無(wú)疑會(huì)加重 HBase 讀取壓力。此外,由于數(shù)據(jù)經(jīng)過(guò)了 PB 序列化,使得其原始存儲(chǔ)格式不可讀,增加了排查問(wèn)題的難度。

鑒于這些問(wèn)題,我們提出了 2.0 版本的解決方案。在 2.0 版本中,我們通過(guò)利用 HBase 列式存儲(chǔ)、修改圖數(shù)據(jù)結(jié)構(gòu)等優(yōu)化方案嘗試解決以上三個(gè)問(wèn)題。

2.0 版數(shù)據(jù)處理流程

版本流程優(yōu)化點(diǎn)

如下圖所示,2.0 版本數(shù)據(jù)處理流程大部分承襲了 1.0 版本。新版本數(shù)據(jù)處理流程在以下幾個(gè)方面做了優(yōu)化:

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

2.0 版本數(shù)據(jù)處理流程

  1. 歷史數(shù)據(jù)的離線補(bǔ)錄方式由 JAVA 服務(wù)變更為使用 Flink 實(shí)現(xiàn)。
  2. 優(yōu)化用戶畫(huà)像圖數(shù)據(jù)結(jié)構(gòu)模型,主要是對(duì)邊的連接方式進(jìn)行了修改。之前我們會(huì)判斷節(jié)點(diǎn)的類(lèi)型并根據(jù)預(yù)設(shè)的優(yōu)先級(jí)順序?qū)⒍鄠€(gè)節(jié)點(diǎn)進(jìn)行連接,新方案則采用以 UserKey 為中心的連接方式。做此修改后,之前的大的連通子圖(圖 6)優(yōu)化為下面的小的連通子圖(圖 8),同時(shí)解決了數(shù)據(jù)污染問(wèn)題,保證了數(shù)據(jù)準(zhǔn)確性。另外,1.0 版本中一條數(shù)據(jù)需要平均讀取十多次 HBase 的情況也得到極大緩解。采用新方案之后平均一條數(shù)據(jù)只需讀取三次 HBase,從而降低 HBase 六七倍的讀取壓力(此處優(yōu)化是數(shù)據(jù)計(jì)算層優(yōu)化)。

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

Gephi

  1. 舊版本是用 Protocol Buffer 作為用戶畫(huà)像數(shù)據(jù)的存儲(chǔ)對(duì)象,生成用戶畫(huà)像數(shù)據(jù)后作為一個(gè)列整體存入 HBase。新版本使用 Map 存儲(chǔ)用戶畫(huà)像標(biāo)簽數(shù)據(jù),Map 的每對(duì) KV 都是單獨(dú)的標(biāo)簽,KV 在存入 HBase 后也是單獨(dú)的列。新版本存儲(chǔ)模式利用 HBase 做列的擴(kuò)展與合并,直接生成完整用戶畫(huà)像數(shù)據(jù),去掉 Flink 合并 / 更新用戶畫(huà)像標(biāo)簽過(guò)程,優(yōu)化數(shù)據(jù)加工流程。使用此方案后,存入 HBase 的標(biāo)簽數(shù)據(jù)具備了即席查詢功能。數(shù)據(jù)具備即席查詢是指在 HBase 中可用特定條件直接查看指定標(biāo)簽數(shù)據(jù)詳情的功能,它是數(shù)據(jù)治理可以實(shí)現(xiàn)校驗(yàn)數(shù)據(jù)質(zhì)量、數(shù)據(jù)生命周期、數(shù)據(jù)安全等功能的基礎(chǔ)條件。
  2. 在數(shù)據(jù)服務(wù)層,我們利用 Flink 批量讀取 HBase 的 Hive 外部表生成用戶質(zhì)量分等數(shù)據(jù),之后將其存入 Phoenix。相比于舊方案中 Spark 全量讀 HBase 導(dǎo)致其讀壓力過(guò)大,從而會(huì)出現(xiàn)集群節(jié)點(diǎn)宕機(jī)的問(wèn)題,新方案能夠有效地降低 HBase 的讀取壓力。經(jīng)過(guò)我們線上驗(yàn)證,新方案對(duì) HBase 的讀負(fù)載下降了數(shù)十倍(此處優(yōu)化與 2 優(yōu)化不同,屬于服務(wù)層優(yōu)化)。

四、問(wèn)題

目前,線上部署的用戶畫(huà)像系統(tǒng)中的數(shù)據(jù)絕大部分是來(lái)自于 Kafka 的實(shí)時(shí)數(shù)據(jù)。隨著數(shù)據(jù)量越來(lái)越多,系統(tǒng)的壓力也越來(lái)越大,以至于出現(xiàn)了 Flink 背壓與 Checkpoint 超時(shí)等問(wèn)題,導(dǎo)致 Flink 提交 Kafka 位移失敗,從而影響了數(shù)據(jù)一致性。這些線上出現(xiàn)的問(wèn)題讓我們開(kāi)始關(guān)注 Flink 的可靠性、穩(wěn)定性以及性能。針對(duì)這些問(wèn)題,我們進(jìn)行了詳細(xì)的分析并結(jié)合自身的業(yè)務(wù)特點(diǎn),探索并實(shí)踐出了一些相應(yīng)的解決方案。

CheckPointing 流程分析與性能優(yōu)化方案

CheckPointing 流程分析

下圖展示了 Flink 中 checkpointing 執(zhí)行流程圖:

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

Flink 中 checkpointing 執(zhí)行流程

  1. Coordinator 向所有 Source 節(jié)點(diǎn)發(fā)出 Barrier。
  2. Task 從輸入中收到所有 Barrier 后,將自己的狀態(tài)寫(xiě)入持久化存儲(chǔ)中,并向自己的下游繼續(xù)傳遞 Barrier。
  3. 當(dāng) Task 完成狀態(tài)持久化之后將存儲(chǔ)后的狀態(tài)地址通知到 Coordinator。
  4. 當(dāng) Coordinator 匯總所有 Task 的狀態(tài),并將這些數(shù)據(jù)的存放路徑寫(xiě)入持久化存儲(chǔ)中,完成 CheckPointing。

性能優(yōu)化方案

通過(guò)以上流程分析,我們通過(guò)三種方式來(lái)提高 Checkpointing 性能。這些方案分別是:

  1. 選擇合適的 Checkpoint 存儲(chǔ)方式
  2. 合理增加算子(Task)并行度
  3. 縮短算子鏈(Operator Chains)長(zhǎng)度

選擇合適的 Checkpoint 存儲(chǔ)方式

CheckPoint 存儲(chǔ)方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。由官方文檔可知,不同 StateBackend 之間的性能以及安全性是有很大差異的。通常情況下,MemoryStateBackend 適合應(yīng)用于測(cè)試環(huán)境,線上環(huán)境則最好選擇 RocksDBStateBackend。

這有兩個(gè)原因:首先,RocksDBStateBackend 是外部存儲(chǔ),其他兩種 Checkpoint 存儲(chǔ)方式都是 JVM 堆存儲(chǔ)。受限于 JVM 堆內(nèi)存的大小,Checkpoint 狀態(tài)大小以及安全性可能會(huì)受到一定的制約;其次,RocksDBStateBackend 支持增量檢查點(diǎn)。增量檢查點(diǎn)機(jī)制(Incremental Checkpoints)僅僅記錄對(duì)先前完成的檢查點(diǎn)的更改,而不是生成完整的狀態(tài)。與完整檢查點(diǎn)相比,增量檢查點(diǎn)可以顯著縮短 checkpointing 時(shí)間,但代價(jià)是需要更長(zhǎng)的恢復(fù)時(shí)間。

合理增加算子(Task)并行度

Checkpointing 需要對(duì)每個(gè) Task 進(jìn)行數(shù)據(jù)狀態(tài)采集。單個(gè) Task 狀態(tài)數(shù)據(jù)越多則 Checkpointing 越慢。所以我們可以通過(guò)增加 Task 并行度,減少單個(gè) Task 狀態(tài)數(shù)據(jù)的數(shù)量來(lái)達(dá)到縮短 CheckPointing 時(shí)間的效果。

縮短算子鏈(Operator Chains)長(zhǎng)度

Flink 算子鏈(Operator Chains)越長(zhǎng),Task 也會(huì)越多,相應(yīng)的狀態(tài)數(shù)據(jù)也就更多,Checkpointing 也會(huì)越慢。通過(guò)縮短算子鏈長(zhǎng)度,可以減少 Task 數(shù)量,從而減少系統(tǒng)中的狀態(tài)數(shù)據(jù)總量,間接的達(dá)到優(yōu)化 Checkpointing 的目的。下面展示了 Flink 算子鏈的合并規(guī)則:

  1. 上下游的并行度一致
  2. 下游節(jié)點(diǎn)的入度為 1
  3. 上下游節(jié)點(diǎn)都在同一個(gè) Slot Group 中
  4. 下游節(jié)點(diǎn)的 Chain 策略為 ALWAYS
  5. 上游節(jié)點(diǎn)的 Chain 策略為 ALWAYS 或 HEAD
  6. 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 Forward
  7. 用戶沒(méi)有禁用 Chain

基于以上這些規(guī)則,我們?cè)诖a層面上合并了相關(guān)度較大的一些 Task,使得平均的操作算子鏈長(zhǎng)度至少縮短了 60%~70%。

Flink 背壓產(chǎn)生過(guò)程分析及解決方案

背壓產(chǎn)生過(guò)程分析

在 Flink 運(yùn)行過(guò)程中,每一個(gè)操作算子都會(huì)消費(fèi)一個(gè)中間 / 過(guò)渡狀態(tài)的流,并對(duì)它們進(jìn)行轉(zhuǎn)換,然后生產(chǎn)一個(gè)新的流。這種機(jī)制可以類(lèi)比為:Flink 使用阻塞隊(duì)列作為有界的緩沖區(qū)。跟 Java 里阻塞隊(duì)列一樣,一旦隊(duì)列達(dá)到容量上限,處理速度較慢的消費(fèi)者會(huì)阻塞生產(chǎn)者向隊(duì)列發(fā)送新的消息或事件。下圖展示了 Flink 中兩個(gè)操作算子之間的數(shù)據(jù)傳輸以及如何感知到背壓的:

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

首先,Source 中的事件進(jìn)入 Flink 并被操作算子 1 處理且被序列化到 Buffer 中,然后操作算子 2 從這個(gè) Buffer 中讀出該事件。當(dāng)操作算子 2 處理能力不足的時(shí)候,操作算子 1 中的數(shù)據(jù)便無(wú)法放入 Buffer,從而形成背壓。背壓出現(xiàn)的原因可能有以下兩點(diǎn):

  1. 下游算子處理能力不足;
  2. 數(shù)據(jù)發(fā)生了傾斜。

背壓解決方案

實(shí)踐中我們通過(guò)以下方式解決背壓?jiǎn)栴}。首先,縮短算子鏈會(huì)合理的合并算子,節(jié)省出資源。其次縮短算子鏈也會(huì)減少 Task(線程)之間的切換、消息的序列化 / 反序列化以及數(shù)據(jù)在緩沖區(qū)的交換次數(shù),進(jìn)而提高系統(tǒng)的整體吞吐量。最后,根據(jù)數(shù)據(jù)特性將不需要或者暫不需要的數(shù)據(jù)進(jìn)行過(guò)濾,然后根據(jù)業(yè)務(wù)需求將數(shù)據(jù)分別處理,比如有些數(shù)據(jù)源需要實(shí)時(shí)的處理,有些數(shù)據(jù)是可以延遲的,最后通過(guò)使用 keyBy 關(guān)鍵字,控制 Flink 時(shí)間窗口大小,在上游算子處理邏輯中盡量合并更多數(shù)據(jù)來(lái)達(dá)到降低下游算子的處理壓力。

優(yōu)化結(jié)果

經(jīng)過(guò)以上優(yōu)化,在每天億級(jí)數(shù)據(jù)量下,用戶畫(huà)像可以做到實(shí)時(shí)信息實(shí)時(shí)處理并無(wú)持續(xù)背壓,Checkpointing 平均時(shí)長(zhǎng)穩(wěn)定在 1 秒以內(nèi)。

五、未來(lái)工作的思考和展望

端到端的實(shí)時(shí)流處理

目前用戶畫(huà)像部分?jǐn)?shù)據(jù)都是從 Hive 數(shù)據(jù)倉(cāng)庫(kù)拿到的,數(shù)據(jù)倉(cāng)庫(kù)本身是 T+1 模式,數(shù)據(jù)延時(shí)性較大,所以為了提高數(shù)據(jù)實(shí)時(shí)性,端到端的實(shí)時(shí)流處理很有必要。

端到端是指一端采集原始數(shù)據(jù),另一端以報(bào)表 / 標(biāo)簽 / 接口的方式對(duì)這些對(duì)數(shù)進(jìn)行呈現(xiàn)與應(yīng)用,連接兩端的是中間實(shí)時(shí)流。在后續(xù)的工作中,我們計(jì)劃將現(xiàn)有的非實(shí)時(shí)數(shù)據(jù)源全部切換到實(shí)時(shí)數(shù)據(jù)源,統(tǒng)一經(jīng)過(guò) Kafka 和 Flink 處理后再導(dǎo)入到 Phoenix/JanusGraph/HBase。強(qiáng)制所有數(shù)據(jù)源數(shù)據(jù)進(jìn)入 Kafka 的一個(gè)好處在于它能夠提高整體流程的穩(wěn)定性與可用性:首先 Kafka 作為下游系統(tǒng)的緩沖,可以避免下游系統(tǒng)的異常影響實(shí)時(shí)流的計(jì)算,起到“削峰填谷”的作用;其次,F(xiàn)link 自 1.4 版本開(kāi)始正式支持與 Kafka 的端到端精確一次處理語(yǔ)義,在一致性方面上更有保證。

日處理數(shù)據(jù)量超10億:友信金服基于Flink構(gòu)建實(shí)時(shí)用戶畫(huà)像系統(tǒng)的實(shí)踐

原文鏈接

本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI