溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

DataFrame怎么用

發(fā)布時(shí)間:2022-01-13 16:01:32 來(lái)源:億速云 閱讀:150 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹DataFrame怎么用,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

一、概述:

        DataFrame是一個(gè)分布式數(shù)據(jù)集,可以理解為關(guān)系型數(shù)據(jù)庫(kù)一張表,由字段和字段類型、字段值按列組織,且支持四種語(yǔ)言,在Scala API中可以理解為: FataFrame=Dataset[ROW]

注:DataFrame產(chǎn)生于V1.3之后,在V1.3前為SchemaRDD,在V1.6以后又添加了Dataset

二、DataFrame vs RDD 差異:

	概念	:
 兩個(gè)都是分布式容器,DF理解是一個(gè)表格除了RDD數(shù)據(jù)以外還有Schema,也支持復(fù)雜數(shù)據(jù)類型(map..)

	API	:
 DataFrame提供的API比RDD豐富 支持map  filter  flatMap .....

	數(shù)據(jù)結(jié)構(gòu):RDD知道類型沒有結(jié)構(gòu), DF提供Schema信息 有利于優(yōu)化,性能上好

	底層	:基于運(yùn)行環(huán)境不一樣,RDD開發(fā)的Java/Scala API運(yùn)行底層環(huán)境JVM,

  DF在SparkSQL中轉(zhuǎn)換成邏輯執(zhí)行計(jì)劃(locaical Plan)和物理執(zhí)行計(jì)劃(Physical Plan)中間自身優(yōu)化功能,性能差異大

三、json文件操作

[hadoop@hadoop001 bin]$./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.34-bin.jar 

-- 讀取json文件

scala>val df = spark.read.json("file:///home/hadoop/data/people.json")

18/09/02 11:47:20 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

-- 打印schema信息

scala> df.printSchema

root

 |-- age: long (nullable = true)    -- 字段 類型 允許為空

 |-- name: string (nullable = true)

--  打印字段內(nèi)容

scala> df.show

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

-- 打印查詢字段

scala> df.select("name").show

+-------+
|   name|
+-------+
|Michael| 
|   Andy|
| Justin|
+-------+

-- 單引號(hào),存在隱式轉(zhuǎn)換

