溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

發(fā)布時間:2020-08-14 13:35:01 來源:網(wǎng)絡(luò) 閱讀:910 作者:張立達 欄目:大數(shù)據(jù)

一:前置知識詳解: 
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, 
Load:可以創(chuàng)建DataFrame, 
Save:把DataFrame中的數(shù)據(jù)保存到文件或者說與具體的格式來指明我們要讀取的文件的類型以及與具體的格式來指出我們要輸出的文件是什么類型。 
二:Spark SQL讀寫數(shù)據(jù)代碼實戰(zhàn):

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public class SparkSQLLoadSaveOps {    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext = new SQLContext(sc);        /**
         * read()是DataFrameReader類型,load可以將數(shù)據(jù)讀取出來         */
        DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");        /**
         * 直接對DataFrame進行操作
         * Json: 是一種自解釋的格式,讀取Json的時候怎么判斷其是什么格式?
         * 通過掃描整個Json。掃描之后才會知道元數(shù)據(jù)         */
        //通過mode來指定輸出文件的是append。創(chuàng)建新文件來追加文件
   peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
    }
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

讀取過程源碼分析如下: 
1. read方法返回DataFrameReader,用于讀取數(shù)據(jù)。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

`DataFrameReader` that can be used to read data in as a `DataFrame`. * {{{ *   sqlContext.read.parquet("/path/to/file.parquet") *   sqlContext.read.schema(schema).json("/path/to/file.json") * }}} *
 * @group genericdata * @since 1.4.0
 */@Experimental//創(chuàng)建DataFrameReader實例,獲得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

2.  然后再調(diào)用DataFrameReader類中的format,指出讀取文件的格式。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Specifies the input data source format.
 *
 * @since 1.4.0 */def format(source: String): DataFrameReader = {  this.source = source  this}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

