This article looks at how SparkSQL works with external data sources; hopefully you will find something useful in it.
MapReduce, Hive, and Spark jobs all begin by loading data. The data may sit in HDFS, HBase, S3, OSS, or MongoDB, and its format may be json, text, csv, parquet, jdbc, and so on, possibly compressed; each format needs its own parsing logic.
If HDFS data needs to be combined with MySQL data, one option is a series of Sqoop transfers; with the External Data Source API, however, the data can be loaded directly as a DataFrame. In short, SparkSQL can pull data from an external data source and load it as a DataFrame.
Loading methods:
build-in: built-in support for external data such as json, text, parquet, jdbc, HDFS;
third-party: third-party support for external data such as HBase, S3, OSS, MongoDB
Third-party JARs: https://spark-packages.org/
A Maven project imports the dependency by its GAV coordinates
spark-shell pulls it in with --packages g:a:v
SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
Advantage: the dependency is downloaded to the local machine
Drawback: on an intranet with no Internet access the download fails
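For example, with the spark-csv package above on the classpath, a CSV file can be loaded straight into a DataFrame. A minimal sketch, assuming a hypothetical sample.csv with a header row (in Spark 2.x the built-in format("csv") works as well):
scala> val csvDF = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("file:///home/hadoop/data/sample.csv")
scala> csvDF.printSchema
scala> csvDF.show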
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[2], app id = local-1536244013147).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt").show
Error reported: /people.txt is not a Parquet file
Note: spark.read.load() reads Parquet files by default.
scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet").show
18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
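If the file is not Parquet, pass the format explicitly instead of relying on the default. A minimal sketch, reusing the examples directory above (people.json and people.txt ship with Spark):
scala> spark.read.format("json").load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.json").show
scala> spark.read.format("text").load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt").show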
scala> val users = spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet")
users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> users.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)
scala> users.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
-- Select a column, the usual operation
scala> users.select("name").show
+------+
|  name|
+------+
|Alyssa|
|   Ben|
+------+
II. Conversion operations
-- Write out as JSON
scala> users.select("name","favorite_color").write.format("json").save("file:////home/hadoop/data/parquet/")
[hadoop@hadoop001 parquet]$ cat *
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}
-- To write without compression, add
.option("compression","none")
-- Write out as text
scala> users.select("name").write.format("text").save("file:////home/hadoop/data/parquet2/")
[hadoop@hadoop001 parquet2]$ cat *
Alyssa
-- Save Modes
Usage: .mode("...")
1. errorIfExists (the default) -- if the target directory exists, throw an exception
2. append -- if the target directory exists, append to it; rerunning the job adds the data again, so idempotence is not guaranteed
3. overwrite -- if the target directory exists, overwrite the existing files
4. ignore -- if the target exists, the write is silently skipped and nothing is saved (a usage sketch follows this list)
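A minimal usage sketch (the output path is a placeholder; overwrite replaces any existing output):
scala> users.select("name").write.mode("overwrite").format("json").save("file:///home/hadoop/data/output/")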
-- Read external MySQL data as a DataFrame
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info").option("user","root").option("password", "root").load()
-- Inspect the table
jdbcDF.show()
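The dbtable option accepts not just a table name but anything valid in a FROM clause, so a subquery can push projection and filtering down to MySQL. A sketch, where the city_id and city_name columns are assumptions about city_info's schema:
scala> val cityDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "(SELECT city_id, city_name FROM city_info) t").option("user","root").option("password", "root").load()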
-- Load a table from the local metastore
val deptDF = spark.table("dept")
-- Join the two DataFrames
deptDF.join(jdbcDF,deptDF.col("deptid") === jdbcDF.col("deptid"))
-- Write the DataFrame back to MySQL; column types may change on the way, and rerunning the write requires adding .mode("overwrite")
jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/hive_data").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info_bak").option("user","root").option("password", "root").save()
mysql> show tables
+---------------------------+
| Tables_in_hive_data       |
+---------------------------+
| bucketing_cols            |
| cds                       |
| city_info_bak             |
+---------------------------+
-- To keep the column types from changing, specify them explicitly with an option
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
-- Create a temporary view over the JDBC source in SQL (visible within a single session only)
CREATE TEMPORARY VIEW emp_sql USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://hadoop001:3306/ruozedata", dbtable "city_info", user 'root', password 'root' )
show tables;
INSERT INTO TABLE emp_sql
SELECT * FROM emp_sql
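From spark-shell the same view can be queried through spark.sql as well; a minimal sketch:
scala> spark.sql("SELECT * FROM emp_sql").show()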
Why pushing work down to the source matters (arrow labels show where the cost is paid):
External data source (1T) --disk--> fetched to local disk (1T) --network--> submitted to the cluster (1T) --CPU--> result (1G)
External data source (1T) --disk--> after column pruning (10G) --network--> submitted to the cluster (10G) --CPU--> result (1G)
External data source (1T) --disk--> after column pruning (10G) --CPU--> after computation (1G) --network--> result shipped
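The same idea expressed on a DataFrame: select only the columns you need and filter early, so sources that support it can push the pruning and filtering down to the scan. A minimal sketch against the users DataFrame above; explain(true) shows the pushed-down columns and filters in the physical plan:
scala> users.select("name", "favorite_color").where("favorite_color = 'red'").explain(true)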
-- 0. Reading external data source data efficiently
-- 1. TableScan: buildScan scans the whole table and returns an RDD[Row]
trait TableScan {
def buildScan(): RDD[Row]
}
-- 2. PrunedScan: scan only the required (pruned) columns
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
-- 3. PrunedFilteredScan: column pruning plus row filtering
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
-- 4. BaseRelation: loads the external data source's data and defines its schema
abstract class BaseRelation{
}
-- 5. RelationProvider: produces the BaseRelation
trait RelationProvider {
}
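A minimal custom source usually wires these pieces together: a RelationProvider builds a BaseRelation, which defines the schema and implements one of the scan traits. A hedged sketch (the package and class names are hypothetical, and this is not a production implementation):
package com.example.datasource  // hypothetical package

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// RelationProvider: invoked when spark.read.format("com.example.datasource").load() is called
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new DemoRelation(sqlContext)
}

// BaseRelation defines the schema; TableScan produces the full RDD[Row]
class DemoRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true)
  ))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))
}
Spark resolves format("com.example.datasource") by looking for a class named DefaultSource in that package; swapping TableScan for PrunedScan or PrunedFilteredScan is what lets the source honor column pruning and filter pushdown.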
In summary:
-- Query-side operations
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
-- Column pruning with row filtering
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
-- Write-side operations
trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}