您好,登錄后才能下訂單哦!
導(dǎo)讀:當(dāng)今生活節(jié)奏日益加快,企業(yè)面對(duì)不斷增加的海量信息,其信息篩選和處理效率低下的困擾與日俱增。由于用戶營(yíng)銷(xiāo)不夠細(xì)化,企業(yè) App 中許多不合時(shí)宜或不合偏好的消息推送很大程度上影響了用戶體驗(yàn),甚至引發(fā)了用戶流失。在此背景下,友信金服公司推行全域的數(shù)據(jù)體系戰(zhàn)略,通過(guò)打通和整合集團(tuán)各個(gè)業(yè)務(wù)線數(shù)據(jù),利用大數(shù)據(jù)、人工智能等技術(shù)構(gòu)建統(tǒng)一的數(shù)據(jù)資產(chǎn),如 ID-Mapping、用戶標(biāo)簽等。友信金服用戶畫(huà)像項(xiàng)目正是以此為背景成立,旨在實(shí)現(xiàn)“數(shù)據(jù)驅(qū)動(dòng)業(yè)務(wù)與運(yùn)營(yíng)”的集團(tuán)戰(zhàn)略。目前該系統(tǒng)支持日處理數(shù)據(jù)量超 10 億,接入上百種合規(guī)數(shù)據(jù)源。
傳統(tǒng)基于 Hadoop 生態(tài)的離線數(shù)據(jù)存儲(chǔ)計(jì)算方案已在業(yè)界大規(guī)模應(yīng)用,但受制于離線計(jì)算的高時(shí)延性,越來(lái)越多的數(shù)據(jù)應(yīng)用場(chǎng)景已從離線轉(zhuǎn)為實(shí)時(shí)。這里引用一張表格對(duì)目前主流的實(shí)時(shí)計(jì)算框架做個(gè)對(duì)比。
Apache Storm 的容錯(cuò)機(jī)制需要對(duì)每條數(shù)據(jù)進(jìn)行應(yīng)答(ACK),因此其吞吐量備受影響,在數(shù)據(jù)大吞吐量的場(chǎng)景下會(huì)有問(wèn)題,因此不適用此項(xiàng)目的需求。
Apache Spark 總體生態(tài)更為完善,且在機(jī)器學(xué)習(xí)的集成和應(yīng)用性暫時(shí)領(lǐng)先,但 Spark 底層還是采用微批(Micro Batching)處理的形式。
Apache Flink 在流式計(jì)算上有明顯優(yōu)勢(shì):首先其流式計(jì)算屬于真正意義上的單條處理,即每一條數(shù)據(jù)都會(huì)觸發(fā)計(jì)算。在這一點(diǎn)上明顯與 Spark 的微批流式處理方式不同。其次,F(xiàn)link 的容錯(cuò)機(jī)制較為輕量,對(duì)吞吐量影響較小,使得 Flink 可以達(dá)到很高的吞吐量。最后 Flink 還擁有易用性高,部署簡(jiǎn)單等優(yōu)勢(shì)。相比之下我們最終決定采用基于 Flink 的架構(gòu)方案。
用戶畫(huà)像系統(tǒng)目前為集團(tuán)線上業(yè)務(wù)提供用戶實(shí)時(shí)標(biāo)簽數(shù)據(jù)服務(wù)。為此我們的服務(wù)需要打通多種數(shù)據(jù)源,對(duì)海量的數(shù)字信息進(jìn)行實(shí)時(shí)不間斷的數(shù)據(jù)清洗、聚類(lèi)、分析,從而將它們抽象成標(biāo)簽,并最終為應(yīng)用方提供高質(zhì)量的標(biāo)簽服務(wù)。在此背景下,我們?cè)O(shè)計(jì)用戶畫(huà)像系統(tǒng)的整體架構(gòu)如下圖所示:
整體架構(gòu)分為五層:
在整體架構(gòu)設(shè)計(jì)方案設(shè)計(jì)完成之后,我們針對(duì)數(shù)據(jù)也設(shè)計(jì)了詳盡的處理方案。在數(shù)據(jù)處理階段,鑒于 Kafka 高吞吐量、高穩(wěn)定性的特點(diǎn),我們的用戶畫(huà)像系統(tǒng)統(tǒng)一采用 Kafka 作為分布式發(fā)布訂閱消息系統(tǒng)。數(shù)據(jù)清洗階段利用 Flink 來(lái)實(shí)現(xiàn)用戶唯一性識(shí)別、行為數(shù)據(jù)的清洗等,去除冗余數(shù)據(jù)。這一過(guò)程支持交互計(jì)算和多種復(fù)雜算法,并支持?jǐn)?shù)據(jù)實(shí)時(shí) / 離線計(jì)算。目前我們數(shù)據(jù)處理流程迭代了兩版,具體方案如下:
整體數(shù)據(jù)來(lái)源包含兩種:
根據(jù)不同業(yè)務(wù)的指標(biāo)需求我們直接從集團(tuán)數(shù)據(jù)倉(cāng)庫(kù)抽取數(shù)據(jù)并落入 Kafka,或者直接從業(yè)務(wù)端以 CDC(Capture Data Change)的方式寫(xiě)入 Kafka。在計(jì)算層,數(shù)據(jù)被導(dǎo)入到 Flink 中,通過(guò) DataStream 生成 ID-Mapping、用戶標(biāo)簽碎片等數(shù)據(jù),然后將生成數(shù)據(jù)存入 JanusGraph(JanusGraph 是以 HBase 作為后端存儲(chǔ)的圖數(shù)據(jù)庫(kù)介質(zhì))與 Kafka,并由 Flink 消費(fèi)落入 Kafka 的用戶標(biāo)簽碎片數(shù)據(jù),進(jìn)行聚合生成最新的用戶標(biāo)簽碎片(用戶標(biāo)簽碎片是由用戶畫(huà)像系統(tǒng)獲取來(lái)自多種渠道的碎片化數(shù)據(jù)塊處理后生成的)。
數(shù)據(jù)服務(wù)層處理流程
服務(wù)層將存儲(chǔ)層存儲(chǔ)的用戶標(biāo)簽碎片數(shù)據(jù),通過(guò) JanusGraph Spark On Yarn 模式,執(zhí)行 TinkerPop OLAP 計(jì)算生成全量用戶 Yids 列表文件。Yid 是用戶畫(huà)像系統(tǒng)中定義的集團(tuán)級(jí)用戶 ID 標(biāo)識(shí)。結(jié)合 Yids 列表文件,在 Flink 中批量讀取 HBase 聚合成完整用戶畫(huà)像數(shù)據(jù),生成 HDFS 文件,再通過(guò) Flink 批量操作新生成的數(shù)據(jù)生成用戶評(píng)分預(yù)測(cè)標(biāo)簽,將用戶評(píng)分預(yù)測(cè)標(biāo)簽落入 Phoenix,之后數(shù)據(jù)便可通過(guò)統(tǒng)一數(shù)據(jù)服務(wù)接口進(jìn)行獲取。下圖完整地展示了這一流程。
ID-Mapping 數(shù)據(jù)結(jié)構(gòu)
為了實(shí)現(xiàn)用戶標(biāo)簽的整合,用戶 ID 之間的強(qiáng)打通,我們將用戶 ID 標(biāo)識(shí)看成圖的頂點(diǎn)、ID pair 關(guān)系看作圖的邊,比如已經(jīng)識(shí)別瀏覽器 Cookie 的用戶使用手機(jī)號(hào)登陸了公司網(wǎng)站就形成了對(duì)應(yīng)關(guān)系。這樣所有用戶 ID 標(biāo)識(shí)就構(gòu)成了一張大圖,其中每個(gè)小的連通子圖 / 連通分支就是一個(gè)用戶的全部標(biāo)識(shí) ID 信息。
ID-Mapping 數(shù)據(jù)由圖結(jié)構(gòu)模型構(gòu)建,圖節(jié)點(diǎn)包含 UserKey、Device、IdCard、Phone 等類(lèi)型,分別表示用戶的業(yè)務(wù) ID、設(shè)備 ID、身份證以及電話等信息。節(jié)點(diǎn)之間邊的生成規(guī)則是通過(guò)解析數(shù)據(jù)流中包含的節(jié)點(diǎn)信息,以一定的優(yōu)先級(jí)順序進(jìn)行節(jié)點(diǎn)之間的連接,從而生成節(jié)點(diǎn)之間的邊。比如,識(shí)別了用戶手機(jī)系統(tǒng)的 Android_ID,之后用戶使用郵箱登陸了公司 App,在系統(tǒng)中找到了業(yè)務(wù)線 UID 就形成了和關(guān)系的 ID pair,然后系統(tǒng)根據(jù)節(jié)點(diǎn)類(lèi)型進(jìn)行優(yōu)先級(jí)排序,生成 Android_ID、mail、UID 的關(guān)系圖。數(shù)據(jù)圖結(jié)構(gòu)模型如下圖所示:
Gephi
1.0 版本數(shù)據(jù)處理流程在系統(tǒng)初期較好地滿足了我們的日常需求,但隨著數(shù)據(jù)量的增長(zhǎng),該方案遇到了一些性能瓶頸:
Gephi
鑒于這些問(wèn)題,我們提出了 2.0 版本的解決方案。在 2.0 版本中,我們通過(guò)利用 HBase 列式存儲(chǔ)、修改圖數(shù)據(jù)結(jié)構(gòu)等優(yōu)化方案嘗試解決以上三個(gè)問(wèn)題。
如下圖所示,2.0 版本數(shù)據(jù)處理流程大部分承襲了 1.0 版本。新版本數(shù)據(jù)處理流程在以下幾個(gè)方面做了優(yōu)化:
2.0 版本數(shù)據(jù)處理流程
Gephi
目前,線上部署的用戶畫(huà)像系統(tǒng)中的數(shù)據(jù)絕大部分是來(lái)自于 Kafka 的實(shí)時(shí)數(shù)據(jù)。隨著數(shù)據(jù)量越來(lái)越多,系統(tǒng)的壓力也越來(lái)越大,以至于出現(xiàn)了 Flink 背壓與 Checkpoint 超時(shí)等問(wèn)題,導(dǎo)致 Flink 提交 Kafka 位移失敗,從而影響了數(shù)據(jù)一致性。這些線上出現(xiàn)的問(wèn)題讓我們開(kāi)始關(guān)注 Flink 的可靠性、穩(wěn)定性以及性能。針對(duì)這些問(wèn)題,我們進(jìn)行了詳細(xì)的分析并結(jié)合自身的業(yè)務(wù)特點(diǎn),探索并實(shí)踐出了一些相應(yīng)的解決方案。
下圖展示了 Flink 中 checkpointing 執(zhí)行流程圖:
Flink 中 checkpointing 執(zhí)行流程
通過(guò)以上流程分析,我們通過(guò)三種方式來(lái)提高 Checkpointing 性能。這些方案分別是:
CheckPoint 存儲(chǔ)方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。由官方文檔可知,不同 StateBackend 之間的性能以及安全性是有很大差異的。通常情況下,MemoryStateBackend 適合應(yīng)用于測(cè)試環(huán)境,線上環(huán)境則最好選擇 RocksDBStateBackend。
這有兩個(gè)原因:首先,RocksDBStateBackend 是外部存儲(chǔ),其他兩種 Checkpoint 存儲(chǔ)方式都是 JVM 堆存儲(chǔ)。受限于 JVM 堆內(nèi)存的大小,Checkpoint 狀態(tài)大小以及安全性可能會(huì)受到一定的制約;其次,RocksDBStateBackend 支持增量檢查點(diǎn)。增量檢查點(diǎn)機(jī)制(Incremental Checkpoints)僅僅記錄對(duì)先前完成的檢查點(diǎn)的更改,而不是生成完整的狀態(tài)。與完整檢查點(diǎn)相比,增量檢查點(diǎn)可以顯著縮短 checkpointing 時(shí)間,但代價(jià)是需要更長(zhǎng)的恢復(fù)時(shí)間。
Checkpointing 需要對(duì)每個(gè) Task 進(jìn)行數(shù)據(jù)狀態(tài)采集。單個(gè) Task 狀態(tài)數(shù)據(jù)越多則 Checkpointing 越慢。所以我們可以通過(guò)增加 Task 并行度,減少單個(gè) Task 狀態(tài)數(shù)據(jù)的數(shù)量來(lái)達(dá)到縮短 CheckPointing 時(shí)間的效果。
Flink 算子鏈(Operator Chains)越長(zhǎng),Task 也會(huì)越多,相應(yīng)的狀態(tài)數(shù)據(jù)也就更多,Checkpointing 也會(huì)越慢。通過(guò)縮短算子鏈長(zhǎng)度,可以減少 Task 數(shù)量,從而減少系統(tǒng)中的狀態(tài)數(shù)據(jù)總量,間接的達(dá)到優(yōu)化 Checkpointing 的目的。下面展示了 Flink 算子鏈的合并規(guī)則:
基于以上這些規(guī)則,我們?cè)诖a層面上合并了相關(guān)度較大的一些 Task,使得平均的操作算子鏈長(zhǎng)度至少縮短了 60%~70%。
在 Flink 運(yùn)行過(guò)程中,每一個(gè)操作算子都會(huì)消費(fèi)一個(gè)中間 / 過(guò)渡狀態(tài)的流,并對(duì)它們進(jìn)行轉(zhuǎn)換,然后生產(chǎn)一個(gè)新的流。這種機(jī)制可以類(lèi)比為:Flink 使用阻塞隊(duì)列作為有界的緩沖區(qū)。跟 Java 里阻塞隊(duì)列一樣,一旦隊(duì)列達(dá)到容量上限,處理速度較慢的消費(fèi)者會(huì)阻塞生產(chǎn)者向隊(duì)列發(fā)送新的消息或事件。下圖展示了 Flink 中兩個(gè)操作算子之間的數(shù)據(jù)傳輸以及如何感知到背壓的:
首先,Source 中的事件進(jìn)入 Flink 并被操作算子 1 處理且被序列化到 Buffer 中,然后操作算子 2 從這個(gè) Buffer 中讀出該事件。當(dāng)操作算子 2 處理能力不足的時(shí)候,操作算子 1 中的數(shù)據(jù)便無(wú)法放入 Buffer,從而形成背壓。背壓出現(xiàn)的原因可能有以下兩點(diǎn):
實(shí)踐中我們通過(guò)以下方式解決背壓?jiǎn)栴}。首先,縮短算子鏈會(huì)合理的合并算子,節(jié)省出資源。其次縮短算子鏈也會(huì)減少 Task(線程)之間的切換、消息的序列化 / 反序列化以及數(shù)據(jù)在緩沖區(qū)的交換次數(shù),進(jìn)而提高系統(tǒng)的整體吞吐量。最后,根據(jù)數(shù)據(jù)特性將不需要或者暫不需要的數(shù)據(jù)進(jìn)行過(guò)濾,然后根據(jù)業(yè)務(wù)需求將數(shù)據(jù)分別處理,比如有些數(shù)據(jù)源需要實(shí)時(shí)的處理,有些數(shù)據(jù)是可以延遲的,最后通過(guò)使用 keyBy 關(guān)鍵字,控制 Flink 時(shí)間窗口大小,在上游算子處理邏輯中盡量合并更多數(shù)據(jù)來(lái)達(dá)到降低下游算子的處理壓力。
經(jīng)過(guò)以上優(yōu)化,在每天億級(jí)數(shù)據(jù)量下,用戶畫(huà)像可以做到實(shí)時(shí)信息實(shí)時(shí)處理并無(wú)持續(xù)背壓,Checkpointing 平均時(shí)長(zhǎng)穩(wěn)定在 1 秒以內(nèi)。
目前用戶畫(huà)像部分?jǐn)?shù)據(jù)都是從 Hive 數(shù)據(jù)倉(cāng)庫(kù)拿到的,數(shù)據(jù)倉(cāng)庫(kù)本身是 T+1 模式,數(shù)據(jù)延時(shí)性較大,所以為了提高數(shù)據(jù)實(shí)時(shí)性,端到端的實(shí)時(shí)流處理很有必要。
端到端是指一端采集原始數(shù)據(jù),另一端以報(bào)表 / 標(biāo)簽 / 接口的方式對(duì)這些對(duì)數(shù)進(jìn)行呈現(xiàn)與應(yīng)用,連接兩端的是中間實(shí)時(shí)流。在后續(xù)的工作中,我們計(jì)劃將現(xiàn)有的非實(shí)時(shí)數(shù)據(jù)源全部切換到實(shí)時(shí)數(shù)據(jù)源,統(tǒng)一經(jīng)過(guò) Kafka 和 Flink 處理后再導(dǎo)入到 Phoenix/JanusGraph/HBase。強(qiáng)制所有數(shù)據(jù)源數(shù)據(jù)進(jìn)入 Kafka 的一個(gè)好處在于它能夠提高整體流程的穩(wěn)定性與可用性:首先 Kafka 作為下游系統(tǒng)的緩沖,可以避免下游系統(tǒng)的異常影響實(shí)時(shí)流的計(jì)算,起到“削峰填谷”的作用;其次,F(xiàn)link 自 1.4 版本開(kāi)始正式支持與 Kafka 的端到端精確一次處理語(yǔ)義,在一致性方面上更有保證。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。