scala> df.select('name).show

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

-- 雙引號(hào)隱式轉(zhuǎn)換不識(shí)別

scala> df.select("name).show

<console>:1: error: unclosed string literal

df.select("name).show

          ^  

-- 年齡計(jì)算,NULL無(wú)法計(jì)算

scala> df.select($"name",$"age" + 1).show

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

-- 年齡過濾

scala> df.filter($"age" > 21).show

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

-- 年齡分組 匯總

scala> df.groupBy("age").count.show

+----+-----+                                                                    
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

-- 創(chuàng)建一個(gè)臨時(shí)視圖

scala>  df.createOrReplaceTempView("people")

scala>spark.sql("select * from people").show

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

四、DataFrame對(duì)象上Action操作

-- 定義case class 用來(lái)創(chuàng)建Schema

case class Student(id:String,name:String,phone:String,Email:String)

-- RDD與DF反射方式實(shí)現(xiàn)

val students = sc.textFile("file:///home/hadoop/data/student.data").map(_.split("\\|")).map(x=>Student(x(0),x(1),x(2),x(3))).toDF()

-- 打印DF信息

students.printSchema

-- show(numRows: Int, truncate: Boolean) 

-- numRows截取前20行和truncate讀取前20字符串

-- students.show(5,false) 讀取前五行和所有字符串

scala> students.show

+---+--------+--------------+--------------------+
| id|    name|         phone|               Email|
+---+--------+--------------+--------------------+
|  1|   Burke|1-300-746-8446|ullamcorper.velit...|
|  2|   Kamal|1-668-571-5046|pede.Suspendisse@...|
|  3|    Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|   Belle|1-246-894-6340|vitae.aliquet.nec...|
|  5|  Trevor|1-300-527-4967|dapibus.id@acturp...|
|  6|  Laurel|1-691-379-9921|adipiscing@consec...|
|  7|    Sara|1-608-140-1995|Donec.nibh@enimEt...|
|  8|  Kaseem|1-881-586-2689|cursus.et.magna@e...|
|  9|     Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
| 10|    Maya|1-271-683-2698|accumsan.convalli...|
| 11|     Emi|1-467-270-1337|est@nunc.com|.......|
| 12|   Caleb|1-683-212-0896|Suspendisse@Quisq...|
| 13|Florence|1-603-575-2444|sit.amet.dapibus@...|
| 14|   Anika|1-856-828-7883|euismod@ligulaeli...|
| 15|   Tarik|1-398-171-2268|turpis@felisorci.com|
| 16|   Amena|1-878-250-3129|lorem.luctus.ut@s...|
| 17| Blossom|1-154-406-9596|Nunc.commodo.auct...|
| 18|     Guy|1-869-521-3230|senectus.et.netus...|
| 19| Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
| 20|  Edward|1-711-710-6552|lectus@aliquetlib...|
+---+--------+--------------+--------------------+
only showing top 20 rows

-- students.head(5) 返回前幾行數(shù)據(jù)

scala> students.head(5).foreach(println)
[1,Burke,1-300-746-8446,ullamcorper.velit.in@ametnullaDonec.co.uk]
[2,Kamal,1-668-571-5046,pede.Suspendisse@interdumenim.edu]
[3,Olga,1-956-311-1686,Aenean.eget.metus@dictumcursusNunc.edu]
[4,Belle,1-246-894-6340,vitae.aliquet.nec@neque.co.uk]
[5,Trevor,1-300-527-4967,dapibus.id@acturpisegestas.net]

-- 查詢具體字段

scala> students.select("id","name").show(5)
+---+------+
| id|  name|
+---+------+
|  1| Burke|
|  2| Kamal|
|  3|  Olga|
|  4| Belle|
|  5|Trevor|
+---+------+

-- 修改字段取別名

scala> students.select($"name".as("new_name")).show(5)

+--------+
|new_name|
+--------+
|   Burke|
|   Kamal|
|    Olga|
|   Belle|
|  Trevor|
+--------+

--查詢id大于五

scala> students.filter("id>5").show(5)

+---+------+--------------+--------------------+
| id|  name|         phone|               Email|
+---+------+--------------+--------------------+
|  6|Laurel|1-691-379-9921|adipiscing@consec...|
|  7|  Sara|1-608-140-1995|Donec.nibh@enimEt...|
|  8|Kaseem|1-881-586-2689|cursus.et.magna@e...|
|  9|   Lev|1-916-367-5608|Vivamus.nisi@ipsu...|
| 10|  Maya|1-271-683-2698|accumsan.convalli...|
+---+------+--------------+--------------------+

-- 查詢名稱為空或者名稱為NULL(filter=where)

scala> students.filter("name=''or name='NULL'").show(false)

+---+----+--------------+--------------------------+
|id |name|phone         |Email                     |
+---+----+--------------+--------------------------+
|21 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
|22 |    |1-711-710-6552|lectus@aliquetlibero.co.uk|
|23 |NULL|1-711-710-6552|lectus@aliquetlibero.co.uk|
+---+----+--------------+--------------------------+

-- 查詢ID大于5且名稱模糊查詢 

scala> students.filter("id>5 and name like 'M%'").show(5)

+---+-------+--------------+--------------------+
| id|   name|         phone|               Email|
+---+-------+--------------+--------------------+
| 10|   Maya|1-271-683-2698|accumsan.convalli...|
| 19|Malachi|1-608-637-2772|Proin.mi.Aliquam@...|
+---+-------+--------------+--------------------+

-- 按照名稱升序排序且不等于空

scala> students.sort($"name").select("id","name").filter("name <> ''").show(3)

+---+-----+
| id| name|
+---+-----+
| 16|Amena|
| 14|Anika|
|  4|Belle|
+---+-----+

-- 按照名稱倒敘排序(sort = orderBy)

scala> students.sort($"name".desc).select("name").show(5)

+------+
|  name|
+------+
|Trevor|
| Tarik|
|  Sara|
|  Olga|
|  NULL|
+------+

-- 年齡分組 匯總

scala> students.groupBy("age").count().show

+----+-----+                                                                    
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

-- 聚合函數(shù)使用

scala> students.agg("id" -> "max", "id" -> "sum").show(false)

+-------+-------+
|max(id)|sum(id)|
+-------+-------+
|9      |276.0  |
+-------+-------+

-- join操作,using模式seq指定多個(gè)字段 

students.join(students2, Seq("id", "name"), "inner")

-- DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi類型

-- 指定類型,指定join的類型

students.join(students2 , students("id" ) === students2( "t1_id"), "inner")

五、DataFrame API實(shí)現(xiàn)文件操作

1.maven依賴下載

<spark.version>2.3.1</spark.version>

<!-- 添加Spark Core的dependency -->
<dependency>

  <groupId>org.apache.spark</groupId>

  <artifactId>spark-core_2.11</artifactId>

  <version>${spark.version}</version>
</dependency>

<!-- 添加Spark SQL的dependency -->
<dependency>

  <groupId>org.apache.spark</groupId>

  <artifactId>spark-sql_2.11</artifactId>

  <version>${spark.version}</version>
</dependency>

2、IDEA實(shí)現(xiàn)方式:

package com.zrc.ruozedata.sparkSQL
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkSQL001 extends App {

      /*

       * RDD與DataFrame反射方式實(shí)現(xiàn)(一)

       * 創(chuàng)建RDD --> DataFrema

       * 利用case class創(chuàng)建Schema,來(lái)解析輸出文本每一行信息

       */

      val spark = SparkSession.builder()

      .master("local[2]")

      .appName("SparkSQL001")

      .getOrCreate() // 操作hive添加

      val  infos = spark.sparkContext.textFile("file:///F:/infos.txt")


      /*

      import spark.implicits._

      val infoDF = infos.map(_.split(",")).map(x=>Info(x(0).toInt,x(1),x(2).toInt)).toDF()

      infoDF.show()

      */


      /*

       * RDD與DataFrame使用StructType方式實(shí)現(xiàn)(二)

        * StructType構(gòu)造了StructField方法傳入name和dataType

        * 每一個(gè)字段就是為一個(gè)StructField

        * Schema和RDD通過createDataFrame方法作用起來(lái)

      */

       // 注意通過ROW獲取的需要轉(zhuǎn)換對(duì)應(yīng)類型

      val infoss = infos.map(_.split(",")).map(x=>Row(x(0).trim.toInt,x(1),x(2).trim.toInt))

      val fields = StructType(

            Array(

                  StructField("id",IntegerType,true),

                  StructField("name",StringType,true),

                  StructField("age",IntegerType,true)

            )

      )

      val schema = StructType(fields)

      val infoDF = spark.createDataFrame(infoss,schema)

      infoDF.show()

      spark.stop()
}
// case class Info (id:Int,name:String,age:Int)

以上是“DataFrame怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI