溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark RDD轉(zhuǎn)換成DataFrame的兩種方式

發(fā)布時間:2020-08-11 00:00:33 來源:網(wǎng)絡(luò) 閱讀:6226 作者:Stitch_x 欄目:大數(shù)據(jù)

Spark SQL支持兩種方式將現(xiàn)有RDD轉(zhuǎn)換為DataFrame。
第一種方法使用反射來推斷RDD的schema并創(chuàng)建DataSet然后將其轉(zhuǎn)化為DataFrame。這種基于反射方法十分簡便,但是前提是在您編寫Spark應(yīng)用程序時就已經(jīng)知道RDD的schema類型。
第二種方法是通過編程接口,使用您構(gòu)建的StructType,然后將其應(yīng)用于現(xiàn)有RDD。雖然此方法很麻煩,但它允許您在運行之前并不知道列及其類型的情況下構(gòu)建DataSet

    方法如下
         1.將RDD轉(zhuǎn)換成Rows   
         2.按照第一步Rows的結(jié)構(gòu)定義StructType  
         3.基于rows和StructType使用createDataFrame創(chuàng)建相應(yīng)的DF

測試數(shù)據(jù)為order.data

1   小王  電視  12  2015-08-01 09:08:31
1   小王  冰箱  24  2015-08-01 09:08:14
2   小李  空調(diào)  12  2015-09-02 09:01:31

代碼如下:

object RDD2DF {

  /**
    * 主要有兩種方式
    *   第一種是在已經(jīng)知道schema已經(jīng)知道的情況下,我們使用反射把RDD轉(zhuǎn)換成DS,進(jìn)而轉(zhuǎn)換成DF
    *   第二種是你不能提前定義好case class,例如數(shù)據(jù)的結(jié)構(gòu)是以String類型存在的。我們使用接口自定義一個schema
    * @param args
    */
  def main(args: Array[String]): Unit = {

    val spark=SparkSession.builder()
      .appName("DFDemo")
      .master("local[2]")
      .getOrCreate()

//    rdd2DFFunc1(spark)

    rdd2DFFunc2(spark)
    spark.stop()
  }

  /**
    * 提前定義好case class
    * @param spark
    */
  def rdd2DFFunc1(spark:SparkSession): Unit ={
    import spark.implicits._
    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")
    val orderDF=orderRDD.map(_.split("\t"))
      .map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
      .toDF()
    orderDF.show()
    Thread.sleep(1000000)
  }

  /**
    *總結(jié):第二種方式就是通過最基礎(chǔ)的DF接口方法,將
    * @param spark
    */
  def rdd2DFFunc2(spark:SparkSession): Unit ={
    //TODO:   1.將RDD轉(zhuǎn)換成Rows   2.按照第一步Rows的結(jié)構(gòu)定義StructType  3.基于rows和StructType使用createDataFrame創(chuàng)建相應(yīng)的DF
    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")

    //TODO:   1.將RDD轉(zhuǎn)換成Rows
    val rowsRDD=orderRDD
//      .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res})
      .map(_.split("\t"))
      .map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4)))

    //TODO:   2.按照第一步Rows的結(jié)構(gòu)定義StructType
    val schemaString="id|name|commodity|age|date"
    val fields=schemaString.split("\\|")
      .map(filedName=>StructField(filedName,StringType,nullable = true))
    val schema=StructType(fields)

    //TODO:   3.基于rows和StructType使用createDataFrame創(chuàng)建相應(yīng)的DF
   val orderDF= spark.createDataFrame(rowsRDD,schema)
    orderDF.show()
    orderDF.groupBy("name").count().show()
    orderDF.select("name","commodity").show()
    Thread.sleep(10000000)
  }
}
case class Order(id:String,name:String,commodity:String,age:String,date:String)

生產(chǎn)中創(chuàng)建DataFrame代碼舉例

在實際生產(chǎn)環(huán)境中,我們其實選擇的是方式二這種進(jìn)行創(chuàng)建DataFrame的,因為我們生產(chǎn)中很難提前定義case class ,因為業(yè)務(wù)處理之后字段常常會發(fā)生意想不到的變化,所以一定要掌握這種方法。

測試數(shù)據(jù)

baidu   CN  A   E   [01/May/2018:02:15:52 +0800]    2   61.237.59.0 -   112.29.213.35:80    0   movieshow2000.edu.chinaren.com  GET http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 HTTP/1.1    -   bytes 13869056-13885439/25136186    TCP_HIT/206 112.29.213.35   video/mp4   16374   16384   -:0 0   0   -   -   -   11451601    -   "JSP3/2.0.14"   "-" "-" "-" http    -   2   v1.go2yd.com    0.002   25136186    16384   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   1531818470104-11451601-112.29.213.66#2705261172 644514568
baidu   CN  A   E   [01/May/2018:02:25:33 +0800]    2   61.232.37.228   -   112.29.213.35:80    0   github.com  GET http://github.com/user_upload/15316339776271/44y.mp4    HTTP/1.1    -   bytes 13869056-13885439/25136186    TCP_HIT/206 112.29.213.35   video/mp4   83552   16384   -:0 0   0   -   -   -   11451601    -   "JSP3/2.0.14"   "-" "-" "-" http    -   2   v1.go2yd.com    0.002   25136186    16384   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   1531818470104-11451601-112.29.213.66#2705261172 644514568

Schema方法類


import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object LogConverUtil {

  private val struct=StructType(
    Array(
      StructField("domain",StringType)
      ,StructField("url",StringType)
      ,StructField("pv",LongType)
      ,StructField("traffic",LongType)
      ,StructField("date",StringType)
    )
  )
  def getStruct():StructType={
    struct
  }

  def parseLog(logLine:String): Row ={
    val sourceFormat=new SimpleDateFormat("[dd/MMM/yyyy:hh:mm:ss +0800]",Locale.ENGLISH)
    val targetFormat=new SimpleDateFormat("yyyyMMddhh")

    try{

      val fields=logLine.split("\t")
      val domain=fields(10)
      val url=fields(12)
      val pv=1L
      val traffic=fields(19).trim.toLong
      val date=getFormatedDate(fields(4),sourceFormat,targetFormat)

      Row(domain,url,pv,traffic,date)
    }catch {
      case e:Exception=>Row(0)
    }
  }
  /**
    *
    * @param sourceDate  Log中的未格式化日期   [01/May/2018:01:09:45 +0800]
    * @return  按照需求格式化字段      2018050101
    */
  def getFormatedDate(sourceDate: String, sourceFormat: SimpleDateFormat, targetFormat: SimpleDateFormat) = {
    val targetTime=targetFormat.format(sourceFormat.parse(sourceDate))
    targetTime
  }

}

RDD2DataFrame主類

import org.apache.spark.sql.SparkSession

object SparkCleanJob {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder()
      .master("local[2]")
      .appName("SparkCleanJob")
      .getOrCreate()

    val logRDD=spark.sparkContext.textFile("file:///D:/baidu.log")
//    logRDD.take(2).foreach(println(_))

    //調(diào)用LogConverUtil里的parseLog方法和getStruct方法獲得Rows對象和StructType對象
    val logDF=spark.createDataFrame(logRDD.map(LogConverUtil.parseLog(_)),LogConverUtil.getStruct())
    logDF.show(false)
    logDF.printSchema()
  }
}

結(jié)果

+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
|domain                        |url                                                                      |pv |traffic|date      |
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4  |1  |16374  |2018050102|
|github.com                    |http://github.com/user_upload/15316339776271/44y.mp4                     |1  |83552  |2018050102|
|yooku.com                     |http://yooku.com/user_upload/15316339776271x0.html                       |1  |74986  |2018050101|
|rw.uestc.edu.cn               |http://rw.uestc.edu.cn/user_upload/15316339776271515.mp4                 |1  |55297  |2018050101|
|github.com                    |http://github.com/user_upload/15316339776271x05.mp4                      |1  |26812  |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y4.html  |1  |50392  |2018050103|
|github.com                    |http://github.com/user_upload/15316339776271x15.html                     |1  |40092  |2018050101|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/153163397762714z.mp4   |1  |8368   |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/5z.html |1  |29677  |2018050103|
|rw.uestc.edu.cn               |http://rw.uestc.edu.cn/user_upload/153163397762710w.mp4                  |1  |26124  |2018050102|
|yooku.com                     |http://yooku.com/user_upload/15316339776271yz.mp4                        |1  |32219  |2018050101|
|yooku.com                     |http://yooku.com/user_upload/153163397762713w.html                       |1  |90389  |2018050101|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271z/.html  |1  |15623  |2018050101|
|yooku.com                     |http://yooku.com/user_upload/1531633977627142.html                       |1  |53453  |2018050103|
|yooku.com                     |http://yooku.com/user_upload/15316339776271230.mp4                       |1  |20309  |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/4w1.html|1  |87804  |2018050103|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y5y.html |1  |69469  |2018050103|
|yooku.com                     |http://yooku.com/user_upload/15316339776271011/.mp4                      |1  |3782   |2018050103|
|github.com                    |http://github.com/user_upload/15316339776271wzw.mp4                      |1  |89642  |2018050102|
|github.com                    |http://github.com/user_upload/15316339776271/1/.mp4                      |1  |63551  |2018050103|
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
only showing top 20 rows

root
 |-- domain: string (nullable = true)
 |-- url: string (nullable = true)
 |-- pv: long (nullable = true)
 |-- traffic: long (nullable = true)
 |-- date: string (nullable = true)

Process finished with exit code 0

注:除了這種使用RDD讀取文本進(jìn)而轉(zhuǎn)化成DataFrame之外,我們也會使用自定義DefaultSource來直接將text轉(zhuǎn)化成DataFrame

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI