您好,登錄后才能下訂單哦!
用戶自定義函數(shù)(UDF)是大多數(shù)SQL環(huán)境的一個關(guān)鍵特性,其主要用于擴展系統(tǒng)的內(nèi)置功能。UDF允許開發(fā)人員通過抽象其低級語言實現(xiàn)在更高級語言(如SQL)中應用的新函數(shù)。Apache Spark也不例外,其為UDF與Spark SQL工作流集成提供了各種選項。
在本篇博文中,我們將回顧Python、Java和Scala上的Apache Spark UDF和UDAF(用戶自定義的聚合函數(shù))實現(xiàn)的簡單示例。我們還將討論重要的UDF API功能和集成點,包括各發(fā)行版本之間的當前可用性??偠灾?,我們將介紹一些重要的性能注意事項,使您對應用程序中利用UDF的選擇有所了解。
Spark SQL UDFs
UDF轉(zhuǎn)換了表中單個行的數(shù)值,為每行生成單個對應的輸出值。例如,大多數(shù)的SQL環(huán)境都提供了一個UPPER函數(shù),同時返回了一個大寫版本的字符串以作為輸入。
自定義函數(shù)可以在Spark SQL中定義和注冊為UDF,并具有可用于SQL查詢的關(guān)聯(lián)別名。下面我們將為您介紹一個簡單的例子,我們將定義一個UDF將以下JSON數(shù)據(jù)中的溫度值從攝氏度(Celsius)轉(zhuǎn)換為華氏度(Fahrenheit):
下面的示例代碼使用SQL別名CTOF注冊我們的轉(zhuǎn)換UDF,然后使用它從SQL查詢中轉(zhuǎn)換每個城市的溫度值。為簡潔起見,省略了SQLContext對象和其他樣板代碼的創(chuàng)建,并在每個代碼段下面提供了完整列表的鏈接。
Python
Scala
Java
請注意,Spark SQL定義了UDF1~UDF22類別,支持包含最多22個輸入?yún)?shù)的UDF。上面的例子中使用UDF1處理單個溫度值作為輸入。如果未能對Apache Spark源代碼進行更新,使用數(shù)組(arrays)或結(jié)構(gòu)體(structs)作為參數(shù)對于需要超過22個輸入的應用程序可能很有幫助;從風格的角度來看,如果您發(fā)現(xiàn)自己使用的是UDF6或更高版本,這一方案可能是首選。
Spark SQL UDAF函數(shù)
用戶自定義聚合函數(shù)(UDAF)可以同時處理多行,然后返回單個值作為結(jié)果,其通常與GROUP BY語句(例如COUNT或SUM)共同使用。為了讓示例簡單明了,我們將實現(xiàn)一個別名為SUMPRODUCT的UDAF按使用分組、給定價格和庫存中的整數(shù)數(shù)量計算所有車輛的零售價值:
目前,Apache Spark UDAF的實現(xiàn)定義在擴展繼承的了UserDefinedAggregateFunction類別中并有由Scala和Java語法支持。一旦定義好之后,我們可以在別名SUMPRODUCT下舉例說明并注冊我們的SumProductAggregateFunction UDAF對象,并從SQL查詢中予以使用,這與前面示例中的CTOF UDF大致相同。
Scala
Apache Spark中的其他UDF支持
Spark SQL支持UDF、UDAF和UDTF等現(xiàn)有Hive(Java或Scala)函數(shù)實現(xiàn)的集成。順便提醒一下,UDTFs(用戶自定義表函數(shù))可以返回多個列和行 – 這超出了本文的范圍,但是我們可能在以后的博文中涉及。對于使用前面示例中強調(diào)的方法重新實現(xiàn)和注冊相同邏輯,集成現(xiàn)有的Hive UDF是有價值的一種替代方法;從性能角度來看,該方法對于PySpark也是有幫助的,這將在下一節(jié)中討論。通過包含Hive UDF函數(shù)實現(xiàn)的JAR文件利用spark-submit的-jars選項,可以從HiveContext中訪問Hive函數(shù);然后使用CREATE TEMPORARY FUNCTION對函數(shù)進行聲明(如在Hive [1]中所做,包含一個UDF),具體示例如下所述:
Java 中的Hive UDF定義
從Python訪問HiveUDF
請注意,如上文中我們實現(xiàn)的UDF和UDAF函數(shù),Hive UDF只能使用Apache Spark的SQL查詢語言進行調(diào)用 – 也就是說,不能與Dataframe API的域特定語言(DSL)一起使用。
或者,通過包含實現(xiàn)jar文件(使用含有spark-submit的-jars選項),以Scala和Java語言實現(xiàn)的UDF可以從PySpark中進行訪問,然后通過SparkContext對象的私有引用執(zhí)行器JVM、底層Scala或裝載在jar文件中的Java UDF實現(xiàn)來訪問UDF定義。Holden Karau在一次精彩的演講中[2]對這種方法進行了探討。請注意,在此技術(shù)中所使用的一些Apache Spark私有變量不是正式面向終端用戶的。這樣做還帶來了額外的好處,允許將UDAF(目前必須在Java和Scala中定義)用于PySpark,下文中的示例中使用了前面在Scala中定義的SUMPRODUCT UDAF進行證明:
Scala UDAF定義
Scala UDAF from PySpark
UDF相關(guān)的功能正在連續(xù)不斷地添加至Apache Spark的每一個版本中。例如2.0版本在R中增加了對UDF的支持。作為參考,下表總結(jié)了本文中討論的各版本的關(guān)鍵特性:
表格中匯總了目前為止本博客中介紹過的相關(guān)版本的關(guān)鍵特性。
性能注意事項
了解Apache Spark UDF功能的性能影響是非常重要的。例如,Python UDF(比如我們的CTOF函數(shù))導致數(shù)據(jù)在運行UDF邏輯的執(zhí)行器JVM和Python注釋器之間被序列化 - 與Java或Scala中的UDF實現(xiàn)相比,這大大降低了性能。緩解這種序列化瓶頸的潛在解決方案包括以下方面:
一般來說,UDF邏輯應盡可能的精簡,因為可能每一行都會被調(diào)用。例如,在擴展到10億行時,UDF邏輯中的一個步驟需要耗費100毫秒的時間才能完成,從而很快就會導致重大的要性能問題。
Spark SQL的另一個重要組成部分是Catalyst查詢優(yōu)化器。這一功能隨著每個版本而擴展,通??梢詾镾park SQL查詢提供顯著的性能改進;然而,任意UDF實現(xiàn)代碼對于Catalyst而言可能不是很好理解(雖然分析字節(jié)碼的未來功能[3]被認為可以解決這一問題)。因此,使用Apache Spark內(nèi)置SQL查詢函數(shù)功能通??梢詭碜罴研阅?,并且應該是在避免引入UDF時考慮的第一種方法。高級用戶尋求利用Catalyst與其代碼更緊密地結(jié)合,可以參考以下Chris Fregly的演講[4],該演講人使用Expression.genCode優(yōu)化UDF代碼,并且使用了新的Apache Spark 2.0實驗功能[5],其為定制Catalyst優(yōu)化程序規(guī)則提供了一個可即插即用的API。
結(jié)論
當Spark SQL的內(nèi)置功能需要擴展時,UDF是一個非常有用的工具。本篇博文中提供了一次UDF和UDAF實現(xiàn)的演練,并討論了其集成步驟,以在Spark SQL中利用Spark SQL中現(xiàn)有的Java Hive UDF。UDF可以在Python、Scala、Java和(在Spark 2.0中)R中實現(xiàn),同時UDAF 可以在以及Scala和Java的UDAF中實現(xiàn)。當在PySpark中使用UDF時,必須考慮數(shù)據(jù)序列化成本,并且應該考慮采用上文所討論的兩個策略來解決這個問題。最后,我們探討了Spark SQL的Catalyst優(yōu)化器,以及基于性能考慮的因素,在解決方案中引入UDF之前堅持使用內(nèi)置SQL函數(shù)的性能考慮因素。
代碼https://github.com/curtishoward/sparkudfexamples
CDH版本:5.8.0(Apache Spark 1.6.0)
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。