您好,登錄后才能下訂單哦!
這篇文章主要介紹了怎么在Apache Flink中使用Python API,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Apache Flink 是流批統(tǒng)一的開源大數(shù)據(jù)計(jì)算引擎,在 Flink 1.9.0 版本開啟了新的 ML 接口和全新的Python API架構(gòu)。那么為什么 Flink 要增加對 Python 的支持,下文將進(jìn)行詳細(xì)分析。
最流行的開發(fā)語言
Python 本身是非常優(yōu)秀的開發(fā)語言,據(jù) RedMonk 數(shù)據(jù)統(tǒng)計(jì),除 Java 和 JavaScript 之外,受歡迎度排名第三。
RedMonk 是著名的以開發(fā)人員為中心的行業(yè)分析公司,其更詳細(xì)的分析信息,大家在拿到我的PPT之后,可以點(diǎn)擊鏈接進(jìn)行詳細(xì)查閱。好了,那么Python的火熱,與我們今天向大家分享的流批統(tǒng)一的大數(shù)據(jù)計(jì)算引擎,Apache Flink有什么關(guān)系呢?帶著這個(gè)問題,我們大家想想目前與大數(shù)據(jù)相關(guān)的著名的開源組件有哪些呢?比如說最早期的批處理框架Hadoop?流計(jì)算平臺(tái)Storm,最近異?;馃岬腟park?異或其他領(lǐng)域數(shù)倉的Hive,KV存儲(chǔ)的HBase?這些都是非常著名的開源項(xiàng)目,那么這些項(xiàng)目都無一例外的進(jìn)行了Python API的支持。
眾多開源項(xiàng)目支持
Python 的生態(tài)已相對完善,基于此,Apache Flink 在 1.9 版本中也投入了大量的精力,去推出了一個(gè)全新的 Pyflink。除大數(shù)據(jù)外,人工智能與Python也有十分密切的關(guān)系。
ML青睞的語言
從上圖統(tǒng)計(jì)數(shù)據(jù)可以發(fā)現(xiàn),Python API 本身已經(jīng)占機(jī)器學(xué)習(xí)崗位需求語言的 0.129%。相對于 R 語言,Python 語言似乎更受青睞。Python 作為解釋型語言,語法的設(shè)計(jì)哲學(xué)是”用一種方法并且只有一種方法來做一件事”。其簡潔和易用性使其成為了世界上最受歡迎的語言,在大數(shù)據(jù)計(jì)算領(lǐng)域都有著很好的生態(tài)建設(shè),同時(shí)Python在機(jī)器學(xué)習(xí) 在機(jī)器學(xué)習(xí)方面也有很好的前景,所以我們在近期發(fā)布的Apache Flink 1.9 以全新的架構(gòu)推出新的 Python API
Flink 是一款流批統(tǒng)一的計(jì)算引擎,社區(qū)非常重視和關(guān)注 Flink 用戶,除 Java 語言或者 Scala 語言,社區(qū)希望提供多種入口,多種途徑,讓更多的用戶更方便的使用 Flink,并收獲 Flink 在大數(shù)據(jù)算力上帶來的價(jià)值。因此 Flink 1.9 開始,F(xiàn)link 社區(qū)以一個(gè)全新的技術(shù)體系來推出 Python API,并且已經(jīng)支持了大部分常用的一些算子,比如如 JOIN,AGG,WINDOW 等。
在 Flink 1.9 中雖然 Python 可以使用 Java 的 User-defined Function,但是還缺乏 Python native 的 User-defined function 的定義,所以我們計(jì)劃在 Flink 1.10 中進(jìn)行支持 Python User-defined function 的支持。并技術(shù)增加對數(shù)據(jù)分析工具類庫 Pandas 的支持,在 Flink 1.11 增加對 DataStream API 和 ML API 的支持。
新的 Python API 架構(gòu)分為用戶 API 部分,PythonVM 和 Java VM 的通訊部分,和最終將作業(yè)提交到 Flink 集群進(jìn)行運(yùn)行的部分。那么 PythonVM 和 JavaVM 是怎樣通訊的呢?我們在Python 端會(huì)會(huì)有一個(gè) Python 的 Gateway 用于保持和 Java 通訊的鏈接,在 Java 部分有一個(gè) GateWayServer 用于接收 Python 部分的調(diào)用請求。
關(guān)于 Python API 的架構(gòu)部分,在 1.9 之前,F(xiàn)link 的 DataSet 和 DataStream 已經(jīng)有了對 Python API 的支持,但是擁有 DataSet API 和 DataStream API 兩套不同的 API。對于 Flink 這樣一個(gè)流批統(tǒng)一的流式計(jì)算引擎來講,統(tǒng)一的架構(gòu)至關(guān)重要。并且對于已有的 Python DataSet API 和 DataStream API 而言,采用了JPython 的技術(shù)體系架構(gòu),而 JPython 本身對目前 Python 的 3.X 系列無法很好的支持,所以 Flink 1.9 發(fā)布后,決定將原有的 Python API 體系架構(gòu)廢棄,以全新的技術(shù)架構(gòu)出現(xiàn)。這套全新的 Python API 基于 Table API 之上。
Table API 和 Python API 之間的通訊采用了一種簡單的辦法,利用 Python VM 和 Java VM 進(jìn)行通信。在 Python API 的書寫或者調(diào)用過程中,以某種方式來與 Java API 進(jìn)行通訊。操作 Python API 就像操作 Java 的 Table API一樣。新架構(gòu)中可以確保以下內(nèi)容:
不需要另外創(chuàng)建一套新的算子,可以輕松與 Java 的 Table API 的功能保持一致;
得益于現(xiàn)有的 Java Table API 優(yōu)化模型,Python 寫出來的API,可以利用 Java API 優(yōu)化模型進(jìn)行優(yōu)化,可以確保 Python 的 API 寫出來的 Job 也能夠具備極致性能。
如圖,當(dāng) Python 發(fā)起對Java的對象請求時(shí)候,在 Java 段創(chuàng)建對象并保存在一個(gè)存儲(chǔ)結(jié)構(gòu)中,并分配一個(gè) ID 給 Python 端,Python 端在拿到 Java 對象的 ID 后就可以對這個(gè)對象進(jìn)行操作,也就是說 Python 端可以操作任何 Java 端的對象,這也就是為什么新的架構(gòu)可以保證Python Table API 和 Java Table API功能一致,并且能過服用現(xiàn)有的優(yōu)化模型。
在新的架構(gòu)和通訊模型下,Python API 調(diào)用 Java API 只需要在持有 Java 對象的 ID,將調(diào)用方法的名字和參數(shù)傳遞給 Java VM,就能完成對 Java Table API 的調(diào)用,所以在這樣的架構(gòu)中開發(fā) Python Table API 與開發(fā) Java Table API 的方式完全一致,接下來我為大家詳細(xì)介紹如何開發(fā)一個(gè)簡單的 Python API 作業(yè)。
通常來講一個(gè) Python Table Job 一般會(huì)分成四個(gè)部分,首先要根據(jù)目前的現(xiàn)狀,要決定這個(gè)Job 是以批的方式運(yùn)行,還是流的方式運(yùn)行。當(dāng)然后續(xù)版本用戶可以不考慮,但當(dāng)前 1.9 版本還是需要考慮。
在決定第一步以怎樣的方式執(zhí)行 Job 后,我們需要了解數(shù)據(jù)從哪里來,如何定義 Source、結(jié)構(gòu)數(shù)據(jù)類型等信息。然后需要寫計(jì)算邏輯,然后就是對數(shù)據(jù)進(jìn)行計(jì)算操作,但最終計(jì)算的結(jié)果需要持久化到某個(gè)系統(tǒng)。最后定義 Sink,與 Source 類似,我們需要定義 Sink Schema,以及每一個(gè)字段類型。
下面將詳細(xì)分享如何用 Python API 寫每一步?首先,我們創(chuàng)建一個(gè)執(zhí)行環(huán)境,對于執(zhí)行環(huán)境本身來講,首先需要一個(gè) ExecutionEnvironment,根本上我們需要一個(gè) TableEnvironment。那么在 TableEnvironment 中,有一個(gè)參數(shù) Table Config,Table Config 中會(huì)有一些在執(zhí)行過程中的配置參數(shù),可以傳遞到 RunTime 層。除此之外,還提供了一些個(gè)性化的配置項(xiàng),可以在實(shí)際業(yè)務(wù)開發(fā)中進(jìn)行使用。
在拿到 Environment 后,需要對數(shù)據(jù)源表進(jìn)行定義,以 CSV 格式文件為例,用"逗號(hào)"分隔,用 Field 來表明這個(gè)文件中有哪些字段。那么會(huì)看到,目前里面用逗號(hào)分隔,并且只有一個(gè)字段叫 word,類型是 String。
在定義并描述完數(shù)據(jù)源數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換成 Table 數(shù)據(jù)結(jié)構(gòu)后,也就是說轉(zhuǎn)換到 Table API 層面之后是怎樣的數(shù)據(jù)結(jié)構(gòu)和數(shù)據(jù)類型?下面將通過 with_schema 添加字段及字段類型。這里只有一個(gè)字段,數(shù)據(jù)類型也是 String,最終注冊成一個(gè)表,注冊到 catlog 中,就可以供后面的查詢計(jì)算使用了。
創(chuàng)建結(jié)果表,當(dāng)計(jì)算完成后需要將這些結(jié)果存儲(chǔ)到持久化系統(tǒng)中,以 WordCount 為例,首先存儲(chǔ)表會(huì)有一個(gè) word 以及它的計(jì)數(shù)兩個(gè)字段,一個(gè)是 String 類型的 word,另一個(gè)是 Bigint 的計(jì)數(shù),然后把它注冊成 Sink。
編寫注冊完 Table Sink 后,再來看如何編寫邏輯。其實(shí)用 Python API 寫 WordCount 和 Table API 一樣非常簡單。因?yàn)橄鄬τ?DataSream 而言 Python API 寫一個(gè) WordCount 只需要一行。比如 group by,先掃描Source表,然后 group by 一個(gè) Word,再進(jìn)行 Select word 并加上聚合統(tǒng)計(jì)Count ,最終將最數(shù)據(jù)結(jié)果插入到結(jié)果表里面中。
那么WordCount 怎樣才能真正的運(yùn)行起來?首先需要搭建開發(fā)環(huán)境,不同的機(jī)器上可能安裝的軟件版本不一樣,這里列出來了一些版本的需求和要求,其中括號(hào)中是示例機(jī)器上的版本。
第二步,構(gòu)建一個(gè) Java 的二進(jìn)制發(fā)布包,以從源代碼進(jìn)行構(gòu)建,那么這一頁面就是從原代碼獲取我們的主干代碼,并且拉取 1.9 的分支。當(dāng)然大家可以用 Mater,但是 Master 不夠穩(wěn)定,還是建議大家在自己學(xué)習(xí)的過程中,最好是用 1.9 的分支去做。接下來進(jìn)行實(shí)戰(zhàn)演練環(huán)節(jié),首先驗(yàn)證 PPT 的正確性。首先編譯代碼,示例如下:
//下載源代碼git clone https://github.com/apache/flink.git// 拉取1.9分支cd flink; git fetch origin release-1.9git checkout -b release-1.9 origin/release-1.9//構(gòu)建二進(jìn)制發(fā)布包mvn clean install -DskipTests -Dfast
編譯完成后,需要在相應(yīng)目錄下找到發(fā)布包:
cd flink-dist/target/flink-1.9.0-bin/flink-1.9.0tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
在構(gòu)建完 Java 的 API 之后進(jìn)行檢驗(yàn),我們要構(gòu)建一個(gè) Python 的發(fā)布包。
因?yàn)榇蠖鄶?shù) Python 的用戶我們都知道我們需要 pip install 方式,將需要的依賴庫進(jìn)行與本地的 Python 環(huán)境進(jìn)行集成或者安裝。
那么 Flink 也是一樣,PyFlink 也需要打包一個(gè) Pypip 能夠識(shí)別的資源進(jìn)行安裝,在實(shí)際的使用中,也可以按這種命令去拷貝,在自己的環(huán)境中嘗試。
cd flink-Python;Python setup.py sdist
這個(gè)過程只是將 Java 包囊括進(jìn)來,再把自己 PyFlink 本身模塊的一些 Java 的包和 Python 包打包成一起,它會(huì)在 dist 目錄下,有一個(gè) apache-flink-1.9.dev0.tar.gz。
cd dist/
在 dist 目錄的 apache-flink-1.9.dev0.tar.gz 就是我們可以用于 pip install 的 PyFlink 包。在1.9版本,除了 Flink Table,還有 Flink Table Blink。Flink 同時(shí)會(huì)支持兩個(gè) plan,如果大家可以嘗試,我們可以自由的切換是 Flink 原有的 Planner,還是 Blink 的 Planner,大家可以去嘗試。完成打包后,就可以嘗試把包安裝到我們的實(shí)際環(huán)境當(dāng)中。
接下來是一個(gè)非常簡單的命令,首先檢查命令的正確性,在執(zhí)行之前,我們用 pip 檢查一下 list,我們要看在已有的包里有沒有,現(xiàn)在嘗試把剛才打包的包再安裝。在實(shí)際的使用過程中,如果升級(jí)版,也要有這個(gè)過程,要把新的包要進(jìn)行安裝。
pip install dist/*.tar.gz pip list|grep flink
安裝完成后,就可以用剛才寫的 WordCount 例子來驗(yàn)證環(huán)境是否正確。驗(yàn)證一下剛才的正確性,怎么驗(yàn)證?為了大家方便,可以直接克隆 enjoyment.code 倉庫。
git clone https://github.com/sunjincheng121/enjoyment.code.gitcd enjoyment.code; Python word_count.py
接下來體驗(yàn)并嘗試。在這個(gè)目錄下,我們剛才開發(fā)的 WordCount 例子。直接用 Python 或檢驗(yàn)環(huán)境是否 OK。這個(gè)時(shí)候 Flink Python API 會(huì)啟動(dòng)一個(gè) Mini 的 Cluster,會(huì)將剛才 WordCount Job 進(jìn)行執(zhí)行,提交到一個(gè) Mini Cluster 進(jìn)行執(zhí)行?,F(xiàn)在 Run 的過程中其實(shí)已經(jīng)在集群上進(jìn)行執(zhí)行了。其實(shí)在這個(gè)代碼里面是讀了一個(gè) Source 文件,把結(jié)果寫到 CSV 文件,在當(dāng)前目錄,是有一個(gè) Sink CSV 的。具體的操作步驟可以查看Flink中文社區(qū)視頻Apache Flink Python API 現(xiàn)狀及規(guī)劃
IDE 的配置在正常的開發(fā)過程中,其實(shí)我們大部分還是在本地進(jìn)行開發(fā)的,這里推薦大家還是用 Pychram 來開發(fā) Python 相關(guān)的邏輯或者 Job。
同時(shí)由于有很大量的截圖存在,也把這些內(nèi)容整理到了博客當(dāng)中,大家可以掃描二維碼去關(guān)注和查看那么一些詳細(xì)的注意事項(xiàng),博客詳細(xì)地址: https://enjoyment.cool。這里有一個(gè)很關(guān)鍵的地方,大家要注意,就是可能你的環(huán)境中有多種 Python 的環(huán)境,這時(shí)候選擇的環(huán)境一定是剛才 pip install 環(huán)境。具體操作詳見Apache Flink Python API 現(xiàn)狀及規(guī)劃。
還有哪些方式來提交 Job 呢?這是一個(gè) CLI 的方式,也就是說真正的提交到一個(gè)現(xiàn)有的集群。首先啟動(dòng)一個(gè)集群。構(gòu)建的目錄一般在 target 目錄下,如果要啟動(dòng)一個(gè)集群,直接啟動(dòng)就可以。這里要說一點(diǎn)的是,其中一個(gè)集群外部有個(gè) Web Port,它的端口的地址都是在 flink-conf.yaml 配置的。按照 PPT 中命令,可以去查看日志,看是否啟動(dòng)成功,然后從外部的網(wǎng)站訪問。如果集群正常啟動(dòng),接下來看如何提交 Job 。
Flink 通過 run 提交作業(yè),示例代碼如下:
./bin/flink run -py ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
用命令行方式去執(zhí)行,除了用 PY 參數(shù),還可以指定 Python 的 module,以及其他一些依賴的資源文件、JAR等。
在 1.9 版本中還為大家提供一種更便利的方式,就是以 Python Shell 交互式的方式來寫 Python API 拿到結(jié)果。有兩種方式可執(zhí)行,第一種方式是 Local,第二種方式 Remote,其實(shí)這兩種沒有本質(zhì)的差異。首先來看 Local ,命令如下:
bin/pyflink-shell.sh local
啟動(dòng)一個(gè)mini Cluster ,當(dāng)輸出后,會(huì)出來一個(gè) Python 的 Flink CLI 同時(shí)會(huì)有一些示例程序,供大家來體驗(yàn),按照上面的案例就能夠達(dá)到正確的輸出和提交,既可以寫 Streaming,也可以寫 Batch。詳細(xì)步驟大家參考視頻操作即可。
到目前為止,大家應(yīng)該已經(jīng)對 Flink 1.9 上 Python API 架構(gòu)有了大概了解,同時(shí)也了解到如何搭建 Python API 環(huán)境。并且以一個(gè)簡單的 WordCount 示例,體驗(yàn)如何在 IDE 里面去執(zhí)行程序,如何以 Flink run 和交互式的方式去提交 Job。同時(shí)也體驗(yàn)了現(xiàn)有一些交互上的一種方式來使用 Flink Python API。那么介紹完了整個(gè) Flink 的一些環(huán)境搭建和一個(gè)簡單的示例后。接下來詳細(xì)介紹一下在1.9里面所有的核心算子。
上面分享創(chuàng)建一個(gè) Job 的過程,第一要選擇執(zhí)行的方式是Streaming還是Batch;第二個(gè)要定義使用的表,Source、Schema、數(shù)據(jù)類型;第三是開發(fā)邏輯,同時(shí)在寫 WordCount 時(shí),使用 Count 的函數(shù)。最后,在 Python API 里面內(nèi)置了很多聚合函數(shù),可以使用count,sum, max,min等等。
所以在目前 Flink 1.9 版本中,已經(jīng)能夠滿足大多數(shù)常規(guī)需求。除了剛才講到的 count。Flink Table API 算子 1.9 中也已經(jīng)支持。關(guān)于 Flink Table API 算子,不論是 Python Table API 還是 Java 的Table API,都有以下幾種類型的操作。第一單流上的操作,比如說做一些SELECT、Filter,同時(shí)還可以在流上做一些聚合,包括開窗函數(shù)的 windows 窗口聚合以及列的一些操作,比如最下面的 add_columns 和 drop_columns。
除了單流,還有雙流的操作,比如說雙流 JOIN、雙流 minus、union ,這些算子在Python Table API 里面都提供了很好的支持。Python Table API 在 Flink 1.9 中,從功能的角度看幾乎完全等同于Java Table API,下面以實(shí)際代碼來看上述算子是怎么編寫的以及怎么去開發(fā)Python算子。
2.Python Table API 算子-Watermark定義
細(xì)心的同學(xué)可能會(huì)注意到,我們尚未提到流的一個(gè)特質(zhì)性 -> 時(shí)序。流的特性是來的順序是可能亂序,而這種亂序又是流上客觀存在的一種狀態(tài)。在 Flink 中一般采用 Watermark 機(jī)制來解決這種亂序的問題。
在 Python API 中如何定義 Watermark?假設(shè)有一個(gè) JSON 數(shù)據(jù),a 字段 String,time 字段 datetime。這個(gè)時(shí)候定義 Watermark 就要在增加 Schema 時(shí)增加 rowtime 列。rowtime 必須是 timestamps 類型。
Watermark 有多種定義方式,上圖中 watermarks_periodic_bounded 即會(huì)周期性的去發(fā) Watermark,6萬單位是毫秒。如果數(shù)據(jù)是亂序的,能夠處理一分鐘之內(nèi)的亂序,所以這個(gè)值調(diào)的越大,數(shù)據(jù)亂序接受程度越高,但是有一點(diǎn)數(shù)據(jù)的延遲也會(huì)越高。關(guān)于 Watermark 原理大家可以查看我的blog: http://1t.click/7dM。
最后,跟大家分享一下 Java UDF在 Flink 1.9 版本中的應(yīng)用, 雖然在1.9中不支持 Python 的 UDF ,但 Flink 為大家提供了可以在 Python 中使用 Java UDF。在 Flink 1.9 中,對 Table 模塊進(jìn)行了優(yōu)化和重構(gòu),目前開發(fā) Java UDF 只需要引入 Flink common 依賴就可以進(jìn)行 Python API 開發(fā)。
接下來以一個(gè)具體的示例給大家介紹利用 Java UDF 開發(fā) Python API UDF,假設(shè)我們開發(fā)一個(gè)求字符串長度的 UDF,在 Python 中需要用 Java 中的 register_java_function,function 的名字是包全路徑。然后在使用時(shí),就可以用注冊的名字完成UDF的調(diào)用,詳細(xì)可以查閱我的Blog: http://1t.click/HQF。
那怎樣來執(zhí)行?可以用 Flink run 命令去執(zhí)行,同時(shí)需要將UDF的JAR包攜帶上去。
Java UDF 只支持 Scalar Function?其實(shí)不然,在 Java UDF中既支持 Scalar Function,也支持 Table Function和Aggregate Function。如下所示:
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“怎么在Apache Flink中使用Python API”這篇文章對大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。