3.  通過DtaFrameReader中l(wèi)oad方法通過路徑把傳入過來的輸入變成DataFrame。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = {
  option("path", path).load()
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

至此,數(shù)據(jù)的讀取工作就完成了,下面就對DataFrame進行操作。 
下面就是寫操作?。?! 
1. 調(diào)用DataFrame中select函數(shù)進行對列篩選

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Selects a set of columns. This is a variant of `select` that can only select
 * existing columns using column names (i.e. cannot construct expressions).
 *
 * {{{
 *   // The following two are equivalent:
 *   df.select("colA", "colB")
 *   df.select($"colA", $"colB")
 * }}}
 * @group dfops
 * @since 1.3.0 */@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

2.  然后通過write將結(jié)果寫入到外部存儲系統(tǒng)中。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * :: Experimental ::
 * Interface for saving the content of the `DataFrame` out into external storage.
 *
 * @group output
 * @since 1.4.0 */@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

3. 在保持文件的時候mode指定追加文件的方式

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Specifies the behavior when data or table already exists. Options include:
// Overwrite是覆蓋
 *   - `SaveMode.Overwrite`: overwrite the existing data.
//創(chuàng)建新的文件,然后追加
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = {  this.mode = saveMode  this}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

4.   最后,save()方法觸發(fā)action,將文件輸出到指定文件中。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Saves the content of the `DataFrame` at the specified path.
 *
 * @since 1.4.0 */def save(path: String): Unit = {  this.extraOptions += ("path" -> path)
  save()
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

三:Spark SQL讀寫整個流程圖如下: 

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

四:對于流程中部分函數(shù)源碼詳解: 
DataFrameReader.Load() 
1. Load()返回DataFrame類型的數(shù)據(jù)集合,使用的數(shù)據(jù)是從默認的路徑讀取。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0. */@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {//此時的read就是DataFrameReader  read.load(path)
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

2.  追蹤load源碼進去,源碼如下:

在DataFrameReader中的方法。Load()通過路徑把輸入傳進來變成一個DataFrame。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/** 
 * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = {
  option("path", path).load()
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

3.  追蹤load源碼如下:

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Loads input in as a `DataFrame`, for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0 */def load(): DataFrame = {//對傳入的Source進行解析
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

DataFrameReader.format() 
1. Format:具體指定文件格式,這就獲得一個巨大的啟示是:如果是Json文件格式可以保持為Parquet等此類操作。 
Spark SQL在讀取文件的時候可以指定讀取文件的類型。例如,Json,Parquet.

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
 *
 * @since 1.4.0 */def format(source: String): DataFrameReader = {  this.source = source //FileType
  this}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

DataFrame.write() 
1. 創(chuàng)建DataFrameWriter實例

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * :: Experimental ::
 * Interface for saving the content of the `DataFrame` out into external storage.
 *
 * @group output
 * @since 1.4.0 */@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

2.  追蹤DataFrameWriter源碼如下:

以DataFrame的方式向外部存儲系統(tǒng)中寫入數(shù)據(jù)。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * :: Experimental ::
 * Interface used to write a `DataFrame` to external storage systems (e.g. file systems,
 * key-value stores, etc). Use `DataFrame`.`write` to access this.
 *
 * @since 1.4.0 */@Experimentalfinal class DataFrameWriter private[sql](df: DataFrame) {

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

DataFrameWriter.mode() 
1. Overwrite是覆蓋,之前寫的數(shù)據(jù)全都被覆蓋了。 
Append:是追加,對于普通文件是在一個文件中進行追加,但是對于parquet格式的文件則創(chuàng)建新的文件進行追加。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

**
 * Specifies the behavior when data or table already exists. Options include: *   - `SaveMode.Overwrite`: overwrite the existing data. *   - `SaveMode.Append`: append the data. *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).//默認操作
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. *
 * @since 1.4.0
 */def mode(saveMode: SaveMode): DataFrameWriter = {  this.mode = saveMode  this}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

2.  通過模式匹配接收外部參數(shù)

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `overwrite`: overwrite the existing data.
 *   - `append`: append the data.
 *   - `ignore`: ignore the operation (i.e. no-op).
 *   - `error`: default option, throw an exception at runtime.
 *
 * @since 1.4.0 */def mode(saveMode: String): DataFrameWriter = {  this.mode = saveMode.toLowerCase match {    case "overwrite" => SaveMode.Overwrite    case "append" => SaveMode.Append    case "ignore" => SaveMode.Ignore    case "error" | "default" => SaveMode.ErrorIfExists    case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
      "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
  }  this}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

 
1. save將結(jié)果保存?zhèn)魅氲穆窂健?/pre>

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Saves the content of the `DataFrame` at the specified path.
 *
 * @since 1.4.0 */def save(path: String): Unit = {  this.extraOptions += ("path" -> path)
  save()
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

2.  追蹤save方法。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Saves the content of the `DataFrame` as the specified table.
 *
 * @since 1.4.0 */def save(): Unit = {
  ResolvedDataSource(
    df.sqlContext,
    source,
    partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
    mode,
    extraOptions.toMap,
    df)
}

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

3.  其中source是SQLConf的defaultDataSourceName

private var source: String = df.sqlContext.conf.defaultDataSourceName

其中DEFAULT_DATA_SOURCE_NAME默認參數(shù)是parquet。

// This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
  defaultValue = Some("org.apache.spark.sql.parquet"),
  doc = "The default data source to use in input/output.")

DataFrame.Scala中部分函數(shù)詳解: 
1. toDF函數(shù)是將RDD轉(zhuǎn)換成DataFrame

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

**
 * Returns the object itself. * @group basic * @since 1.3.0
 */// This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.def toDF(): DataFrame = this

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

2.  show()方法:將結(jié)果顯示出來

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Displays the `DataFrame` in a tabular form. For example:
 * {{{
 *   year  month AVG('Adj Close) MAX('Adj Close)
 *   1980  12    0.503218        0.595103
 *   1981  01    0.523289        0.570307
 *   1982  02    0.436504        0.475256
 *   1983  03    0.410516        0.442194
 *   1984  04    0.450090        0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *              be truncated and all cells will be aligned right
 *
 * @group action
 * @since 1.5.0 */// scalastyle:off printlndef show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))// scalastyle:on println

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

追蹤showString源碼如下:showString中觸發(fā)action收集數(shù)據(jù)。

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)

/**
 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right */private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
  val numRows = _numRows.max(0)
  val sb = new StringBuilder
  val takeResult = take(numRows + 1)
  val hasMoreData = takeResult.length > numRows
  val data = takeResult.take(numRows)
  val numCols = schema.fieldNames.length

Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI