溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

sparkSQL實(shí)戰(zhàn)詳解

發(fā)布時(shí)間:2020-07-15 03:45:59 來源:網(wǎng)絡(luò) 閱讀:24760 作者:hffzkl 欄目:數(shù)據(jù)庫

摘要  

        如果要想真正的掌握sparkSQL編程,首先要對(duì)sparkSQL的整體框架以及sparkSQL到底能幫助我們解決什么問題有一個(gè)整體的認(rèn)識(shí),然后就是對(duì)各個(gè)層級(jí)關(guān)系有一個(gè)清晰的認(rèn)識(shí)后,才能真正的掌握它,對(duì)于sparkSQL整體框架這一塊,在前一個(gè)博客已經(jīng)進(jìn)行過了一些介紹,如果對(duì)這塊還有疑問可以看我前一個(gè)博客:http://9269309.blog.51cto.com/9259309/1845525。本篇博客主要是對(duì)sparkSQL實(shí)戰(zhàn)進(jìn)行講解和總結(jié),而不是對(duì)sparkSQL源碼的講解,如果想看源碼的請(qǐng)繞道。

    再多說一點(diǎn),對(duì)于初學(xué)者,本人堅(jiān)持的觀點(diǎn)是不要一上來就看源碼,這樣的效果不是很大,還浪費(fèi)時(shí)間,對(duì)這個(gè)東西還沒有大致掌握,還不知道它是干什么的,上來就看源碼,門檻太高,而且看源碼對(duì)個(gè)人的提升也不是很高。我們做軟件開發(fā)的,我們開發(fā)的順序也是,首先是需求,對(duì)需求有了詳細(xì)的認(rèn)識(shí),需要解決什么問題,然后才是軟件的設(shè)計(jì),代碼的編寫。同樣,學(xué)習(xí)框架也是,我們只有對(duì)這個(gè)框架的需求,它需要解決什么問題,它需要干什么工作,都非常了解了,然后再看源碼,這樣效果才能得到很大的提升。對(duì)于閱讀源代碼這一塊,是本人的一點(diǎn)看法,說的對(duì)與錯(cuò),歡迎吐槽sparkSQL實(shí)戰(zhàn)詳解......!


1、sparkSQL層級(jí)

     當(dāng)我們想用sparkSQL來解決我們的需求時(shí),其實(shí)說簡單也簡單,就經(jīng)歷了三步:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理  -> 寫入最后結(jié)果,那么這三個(gè)步驟用的主要類其實(shí)就三個(gè):讀入數(shù)據(jù)和寫入最后結(jié)果用到兩個(gè)類HiveContext和SQLContext,對(duì)數(shù)據(jù)進(jìn)行處理用到的是DataFrame類,此類是你把數(shù)據(jù)從外部讀入到內(nèi)存后,數(shù)據(jù)在內(nèi)存中進(jìn)行存儲(chǔ)的基本數(shù)據(jù)結(jié)構(gòu),在對(duì)數(shù)據(jù)進(jìn)行處理時(shí)還會(huì)用到一些中間類,用到時(shí)在進(jìn)行講解。如下圖所示:

sparkSQL實(shí)戰(zhàn)詳解


2、HiveContext和SQLContext

   把HiveContext和SQLContext放在一起講解是因?yàn)樗麄兪遣畈欢嗟模驗(yàn)镠iveContext繼承自SQLContext,為什么會(huì)有兩個(gè)這樣的類,其實(shí)與hive和sql有關(guān)系的,雖然hive擁有HQL語言,但是它是一個(gè)類sql語言,和sql語言還是有差別的,有些sql語法,HQL是不支持的。所以他們還是有差別的。選擇不同的類,最后執(zhí)行的查詢引擎的驅(qū)動(dòng)是不一樣的。但是對(duì)于底層是怎么區(qū)別的這里不做詳細(xì)的介紹,你就知道一點(diǎn),使用不同的讀數(shù)據(jù)的類,底層會(huì)進(jìn)行標(biāo)記,自動(dòng)識(shí)別是使用哪個(gè)類進(jìn)行數(shù)據(jù)操作,然后采用不同的執(zhí)行計(jì)劃執(zhí)行操作,這點(diǎn)在上一篇sparkSQL整體框架中進(jìn)行了介紹,這里不做介紹。當(dāng)從hive庫中讀數(shù)據(jù)的時(shí)候,必須使用HiveContext來進(jìn)行讀取數(shù)據(jù),不然在進(jìn)行查詢的時(shí)候會(huì)出一些奇怪的錯(cuò)。其他的數(shù)據(jù)源兩者都可以選擇,但是最好使用SQLContext來完成。因?yàn)槠渲С值膕ql語法更多。由于HiveContext是繼承自SQLContext,這里只對(duì)SQLContext進(jìn)行詳細(xì)的介紹,但是以下這些方法是完全可以用在HiveContext中的。其實(shí)HiveContext類就擴(kuò)展了SQLContext的兩個(gè)我們可以使用的方法(在看源碼時(shí)以protected和private開頭的方法都是我們不能使用的,這個(gè)是scala的控制邏輯,相反,不是以這兩個(gè)關(guān)鍵字標(biāo)記的方法是我們可以直接使用的方法):analyze(tableName:String)和refreshTable(tableName:String)。


方法用途
analyze方法這個(gè)我們一般使用不到,它是來對(duì)我們寫的sql查詢語句進(jìn)行分析用的,一般用不到。
refreshTable方法

當(dāng)我們?cè)趕parkSQL中處理的某個(gè)表的存儲(chǔ)位置發(fā)生了變換,但是我們?cè)趦?nèi)存的metaData中緩存(cache)了這張表,則需要調(diào)用這個(gè)方法來使這個(gè)緩存無效,需要重新加載。



2.1 讀數(shù)據(jù)

      我們?cè)诮鉀Q我們的需求時(shí),首先是讀入數(shù)據(jù),需要把數(shù)據(jù)讀入到內(nèi)存中去,讀數(shù)據(jù)SQLContext提供了兩個(gè)方法,我們提供兩個(gè)數(shù)據(jù)表,為了便于演示,我采用的是用JSON格式進(jìn)行存儲(chǔ)的,寫成這樣的格式,但是可以保存為.txt格式的文件。

sparkSQL實(shí)戰(zhàn)詳解

1、第一種數(shù)據(jù)讀入:這種是對(duì)數(shù)據(jù)源文件進(jìn)行操作。

import org.apache.spark.sql.SQLContext
val sql = new SQLContext(sc) //聲明一個(gè)SQLContext的對(duì)象,以便對(duì)數(shù)據(jù)進(jìn)行操作
val peopleInfo = sql.read.json("文件路徑")
//其中peopleInfo返回的結(jié)果是:org.apache.spark.sql.DataFrame =
// [age: bigint, id: bigint, name: string],這樣就把數(shù)據(jù)讀入到內(nèi)存中了

        寫了這幾行代碼后面總共發(fā)生了什么,首先sparkSQL先找到文件,以解析json的形式進(jìn)行解析,同時(shí)通過json的key形成schema,scheam的字段的順序不是按照我們讀入數(shù)據(jù)時(shí)期默認(rèn)的順序,如上,其字段的順序是通過字符串的順序進(jìn)行重新組織的。默認(rèn)情況下,會(huì)把整數(shù)解析成bigint類型的,把字符串解析成string類型的,通過這個(gè)方法讀入數(shù)據(jù)時(shí),返回值得結(jié)果是一個(gè)DataFrame數(shù)據(jù)類型。

    DataFrame是什么?其實(shí)它是sparkSQL處理大數(shù)據(jù)的基本并且是核心的數(shù)據(jù)結(jié)構(gòu),是來存儲(chǔ)sparkSQL把數(shù)據(jù)讀入到內(nèi)存中,數(shù)據(jù)在內(nèi)存中進(jìn)行存儲(chǔ)的基本數(shù)據(jù)結(jié)構(gòu)。它采用的存儲(chǔ)是類似于數(shù)據(jù)庫的表的形式進(jìn)行存儲(chǔ)的。我們想一想,一個(gè)數(shù)據(jù)表有幾部分組成:1、數(shù)據(jù),這個(gè)數(shù)據(jù)是一行一行進(jìn)行存儲(chǔ)的,一條記錄就是一行,2、數(shù)據(jù)表的數(shù)據(jù)字典,包括表的名稱,表的字段和字段的類型等元數(shù)據(jù)信息。那么DataFrame也是按照行進(jìn)行存儲(chǔ)的,這個(gè)類是Row,一行一行的進(jìn)行數(shù)據(jù)存儲(chǔ)。一般情況下處理粒度是行粒度的,不需要對(duì)其行內(nèi)數(shù)據(jù)進(jìn)行操作,如果想單獨(dú)操作行內(nèi)數(shù)據(jù)也是可以的,只是在處理的時(shí)候要小心,因?yàn)樘幚硇袃?nèi)的數(shù)據(jù)容易出錯(cuò),比如選錯(cuò)數(shù)據(jù),數(shù)組越界等。數(shù)據(jù)的存儲(chǔ)的形式有了,數(shù)據(jù)表的字段和字段的類型都存放在哪里呢,就是schema中。我們可以調(diào)用schema來看其存儲(chǔ)的是什么。

peopleInfo.schema
//返回的結(jié)果是:org.apache.spark.sql.types.StructType = 
//StructType(StructField(age,LongType,true), StructField(id,LongType,true),
// StructField(name,StringType,true))

可以看出peopleInfo存儲(chǔ)的是數(shù)據(jù),schema中存儲(chǔ)的是這些字段的信息。需要注意的是表的字段的類型與scala數(shù)據(jù)類型的對(duì)應(yīng)關(guān)系:bigint->Long,int -> Int,Float -> Float,double -> Double,string ->  String等。一個(gè)DataFrame是有兩部分組成的:以行進(jìn)行存儲(chǔ)的數(shù)據(jù)和scheam,schema是StructType類型的。當(dāng)我們有數(shù)據(jù)而沒有schema時(shí),我們可以通過這個(gè)形式進(jìn)行構(gòu)造從而形成一個(gè)DataFrame。


read函數(shù)還提供了其他讀入數(shù)據(jù)的接口:

函數(shù)用途

json(path:String)

讀取json文件用此方法
table(tableName:String)讀取數(shù)據(jù)庫中的表
jdbc(url: String,table: String,predicates:Array[String],connectionProperties:Properties)


通過jdbc讀取數(shù)據(jù)庫中的表
orc(path:String)讀取以orc格式進(jìn)行存儲(chǔ)的文件
parquet(path:String)讀取以parquet格式進(jìn)行存儲(chǔ)的文件
schema(schema:StructType)這個(gè)是一個(gè)優(yōu)化,當(dāng)我們讀入數(shù)據(jù)的時(shí)候指定了其schema,底層就不會(huì)再次解析schema從而進(jìn)行了優(yōu)化,一般不需要這樣的優(yōu)化,不進(jìn)行此優(yōu)化,時(shí)間效率還是可以接受


2、第二種讀入數(shù)據(jù):這個(gè)讀入數(shù)據(jù)的方法,主要是處理從一個(gè)數(shù)據(jù)表中選擇部分字段,而不是選擇表中的所有字段。那么這種需求,采用這個(gè)數(shù)據(jù)讀入方式比較有優(yōu)勢(shì)。這種方式是直接寫sql的查詢語句。把上述json格式的數(shù)據(jù)保存為數(shù)據(jù)庫中表的格式。需要注意的是這種只能處理數(shù)據(jù)庫表數(shù)據(jù)。

val peopleInfo = sql.sql("""
|select
| id,
| name,
| age
|from peopleInfo
""".stripMargin)//其中stripMargin方法是來解析我們寫的sql語句的。
//返回的結(jié)果是和read讀取返回的結(jié)果是一樣的:
//org.apache.spark.sql.DataFrame =
// [age: bigint, id: bigint, name: string]

需要注意的是其返回的schmea中字段的順序和我們查詢的順序還是不一致的。


2.2  寫入數(shù)據(jù)


寫入數(shù)據(jù)就比較的簡單,因?yàn)槠鋼碛幸欢ǖ哪J剑凑者@個(gè)模式進(jìn)行數(shù)據(jù)的寫入。一般情況下,我們需要寫入的數(shù)據(jù)是一個(gè)DataFrame類型的,如果其不是DataFrame類型的我們需要把其轉(zhuǎn)換為

DataFrame類型,有些人可能會(huì)有疑問,數(shù)據(jù)讀入到內(nèi)存中,其類型是DataFrame類型,我們?cè)谔幚頂?shù)據(jù)時(shí)用到的是DataFrame類中的方法,但是DataFrame中的方法不一定返回值仍然是DataFrame類型的,同時(shí)有時(shí)我們需要構(gòu)建自己的類型,所以我們需要為我們的數(shù)據(jù)構(gòu)建成DataFrame的類型。把沒有schema的數(shù)據(jù),構(gòu)建schema類型,我所知道的就有兩種方法。


1、通過類構(gòu)建schema,還以上面的peopleInfo為例子。

val sql = new SQLContext(sc) //創(chuàng)建一個(gè)SQLContext對(duì)象
import sql.implicits._ //這個(gè)sql是上面我們定義的sql,而不是某一個(gè)jar包,網(wǎng)上有很多
                       //是import sqlContext.implicits._,那是他們定義的是
                       //sqlContext = SQLContext(sc),這個(gè)是scala的一個(gè)特性
val people = sc.textFile("people.txt")//我們采用spark的類型讀入數(shù)據(jù),因?yàn)槿绻?                                      //SQLContext進(jìn)行讀入,他們自動(dòng)有了schema
case clase People(id:Int,name:String,age:Int)//定義一個(gè)類
val peopleInfo = people.map(lines => lines.split(","))
                        .map(p => People(p(0).toInt,p(1),p(2).toInt)).toDF
                        //這樣的一個(gè)toDF就創(chuàng)建了一個(gè)DataFrame,如果不導(dǎo)入
                        //sql.implicits._,這個(gè)toDF方法是不可以用的。

上面的例子是利用了scala的反射技術(shù),生成了一個(gè)DataFrame類型??梢钥闯鑫覀兪前裄DD給轉(zhuǎn)換為DataFrame的。


2、直接構(gòu)造schema,以peopelInfo為例子。直接構(gòu)造,我們需要把我們的數(shù)據(jù)類型進(jìn)行轉(zhuǎn)化成Row類型,不然會(huì)報(bào)錯(cuò)。

val sql = new SQLContext(sc) //創(chuàng)建一個(gè)SQLContext對(duì)象
val people = sc.textFile("people.txt").map(lines => lines.split(","))
val peopleRow = sc.map(p => Row(p(0),p(1),(2)))//把RDD轉(zhuǎn)化成RDD(Row)類型
val schema = StructType(StructFile("id",IntegerType,true)::
                        StructFile("name",StringType,true)::
                        StructFile("age",IntegerType,true)::Nil)
val peopleInfo = sql.createDataFrame(peopleRow,schema)//peopleRow的每一行的數(shù)據(jù)
                                                      //類型一定要與schema的一致
                                                      //否則會(huì)報(bào)錯(cuò),說類型無法匹配
                                                      //同時(shí)peopleRow每一行的長度
                                                      //也要和schema一致,否則
                                                      //也會(huì)報(bào)錯(cuò)

構(gòu)造schema用到了兩個(gè)類StructType和StructFile,其中StructFile類的三個(gè)參數(shù)分別是(字段名稱,類型,數(shù)據(jù)是否可以用null填充)

采用直接構(gòu)造有很大的制約性,字段少了還可以,如果有幾十個(gè)甚至一百多個(gè)字段,這種方法就比較耗時(shí),不僅要保證Row中數(shù)據(jù)的類型要和我們定義的schema類型一致,長度也要一樣,不然都會(huì)報(bào)錯(cuò),所以要想直接構(gòu)造schema,一定要細(xì)心細(xì)心再細(xì)心,本人就被自己的不細(xì)心虐慘了,處理的字段將近一百,由于定義的schema和我的數(shù)據(jù)類型不一致,我就需要每一個(gè)字段每一個(gè)字段的去確認(rèn),字段一多在對(duì)的時(shí)候就容易疲勞,就這樣的一個(gè)錯(cuò)誤,由于本人比較笨,就花費(fèi)了一個(gè)下午的時(shí)間,所以字段多了,在直接構(gòu)造schema的時(shí)候,一定要細(xì)心、細(xì)心、細(xì)心,重要的事情說三遍,不然會(huì)死的很慘。


好了,現(xiàn)在我們已經(jīng)把我們的數(shù)據(jù)轉(zhuǎn)化成DataFrame類型的,下面就要往數(shù)據(jù)庫中寫我們的數(shù)據(jù)了

寫數(shù)據(jù)操作:

val sql = new SQLContext(sc) 
val people = sc.textFile("people.txt").map(lines => lines.split(","))
val peopleRow = sc.map(p => Row(p(0),p(1),(2)))
val schema = StructType(StructFile("id",IntegerType,true)::
                        StructFile("name",StringType,true)::
                        StructFile("age",IntegerType,true)::Nil)
val peopleInfo = sql.createDataFrame(peopleRow,schema)
peopleInfo.registerTempTable("tempTable")//只有有了這個(gè)注冊(cè)的表tempTable,我們
                                         //才能通過sql.sql(“”“ ”“”)進(jìn)行查詢
                                         //這個(gè)是在內(nèi)存中注冊(cè)一個(gè)臨時(shí)表用戶查詢
sql.sql.sql("""
|insert overwrite table tagetTable
|select
| id,
| name,
| age
|from tempTable
""".stripMargin)//這樣就把數(shù)據(jù)寫入到了數(shù)據(jù)庫目標(biāo)表tagetTable中

有上面可以看到,sparkSQL的sql()其實(shí)就是用來執(zhí)行我們寫的sql語句的。


好了,上面介紹了讀和寫的操作,現(xiàn)在需要對(duì)最重要的地方來進(jìn)行操作了啊。



2.3 通過DataFrame中的方法對(duì)數(shù)據(jù)進(jìn)行操作

       

        在介紹DataFrame之前,我們還是要先明確一下,sparkSQL是用來干什么的,它主要為我們提供了怎樣的便捷,我們?yōu)槭裁匆盟?。它是為了讓我們能用寫代碼的形式來處理sql,這樣說可能有點(diǎn)不準(zhǔn)確,如果就這么簡單,只是對(duì)sql進(jìn)行簡單的替換,要是我,我也不學(xué)習(xí)它,因?yàn)槲乙呀?jīng)會(huì)sql了,會(huì)通過sql進(jìn)行處理數(shù)據(jù)倉庫的etl,我還學(xué)習(xí)sparkSQL干嘛,而且學(xué)習(xí)的成本又那么高。sparkSQL肯定有好處了,不然也不會(huì)有這篇博客啦。我們都知道通過寫sql來進(jìn)行數(shù)據(jù)邏輯的處理時(shí)有限的,寫程序來進(jìn)行數(shù)據(jù)邏輯的處理是非常靈活的,所以sparkSQL是用來處理那些不能夠用sql來進(jìn)行處理的數(shù)據(jù)邏輯或者用sql處理起來比較復(fù)雜的數(shù)據(jù)邏輯。一般的原則是能用sql來處理的,盡量用sql來處理,畢竟開發(fā)起來簡單,sql處理不了的,再選擇用sparkSQL通過寫代碼的方式來處理。好了廢話不多說了,開始DataFrame之旅。

       sparkSQL非常強(qiáng)大,它提供了我們sql中的正刪改查所有的功能,每一個(gè)功能都對(duì)應(yīng)了一個(gè)實(shí)現(xiàn)此功能的方法。


對(duì)schema的操作


val sql = new SQLContext(sc)
val people = sql.read.json("people.txt")//people是一個(gè)DataFrame類型的對(duì)象

//數(shù)據(jù)讀進(jìn)來了,那我們查看一下其schema吧

people.schema

//返回的類型
//org.apache.spark.sql.types.StructType = 
//StructType(StructField(age,LongType,true), 
//           StructField(id,LongType,true),
//           StructField(name,StringType,true))

//以數(shù)組的形式分會(huì)schema

people.dtypes

//返回的結(jié)果:
//Array[(String, String)] = 
//       Array((age,LongType), (id,LongType), (name,StringType))


//返回schema中的字段

people.columns

//返回的結(jié)果:
//Array[String] = Array(age, id, name)  

//以tree的形式打印輸出schema

people.printSchema

//返回的結(jié)果:
//root
// |-- age: long (nullable = true)
// |-- id: long (nullable = true)
// |-- name: string (nullable = true)

對(duì)表的操作,對(duì)表的操作語句一般情況下是不常用的,因?yàn)殡m然sparkSQL把sql查的每一個(gè)功能都封裝到了一個(gè)方法中,但是處理起來還是不怎么靈活一般情況下我們采用的是用sql()方法直接來寫sql,這樣比較實(shí)用,還更靈活,而且代碼的可讀性也是很高的。那下面就把能用到的方法做一個(gè)簡要的說明。


方法(sql使我們定義的sql = new SQLContext(sc)) df是一個(gè)DataFrame對(duì)象實(shí)例說明
sql.read.table(tableName)
讀取一張表的數(shù)據(jù)
df.where(),          df.filter()

過濾條件,相當(dāng)于sql的where部分;

用法:選擇出年齡字段中年齡大于20的字段。

返回值類型:DataFrame


df.where("age >= 20"),df.filter("age >= 20")

df.limit()

限制輸出的行數(shù),對(duì)應(yīng)于sql的limit

用法:限制輸出一百行

返回值類型:DataFrame


df.limit(100)

df.join()

鏈接操作,相當(dāng)于sql的join

對(duì)于join操作,下面會(huì)單獨(dú)進(jìn)行介紹

df.groupBy()

聚合操作,相當(dāng)于sql的groupBy

用法:對(duì)于某幾行進(jìn)行聚合

返回值類型:DataFrame


df.groupBy("id")

df.agg()求聚合用的相關(guān)函數(shù),下面會(huì)詳細(xì)介紹
df.intersect(other:DataFrame)


求兩個(gè)DataFrame的交集
df.except(other:DataFrame)求在df中而不在other中的行
df.withColumn(colName:String,col:Column)

增加一列

df.withColumnRenamed(exName,newName)對(duì)某一列的名字進(jìn)行重新命名

df.map(),

df.flatMap,

df.mapPartitions(),

df.foreach()

df.foreachPartition()

df.collect()

df.collectAsList()

df.repartition()

df.distinct()

df.count()

這些方法都是spark的RDD的基本操作,其中在DataFrame類中也封裝了這些方法,需要注意的是這些方法的返回值是RDD類型的,不是DataFrame類型的,在這些方法的使用上,一定要記清楚返回值類型,不然就容易出現(xiàn)錯(cuò)誤
df.select()

選取某幾列元素,這個(gè)方法相當(dāng)于sql的select的功能

用法:返回選擇的某幾列數(shù)據(jù)

返回值類型:DataFrame


df.select("id","name")

以上是兩個(gè)都是一寫基本的方法,下面就詳細(xì)介紹一下join和agg,na,udf操作


2.4 sparkSQL的join操作


    spark的join操作就沒有直接寫sql的join操作來的靈活,在進(jìn)行鏈接的時(shí)候,不能對(duì)兩個(gè)表中的字段進(jìn)行重新命名,這樣就會(huì)出現(xiàn)同一張表中出現(xiàn)兩個(gè)相同的字段。下面就一點(diǎn)一點(diǎn)的進(jìn)行展開用到的兩個(gè)表,一個(gè)是用戶信息表,一個(gè)是用戶的收入薪資表:

sparkSQL實(shí)戰(zhàn)詳解  sparkSQL實(shí)戰(zhàn)詳解


1、內(nèi)連接,等值鏈接,會(huì)把鏈接的列合并成一個(gè)列


val sql = new SQLContext(sc)
val pInfo = sql.read.json("people.txt")
val pSalar = sql.read.json("salary.txt")
val info_salary = pInfo.join(pSalar,"id")//單個(gè)字段進(jìn)行內(nèi)連接
val info_salary1 = pInfo.join(pSalar,Seq("id","name"))//多字段鏈接

返回的結(jié)果如下圖:

單個(gè)id進(jìn)行鏈接 (一張表出現(xiàn)兩個(gè)name字段)                                                兩個(gè)字段進(jìn)行鏈接

sparkSQL實(shí)戰(zhàn)詳解                                                    sparkSQL實(shí)戰(zhàn)詳解


2、join還支持左聯(lián)接和右鏈接,但是其左聯(lián)接和右鏈接和我們sql的鏈接的意思是一樣的,同樣也是在鏈接的時(shí)候不能對(duì)字段進(jìn)行重新命名,如果兩個(gè)表中有相同的字段,則就會(huì)出現(xiàn)在同一個(gè)join的表中,同事左右鏈接,不會(huì)合并用于鏈接的字段。鏈接用的關(guān)鍵詞:outer,inner,left_outer,right_outer

//單字段鏈接
val left = pInfo.join(pSalar,pInfo("id") === pSalar("id"),"left_outer")
//多字段鏈接
val left2 = pInfo.join(pSalar,pInfo("id") === pSalar("id") and 
                pInfo("name") === pSalar("name"),"left_outer")

返回的結(jié)果:

單字段鏈接                                                               多字段鏈接

sparkSQL實(shí)戰(zhàn)詳解                  sparkSQL實(shí)戰(zhàn)詳解


由上可以發(fā)現(xiàn),sparkSQL的join操作還是沒有sql的join靈活,容易出現(xiàn)重復(fù)的字段在同一張表中,一般我們進(jìn)行鏈接操作時(shí),我們都是先利用registerTempTable()函數(shù)把此DataFrame注冊(cè)成一個(gè)內(nèi)部表,然后通過sql.sql("")寫sql的方法進(jìn)行鏈接,這樣可以更好的解決了重復(fù)字段的問題。


2.5 sparkSQL的agg操作

    

     其中sparkSQL的agg是sparkSQL聚合操作的一種表達(dá)式,當(dāng)我們調(diào)用agg時(shí),其一般情況下都是和groupBy()的一起使用的,選擇操作的數(shù)據(jù)表為:

sparkSQL實(shí)戰(zhàn)詳解

val pSalar = new SQLContext(sc).read.json("salary.txt")
val group = pSalar.groupBy("name").agg("salary" -> "avg")
val group2 = pSalar.groupBy("id","name").agg("salary" -> "avg")
val group3 = pSalar.groupBy("name").agg(Map("id" -> "avg","salary"->"max"))

得到的結(jié)過如下:

   group的結(jié)果                                         group2                                           group3

sparkSQL實(shí)戰(zhàn)詳解   sparkSQL實(shí)戰(zhàn)詳解    sparkSQL實(shí)戰(zhàn)詳解 


使用agg時(shí)需要注意的是,同一個(gè)字段不能進(jìn)行兩次操作比如:agg(Map("salary" -> "avg","salary" -> "max"),他只會(huì)計(jì)算max的操作,原因很簡單,agg接入的參數(shù)是Map類型的key-value對(duì),當(dāng)key相同時(shí),會(huì)覆蓋掉之前的value。同時(shí)還可以直接使用agg,這樣是對(duì)所有的行而言的。聚合所用的計(jì)算參數(shù)有:avg,max,min,sum,count,而不是只有例子中用到的avg



2.6 sparkSQL的na操作

   

     sparkSQL的na方法,返回的是一個(gè)DataFrameFuctions對(duì)象,此類主要是對(duì)DataFrame中值為null的行的操作,只提供三個(gè)方法,drop()刪除行,fill()填充行,replace()代替行的操作。很簡單不做過多的介紹。


3、總結(jié)


        我們使用sparkSQL的目的就是為了解決用寫sql不能解決的或者解決起來比較困難的問題,在平時(shí)的開發(fā)過程中,我們不能為了高逼格什么樣的sql問題都是用sparkSQL,這樣不是最高效的。使用sparkSQL,主要是利用了寫代碼處理數(shù)據(jù)邏輯的靈活性,但是我們也不能完全的只使用sparkSQL提供的sql方法,這樣同樣是走向了另外一個(gè)極端,有上面的討論可知,在使用join操作時(shí),如果使用sparkSQL的join操作,有很多的弊端。為了能結(jié)合sql語句的優(yōu)越性,我們可以先把要進(jìn)行鏈接的DataFrame對(duì)象,注冊(cè)成內(nèi)部的一個(gè)中間表,然后在通過寫sql語句,用SQLContext提供的sql()方法來執(zhí)行我們寫的sql,這樣處理起來更加的合理而且高效。在工作的開發(fā)過程中,我們要結(jié)合寫代碼和寫sql的各自的所長來處理我們的問題,這樣會(huì)更加的高效。

       寫這篇博客,花費(fèi)了我兩周的時(shí)間,由于工作比較忙,只有在業(yè)余時(shí)間進(jìn)行思考和總結(jié)。也算對(duì)自己學(xué)習(xí)的一個(gè)交代。關(guān)于sparkSQL的兩個(gè)類HiveContext和SQLContext提供的udf方法,如果用好了udf方法,可以使我們代碼的開發(fā)更加的簡潔和高效,可讀性也是很強(qiáng)的。由于在代碼中注冊(cè)u(píng)df方法,還有很多很細(xì)的知識(shí)點(diǎn)需要注意,我準(zhǔn)備在另外寫一篇博客進(jìn)行詳細(xì)的介紹。


      累死我了,已經(jīng)兩天宅在家里了,該出去溜達(dá)溜達(dá)了!!

    

    




    




向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI