您好,登錄后才能下訂單哦!
這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext
來實現(xiàn)對數(shù)據(jù)的加載、轉(zhuǎn)換、處理等工作,并且實現(xiàn)了SQLcontext和HiveContext的所有功能。
我們在新版本中并不需要之前那么繁瑣的創(chuàng)建很多對象,只需要創(chuàng)建一個SparkSession對象即可。
SparkSession支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame,并支持把DataFrame轉(zhuǎn)換成SQLContext自身中的表。
然后使用SQL語句來操作數(shù)據(jù),也提供了HiveQL以及其他依賴于Hive的功能支持。
創(chuàng)建SparkSession
SparkSession 是 Spark SQL 的入口。
使用 Dataset 或者 Datafram 編寫 Spark SQL 應(yīng)用的時候,第一個要創(chuàng)建的對象就是 SparkSession。
Builder 是 SparkSession 的構(gòu)造器。 通過 Builder, 可以添加各種配置。
Builder 的方法如下:
Method | Description |
---|---|
getOrCreate | 獲取或者新建一個 sparkSession |
enableHiveSupport | 增加支持 hive Support |
appName | 設(shè)置 application 的名字 |
config | 設(shè)置各種配置 |
使用的spark版本2.4.3
spark 1.x中的SQLContext在新版本中已經(jīng)被廢棄,改為SparkSession.builder
可以寫成
val conf = new SparkConf().setAppName("helloworld").setMaster("local[*]") val spark1=SparkSession.builder().config(conf).getOrCreate()
或(sparksession構(gòu)造器私有化在builder中)
val spark = SparkSession.builder .appName("my spark application") .master("local[2]") .getOrCreate()
例:
import org.apache.spark.sql.SparkSession object HelloWorld { def main(args: Array[String]): Unit = { /* val conf = new SparkConf().setAppName("helloworld").setMaster("local[*]") val spark1=SparkSession.builder().config(conf).getOrCreate()*/ val spark = SparkSession.builder .appName("my spark application") .master("local[2]") .getOrCreate() //讀取數(shù)據(jù) val df = spark.read.json("/usr/local/opt/spark-2.4.3/examples/src/main/resources/people.json") //展示所有數(shù)據(jù) df.show() //DSL df.select("name").show() //SQL df.createTempView("people") spark.sql("select * from people where age=30").show() //關(guān)閉 spark.close() } }
輸出結(jié)果 1:
//展示所有數(shù)據(jù) df.show()
輸出結(jié)果 2:
//DSL df.select("name").show()
輸出結(jié)果 3:
//SQL df.createTempView("people") spark.sql("select * from people where age=30").show()
scala> spark.read.json("./examples/src/main/resources/people.json") res32: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> res32.createOrReplaceTempView("people") scala> spark.sql("select * from people") res38: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> spark.sql("select * from people").show +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala> spark.udf.register("addName",(x:String)=> "name:"+x) res40: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType))) scala> spark.sql("select addName(name) as name from people").show +------------+ | name| +------------+ |name:Michael| | name:Andy| | name:Justin| +------------+
package com.ny.service import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SparkSession} class CustomerAvg extends UserDefinedAggregateFunction { //輸入的類型 override def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil) //緩存數(shù)據(jù)的類型 override def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } //返回值類型 override def dataType: DataType = LongType //冪等性 override def deterministic: Boolean = true //初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } //更新 分區(qū)內(nèi)操作 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0)=buffer.getLong(0) +input.getLong(0) buffer(1)=buffer.getLong(1)+1L } //合并 分區(qū)與分區(qū)之間操作 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0) buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1) } //最終執(zhí)行的方法 override def evaluate(buffer: Row): Any = { buffer.getLong(0)/buffer.getLong(1) } } object CustomerAvg{ def main(args: Array[String]): Unit = { val spark= SparkSession.builder() .appName("MyAvg") .master("local[2]") .getOrCreate() spark.udf.register("MyAvg",new CustomerAvg) //讀數(shù)據(jù) val frame = spark.read.json("/usr/local/opt/spark-2.4.3/examples/src/main/resources/peopleCP.json") frame.createTempView("peopleCP") spark.sql("select MyAvg(age) avg_age from peopleCP").show() spark.stop() } }
nancylulululu:resources nancy$ vi peopleCP.json {"name":"Michael","age":11} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
返回結(jié)果
上述就是小編為大家分享的spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。