您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)Spark 3.0的新功能是什么呢,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
最近,Apache Spark社區(qū)發(fā)布了Spark 3.0的預(yù)覽版,該預(yù)覽版包含許多重要的新功能,這些功能將幫助Spark創(chuàng)造強(qiáng)大的影響力,在此大數(shù)據(jù)和數(shù)據(jù)科學(xué)時(shí)代,該產(chǎn)品已擁有廣泛的企業(yè)用戶和開發(fā)人員。
在新版本中,Spark社區(qū)已將一些功能從Spark SQL移植到了編程的Scala API(
org.apache.spark.sql.functions),這鼓勵(lì)開發(fā)人員直接將此功能用作其DataFrame轉(zhuǎn)換的一部分,而不是直接輸入 進(jìn)入SQL模式或創(chuàng)建視圖,并使用此函數(shù)以及SQL表達(dá)式或callUDF函數(shù)。
社區(qū)還辛苦地引入了一些新的數(shù)據(jù)轉(zhuǎn)換功能和partition_transforms函數(shù),這些功能在與Spark的新DataFrameWriterv2一起使用以將數(shù)據(jù)寫到某些外部存儲(chǔ)時(shí)非常有用。
Spark 3中的一些新功能已經(jīng)是Databricks Spark以前版本的一部分。 因此,如果您在Databricks云中工作,您可能會(huì)發(fā)現(xiàn)其中一些熟悉的功能。
下面介紹了Spark SQL和Scala API中用于DataFrame操作訪問的Spark新功能,以及從Spark SQL移植到Scala API以進(jìn)行編程訪問的功能。
from_csv
像from_json一樣,此函數(shù)解析包含CSV字符串的列,并將其轉(zhuǎn)換為Struct類型。 如果CSV字符串不可解析,則將返回null。
例:
該函數(shù)需要一個(gè)Struct模式和一些選項(xiàng),這些模式和選項(xiàng)指示如何解析CSV字符串。 選項(xiàng)與CSV數(shù)據(jù)源相同。
ss="dp-sql">ss="alt">val studentInfo = ss="string">"1,Jerin,CSE"::ss="string">"2,Jerlin,ECE"::ss="string">"3,Arun,CSE"::Nil ss="">val ss="keyword">schema = new StructType() ss="alt"> .ss="keyword">add(ss="string">"Id",IntegerType) ss=""> .ss="keyword">add(ss="string">"Name",StringType) ss="alt"> .ss="keyword">add(ss="string">"Dept",StringType) ss="">val options = Map(ss="string">"delimiter" ->ss="string">",") ss="alt">val studentDF = studentInfo.toDF(ss="string">"Student_Info") ss="">.withColumn(ss="string">"csv_struct",from_csv('Student_Info, ss="keyword">schema,options)) ss="alt">studentDF.show()
to_csv
要將"結(jié)構(gòu)類型"列轉(zhuǎn)換為CSV字符串。
例:
與Struct type列一起,此函數(shù)還接受可選的options參數(shù),該參數(shù)指示如何將Struct列轉(zhuǎn)換為CSV字符串。
ss="dp-sql">ss="alt">studentDF ss="">.withColumn(ss="string">"csv_string",to_csv($ss="string">"csv_struct",Map.empty[String, String].asJava)) ss="alt">.show
推斷CSV字符串的模式,并以DDL格式返回模式。
例:
該函數(shù)需要一個(gè)CSV字符串列和一個(gè)可選參數(shù),其中包含如何解析CSV字符串的選項(xiàng)。
ss="dp-sql">ss="alt">studentDF ss=""> .withColumn(ss="string">"schema",schema_of_csv(ss="string">"csv_string")) ss="alt"> .show
for_all
將給定謂詞應(yīng)用于數(shù)組中的所有元素,并且僅當(dāng)數(shù)組中的所有元素求值為true時(shí)返回true,否則返回false。
例:
檢查給定Array列中的所有元素是否均是偶數(shù)。
ss="dp-sql">ss="alt">val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"int_array") ss="">df.withColumn(ss="string">"flag",forall($ss="string">"int_array",(x:ss="keyword">Column)=>(lit(x%2==0)))) ss="alt">.show
transform
將函數(shù)應(yīng)用于數(shù)組中的所有元素后,返回一個(gè)新數(shù)組。
例:
將" 1"添加到數(shù)組中的所有元素。
ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6)),(Seq(5,10,3))).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"num_array",transform($ss="string">"num_array",x=>x+1)).show
overlay
要替換列的內(nèi)容,請使用從指定字節(jié)位置到可選的指定字節(jié)長度的實(shí)際替換內(nèi)容。
例:
將特定人員的問候語更改為傳統(tǒng)的" Hello World"
這里我們用世界替換人名,因?yàn)槊值钠鹗嘉恢檬?,并且我們要在替換內(nèi)容之前刪除完整的姓名,需要?jiǎng)h除的字節(jié)位置的長度應(yīng)大于或等于最大值 列中名稱的長度。
因此,我們將替換詞傳遞為"world",將內(nèi)容替換為" 7"的特定起始位置,從指定起始位置移除的位置數(shù)為" 12"(如果未指定,則該位置是可選的 函數(shù)只會(huì)從指定的起始位置將源內(nèi)容替換為替換內(nèi)容)。
覆蓋替換了StringType,TimeStampType,IntegerType等中的內(nèi)容。但是Column的返回類型將始終為StringType,而與Column輸入類型無關(guān)。
ss="dp-sql">ss="alt">val greetingMsg = ss="string">"Hello Arun"::ss="string">"Hello Mohit Chawla"::ss="string">"Hello Shaurya"::Nil ss="">val greetingDF = greetingMsg.toDF(ss="string">"greet_msg") ss="alt">greetingDF.withColumn(ss="string">"greet_msg",overlay($ss="string">"greet_msg",lit(ss="string">"World"),lit(ss="string">"7"),lit(ss="string">"12"))) ss="">.show
分裂
根據(jù)給定的正則表達(dá)式和指定的限制拆分字符串表達(dá)式,該限制指示將正則表達(dá)式應(yīng)用于給定的字符串表達(dá)式的次數(shù)。
如果指定的限制小于或等于零,則正則表達(dá)式將在字符串上應(yīng)用多次,并且結(jié)果數(shù)組將根據(jù)給定的正則表達(dá)式包含所有可能的字符串拆分。
如果指定的限制大于零,則將使用不超過該限制的正則表達(dá)式
例:
根據(jù)正則表達(dá)式將給定的字符串表達(dá)式拆分為兩個(gè)。 即 字符串定界符。
ss="dp-sql">ss="alt">val num = ss="string">"one~two~three"::ss="string">"four~five"::Nil ss="">val numDF = num.toDF(ss="string">"numbers") ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",2)) ss="alt">.show
將同一個(gè)字符串表達(dá)式分成多個(gè)部分,直到出現(xiàn)分隔符
ss="dp-sql">ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",0)) ss="alt">.show
map_entries
將映射鍵值轉(zhuǎn)換為無序的條目數(shù)組。
例:
獲取數(shù)組中Map的所有鍵和值。
ss="dp-sql">ss="alt">val df = Seq(Map(1->ss="string">"x",2->ss="string">"y")).toDF(ss="string">"key_values") ss="">df.withColumn(ss="string">"key_value_array",map_entries($ss="string">"key_values")) ss="alt">.show
map_zip_with
使用功能根據(jù)鍵將兩個(gè)Map合并為一個(gè)。
例:
要計(jì)算跨部門員工的總銷售額,并通過傳遞一個(gè)函數(shù),該函數(shù)將基于鍵匯總來自兩個(gè)不同"地圖"列的總銷售額,從而在單個(gè)地圖中獲取特定員工的總銷售額。
ss="dp-sql">ss="alt">val df = Seq((Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000), ss=""> Map(ss="string">"EID_1"->1000,ss="string">"EID_2"->2500))) .toDF(ss="string">"emp_sales_dept1",ss="string">"emp_sales_dept2") ss="alt"> ss="">df. ss="alt">withColumn(ss="string">"total_emp_sales",map_zip_with($ss="string">"emp_sales_dept1",$ss="string">"emp_sales_dept2",(k,v1,v2)=>(v1+v2))) ss="">.show
map_filter
返回僅包含滿足給定謂詞功能的Map值的新鍵值對。
例:
僅篩選出銷售值高于20000的員工
ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss=""> .toDF(ss="string">"emp_sales") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"filtered_sales",map_filter($ss="string">"emp_sales",(k,v)=>(v>20000))) ss="">.show
transform_values
根據(jù)給定的函數(shù)操作Map列中所有元素的值。
例:
通過給每個(gè)雇員加薪5000來計(jì)算雇員薪水
ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss=""> .toDF(ss="string">"emp_salary") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"emp_salary",transform_values($ss="string">"emp_salary",(k,v)=>(v+5000))) ss="">.show
transform_keys
根據(jù)給定的函數(shù)操作Map列中所有元素的鍵。
例:
要將公司名稱" XYZ"添加到員工編號(hào)。
ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1" -> 10000, ss="string">"EID_2" -> 25000)) ss=""> .toDF(ss="string">"employees") ss="alt">df ss="">.withColumn(ss="string">"employees", transform_keys($ss="string">"employees", (k, v) => concat(k,lit(ss="string">"_XYZ")))) ss="alt">.show
xhash74
要計(jì)算給定列內(nèi)容的哈希碼,請使用64位xxhash算法并將結(jié)果返回為long。
Scala API可使用大多數(shù)Spark SQL函數(shù),該函數(shù)可將相同的函數(shù)用作DataFrame操作的一部分。 但是仍然有一些功能不能作為編程功能使用。 要使用這些功能,必須進(jìn)入Spark SQL模式并將這些功能用作SQL表達(dá)式的一部分,或使用Spark" callUDF"功能使用相同的功能。 隨著功能的普及和使用不斷發(fā)展,這些功能中的某些功能過去曾被移植到新版本的程序化Spark API中。 以下是從以前版本的Spark SQL函數(shù)移植到Scala API(
org.spark.apache.sql.functions)的函數(shù)
date_sub
從日期,時(shí)間戳記和字符串?dāng)?shù)據(jù)類型中減去天數(shù)。 如果數(shù)據(jù)類型為字符串,則其格式應(yīng)可轉(zhuǎn)換為日期" yyyy-MM-dd"或" yyyy-MM-dd HH:mm:ss.ssss"
例:
從eventDateTime中減去" 1天"。
如果要減去的天數(shù)為負(fù),則此功能會(huì)將給定的天數(shù)添加到實(shí)際日期中。
ss="dp-sql">ss="alt">var df = Seq( ss=""> (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt"> (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss=""> (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt"> (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss=""> ) ss="alt"> .toDF(ss="string">"typeId",ss="string">"eventDateTime") ss=""> ss="alt"> df.withColumn(ss="string">"Adjusted_Date",date_sub($ss="string">"eventDateTime",1)).show()
date_add
與date_sub相同,但是將天數(shù)添加到實(shí)際天數(shù)中。
例:
將" 1天"添加到eventDateTime
ss="dp-sql">ss="alt">var df = Seq( ss=""> (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt"> (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss=""> (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt"> (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss=""> ) ss="alt"> .toDF(ss="string">"Id",ss="string">"eventDateTime") ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",date_add($ss="string">"eventDateTime",1)) ss="">.show()
months_add
像date_add和date_sub一樣,此功能有助于添加月份。
減去月份,將要減去的月份數(shù)設(shè)為負(fù)數(shù),因?yàn)闆]有單獨(dú)的減去函數(shù)用于減去月份
例:
從eventDateTime添加和減去一個(gè)月。
ss="dp-sql">ss="alt">var df = Seq( ss=""> (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt"> (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss=""> (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt"> (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss=""> ).toDF(ss="string">"typeId",ss="string">"eventDateTime") ss="alt">//ss="keyword">To ss="keyword">add one months ss=""> df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",1)) ss="">.show() ss="alt">//ss="keyword">To subtract one months ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",-1)) ss="">.show()
zip_with
通過應(yīng)用函數(shù)合并左右數(shù)組。
此函數(shù)期望兩個(gè)數(shù)組的長度都相同,如果其中一個(gè)數(shù)組比另一個(gè)數(shù)組短,則將添加null以匹配更長的數(shù)組長度。
例:
將兩個(gè)數(shù)組的內(nèi)容相加并合并為一個(gè)
ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),Seq(5,10,3))) ss=""> .toDF(ss="string">"array_1",ss="string">"array_2") ss="alt"> ss=""> df ss="alt">.withColumn(ss="string">"merged_array",zip_with($ss="string">"array_1",$ss="string">"array_2",(x,y)=>(x+y))) ss=""> .show
將謂詞應(yīng)用于所有元素,并檢查數(shù)組中的至少一個(gè)或多個(gè)元素是否對謂詞函數(shù)成立。
例:
檢查數(shù)組中至少一個(gè)元素是否為偶數(shù)。
ss="dp-sql">ss="alt">val df= Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"flag",exists($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show
過濾
將給定謂詞應(yīng)用于數(shù)組中的所有元素,并過濾掉謂詞為true的元素。
例:
僅過濾掉數(shù)組中的偶數(shù)元素。
ss="dp-sql">ss="alt">val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"even_array",filter($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show
聚合 aggregate
使用給定函數(shù)將給定數(shù)組和另一個(gè)值/狀態(tài)簡化為單個(gè)值,并應(yīng)用可選的finish函數(shù)將縮減后的值轉(zhuǎn)換為另一個(gè)狀態(tài)/值。
例:
將10加到數(shù)組的總和并將結(jié)果乘以2
ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),3),(Seq(5,10,3),8)) ss=""> .toDF(ss="string">"num_array",ss="string">"constant") ss="alt">df.withColumn(ss="string">"reduced_array",aggregate($ss="string">"num_array", $ss="string">"constant",(x,y)=>x+y,x => x*2)) ss=""> .show
以下是新的SQL函數(shù),您只能在Spark SQL模式下才能使用它們。
acosh
查找給定表達(dá)式的雙曲余弦的倒數(shù)。
asinh
找出給定表達(dá)式的雙曲正弦的逆。
atanh
查找給定表達(dá)式的雙曲正切的逆。
bit_and,bit_or和bit_xor
計(jì)算按位AND,OR和XOR值
bit_count
返回計(jì)數(shù)的位數(shù)。
bool_and和bool_or
驗(yàn)證表達(dá)式的所有值是否為真或驗(yàn)證表達(dá)式中的至少一個(gè)為真。
count_if
返回一列中的真值數(shù)量
例:
找出給定列中的偶數(shù)值
ss="dp-sql">ss="alt">var df = Seq((1),(2),(4)).toDF(ss="string">"num") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select count_if(num %2==0) from table").show
date_part
提取日期/時(shí)間戳的一部分,例如小時(shí),分鐘等…
div
用于將表達(dá)式或帶有另一個(gè)表達(dá)式/列的列分開
every 和 sum
如果給定的表達(dá)式對每個(gè)列的所有列值都求值為true,并且至少一個(gè)值對某些值求得true,則此函數(shù)返回true。
make_date,make_interval和make_timestamp
構(gòu)造日期,時(shí)間戳和特定間隔。
例:
ss="dp-sql">ss="alt">ss="keyword">SELECT make_timestamp(2020, 01, 7, 30, 45.887)
max_by和min_by
比較兩列并返回與右列的最大值/最小值關(guān)聯(lián)的左列的值
例:
ss="dp-sql">ss="alt">var df = Seq((1,1),(2,1),(4,3)).toDF(ss="string">"x",ss="string">"y") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select max_by(x,y) from table").show
類型
返回列值的數(shù)據(jù)類型
版
返回Spark版本及其git版本
justify_days,justify_hours和justify_interval
新引入的對齊功能用于調(diào)整時(shí)間間隔。
例:
表示30天為一個(gè)月,
ss="dp-sql">ss="alt">ss="keyword">SELECT justify_days(interval ss="string">'30 day')
分區(qū)轉(zhuǎn)換功能
從Spark 3.0及更高版本開始,存在一些新功能,這些功能有助于對數(shù)據(jù)進(jìn)行分區(qū),我將在另一篇文章中介紹。
總體而言,我們已經(jīng)分析了所有數(shù)據(jù)轉(zhuǎn)換和分析功能,這些功能是3.0版本中產(chǎn)生的火花。 希望本指南有助于您了解這些新功能。 這些功能肯定會(huì)加速火花開發(fā)工作,并有助于建立堅(jiān)固有效的火花管道。
看完上述內(nèi)容,你們對Spark 3.0的新功能是什么呢有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。