溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

PyFlink在聚美優(yōu)品的應(yīng)用實踐是怎樣的

發(fā)布時間:2022-01-04 15:18:32 來源:億速云 閱讀:130 作者:柒染 欄目:大數(shù)據(jù)

PyFlink在聚美優(yōu)品的應(yīng)用實踐是怎樣的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

下面將跟大家分享 PyFlink 在刷寶的應(yīng)用,包括:背景介紹、架構(gòu)演進、技術(shù)選型以及一個問題的解決思路分享。

1.背景介紹


業(yè)務(wù)場景

刷寶有許多重要的業(yè)務(wù)場景,其中之一是為用戶實時推薦短視頻。其中  推  薦的實時性,決定了用戶在視頻上的停留時長、觀看視頻時長、留存等指標,進而影響到廣告位的收益,比如廣告的單價等。
刷寶從 2019 年開始,業(yè)務(wù)飛速發(fā)展,截止到 2020 年 5 月份,用戶行為數(shù)據(jù)峰值每秒過百萬,每天有 200 億數(shù)據(jù)。這個業(yè)務(wù)量,對我們現(xiàn)有的技術(shù)架構(gòu)、數(shù)據(jù)計算的實時性提出了挑戰(zhàn)。


實時化挑戰(zhàn)


我們的數(shù)據(jù)流程  整個環(huán)節(jié)完成需要1小時左右時間,遠達不到實時的要求。如何更快速的根據(jù)用戶瀏覽習慣實時推薦相關(guān)視頻  會對用戶觀看視頻時長、停留時長、留存等有重大的影響,比如在現(xiàn)有基礎(chǔ)上提升10-20%。

我們更期望數(shù)據(jù)的計算實時化,也就是將原有技術(shù)架構(gòu)中的批量計算(hive)變成實時計算(Flink SQL),架構(gòu)圖如下。
          

2.架構(gòu)演進


架構(gòu)演進

PyFlink在聚美優(yōu)品的應(yīng)用實踐是怎樣的

     
  • 第一層:最開始是離線計算,完成一次計算需要30分鐘,還不包括后續(xù)的模型處理;

  • 第二層:考慮實時計算后,我們打算采取 Flink 架構(gòu)來處理,整體主件過程如圖;

  • 第三層:考慮到人力和時間等成本,還有技術(shù)人員技能匹配度,最終選擇第三層;

   
我們成員更多的是對 Python 和 SQL 熟悉,所以 PyFlink 更加適合我們。我們用   PyFlink    開發(fā)了    20    個業(yè)務(wù)作業(yè),目前每秒  過百萬,每天有    200    億,業(yè)務(wù)平穩(wěn)運行(P  yF  link    1.10)。

3.技術(shù)選型


面對實時化的業(yè)務(wù)和架構(gòu)升級需求,我們團隊本身沒有 Spark、Flink 等框架的背景積累,但是一個偶然的機會,我們觀看了金竹老師的直播,了解到了 PyFlink 是 Flink 的 Python API 和我團隊現(xiàn)有的開發(fā)人員語言技能比較吻合。所以就想利用 PyFlink 進行業(yè)務(wù)的實時化升級。

初識與困難

雖然 PyFlink 和團隊的語言技能比較 match,但是其中還是涉及到很多 Flink 的環(huán)境、文檔、算子等的使用問題,遇到了很多困難:

  • PyFlink 的知識文檔、示例、答疑等都非常少,除了官網(wǎng)和阿里云,基本無其他參考。

  • PyFlink 官方文檔缺少很多細節(jié),比如:給了方法不給參數(shù)格式。

  • PyFlink 的內(nèi)容不明確,官網(wǎng)上沒有明確具體寫出哪些 PyFlink 沒有,哪些有。沒法將 Flink 和 PyFlink 清晰的區(qū)分開。

  • PyFlink 本身等局限性,比如:left/rigint Join 產(chǎn)生 retraction 無法寫入 Kafka,要寫入需要改寫 Flink SQL 讓流改為 append 模式,或者修改 kafka-connector 源碼支持 retraction。


所以一時感覺利用 PyFlink 的學習時間也比較漫長。大家比較擔心短時間內(nèi)很難滿足業(yè)務(wù)開發(fā)。
 
機遇

在我和團隊擔心開發(fā)進度時候,我也一直關(guān)注 Flink 社區(qū)的動態(tài),恰巧發(fā)現(xiàn) Flink 社區(qū)在進行 “PyFlink 扶持計劃”,所以我和團隊都眼前一亮,填寫了 PyFlink 調(diào)查問卷。也和金竹老師進行了幾次郵件溝通。最終有幸參與了 PyFlink 社區(qū)扶持計劃。
 

4. OOM 報錯解決思路分享


其實了解下來 PyFlink 的開發(fā)是非常便捷的,在完成了第一個作業(yè)的開發(fā)之后,大家逐漸熟悉 PyFlink 的使用,3周左右就完成了 20 個業(yè)務(wù)邏輯的開發(fā),進入了測試階段。這個快速一方面是團隊成員不斷的熟悉 PyFlink,一方面是由社區(qū) PyFlink 團隊金竹/付典等老師的幫助和支持。這里,不一一為大家分享全部內(nèi)容,我這里列舉一個具體的例子。

■ 背景:  

從接觸到 Flink 開始,有個別 job,一直有 running beyond physical memory limits 問題。多次調(diào)整 tm 內(nèi)存,修改 tm 和 slos 的比例,都沒用,最終還是會掛。最后妥協(xié)的方案是,增加自動重啟次數(shù),定期重啟任務(wù)

■ 現(xiàn)象:  

Flink job 通常會穩(wěn)定運行5-6天,然后就報出這個錯誤。一直持續(xù)和反復。

■ 詳細信息:  

Closing TaskExecutor connection container_e36_1586139242205_122975_01_000011 because: Container [pid=45659,containerID=container_e36_1586139242205_122975_01_000011] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.8 GB of 32 GB virtual memory used. Killing container.    Dump of the process-tree for container_e36_1586139242205_122975_01_000011 :    |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE    |- 45659 45657 45659 45659 (bash) 0 0 115814400 297 /bin/bash -c /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.out 2> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.err     |- 45705 45659 45659 45659 (java) 13117928 609539 6161567744 1048471 /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
   Container killed on request. Exit code is 143    Container exited with a non-zero exit code 143

我們的解決思路:

        1. 從內(nèi)容上看是 oom 問題,所以一開始調(diào)整了 tm 大小,直接到最大內(nèi)存,2調(diào)整 tm 和 slot 的比例,盡量做到 1v1.
        2. dump heap 的內(nèi)存,分析占用情況。
        3. 調(diào)整 backend state 的類型

結(jié)果:以上手段都失敗了,在持續(xù)一段時間后,依然一定報錯。

PyFlink 團隊處理思路:

1.分析當前作業(yè)的 state 情況,作業(yè)情況,作業(yè)環(huán)境參數(shù)情況  。通過 flink-conf 可以看 backend state 情況,通過 flinkdashboard 可以知道作業(yè)圖和環(huán)境參數(shù)。
 
2. 由于 1.10 中,rocksdb statebackend 占用的內(nèi)存默認為非 managed memory,通過在 PyFlink 作業(yè)中增加如下代碼,可以將其設(shè)置為 managed memory:env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)
 
3. 為了分析 OOM 是否是由于 rocksdb statebackend 占用的內(nèi)存持續(xù)增長導致的,開啟了關(guān)于 rocksdb 的監(jiān)控,因為我們使用的是 rocksdb,這里需要在 flink-conf 中增加如下配置:

state.backend.rocksdb.metrics.block-cache-capacity: truestate.backend.rocksdb.metrics.block-cache-usage: true                            state.backend.rocksdb.metrics.num-running-compactions: truestate.backend.rocksdb.metrics.num-running-flushes: truestate.backend.rocksdb.metrics.size-all-mem-tables: true
 
然后通過自建的 metrics 系統(tǒng)來收集展示和分析,我們使用的 grafana。
 
4. 通過前面的步驟,觀察到 rocksdb 的內(nèi)存基本是穩(wěn)定的,內(nèi)存占用符合預期,懷疑是“rocksdb 超用了一點點,或者是 jvm overhead 不夠大”導致的。這兩種問題,都可以通過調(diào)整 jvm overhead 的相關(guān)參數(shù)來解決。于是在 flink-conf 中添加了配置:

taskmanager.memory.jvm-overhead.min: 1024m
taskmanager.memory.jvm-overhead.max: 2048m

用大佬的原話:rocksdb 超用了一點點,或者是 jvm overhead 不夠大,這兩種情況調(diào)大 jvm overhead 應(yīng)該都能解決。
 
5. 調(diào)整 flink.size 的大小,讓 flink 自動計算出 process.size,這部分在 flink-conf:
               
taskmanager.memory.flink.size: 1024m
 
完成所有調(diào)整后,經(jīng)歷了14天的等待,job 運行正常,這里充分說明了問題被解決了。同時開始觀察 rocksdb 的 metrics 情況,發(fā)現(xiàn) native 內(nèi)存會超用一些,但是 rocksdb 整體保持穩(wěn)定的。目前能判斷出某個地方用到的 native 內(nèi)存比 flink 預留的多,大概率是用戶代碼或者第三方依賴,所以加大下 jvm-overhead 大數(shù)值,能解決問題。
 
6. 最終需要修改的參數(shù)有:

1) 在 pyflink 作業(yè)中增加如下代碼:
   
env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)
          
2) flink-conf 修改或增加:

taskmanager.memory.jvm-overhead.min: 1024mtaskmanager.memory.jvm-overhead.max: 2048mtaskmanager.memory.process.size: 6144m

其實針對這個業(yè)務(wù)升級,老板為了不影響最終的業(yè)務(wù)上線,起初我們準備了2套方案同時進行:

  • 基于某個云平臺進行平臺搭建和開發(fā);

  • 基于開源 PyFlink 進行代碼開發(fā);


兩個方案同時進行,最終我們團隊基于 PyFlink 開發(fā)快速的完成了業(yè)務(wù)開發(fā)和測試。最終達到了我前面所說的每秒百萬/每天200億的穩(wěn)定業(yè)務(wù)支撐。
重點,重點,重點,參與這個業(yè)務(wù)升級的開發(fā)只有2個人。
 

5.總結(jié)和展望


通過 PyFlink 的學習,刷寶大數(shù)據(jù)團隊,在短時間能有了實時數(shù)據(jù)開發(fā)的能力。目前穩(wěn)定運行了 20+PyFlink 任務(wù),我們對接了多個需求部門,如推薦部門、運營、廣告等;在多種場景下,模型畫像計算、AB 測試系統(tǒng)、廣告推薦、用戶召回系統(tǒng)等,使用了 PyFlink。為我們的業(yè)務(wù)提供了堅實穩(wěn)定的實時數(shù)據(jù)。此外,我們將搭建 Flink on Zeppelin 這樣的實時計算平臺,擴大 Flink 開發(fā)用戶群體,進一步簡化 Flink 開發(fā)成本。Flink 1.11 版本也準備上線,Python UDF 功能會有進一步的優(yōu)化,Pandas 模塊也會被引入。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI