溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

發(fā)布時(shí)間:2021-12-17 14:00:49 來(lái)源:億速云 閱讀:210 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

引言:

Spark是在借鑒了MapReduce之上發(fā)展而來(lái)的,繼承了其分布式并行計(jì)算的優(yōu)點(diǎn)并改進(jìn)了MapReduce明顯的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件。

下面主要分析了 Spark RDD 以及 RDD 作為開(kāi)發(fā)的不足之處,介紹了 SparkSQL 對(duì)已有的常見(jiàn)數(shù)據(jù)系統(tǒng)的操作方法,以及重點(diǎn)介紹了普元在眾多數(shù)據(jù)開(kāi)發(fā)項(xiàng)目中總結(jié)的基于 SparkSQL Flow 開(kāi)發(fā)框架。

一、Spark RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、元素可并行計(jì)算的集合。

RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。

//Scala 在內(nèi)存中使用列表創(chuàng)建

val lines = List(“A”, “B”, “C”, “D” …)
val rdd:RDD = sc.parallelize(lines);

//以文本文件創(chuàng)建

val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”)

Spark RDD Partition 分區(qū)劃分


基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

新版本的 Hadoop 已經(jīng)把 BlockSize 改為 128M,也就是說(shuō)每個(gè)分區(qū)處理的數(shù)據(jù)量更大。

Spark 讀取文件分區(qū)的核心原理

本質(zhì)上,Spark 是利用了 Hadoop 的底層對(duì)數(shù)據(jù)進(jìn)行分區(qū)的 API(InputFormat):

public abstract class InputFormat<K,V>{
  public abstract List<InputSplit> getSplits(JobContextcontext
                               ) throwsIOException,InterruptedException;
  
  public abstract RecordReader<K,V> createRecordReader(InputSplitsplit,
                                         TaskAttemptContextcontext
                                        )throwsIOException,InterruptedException;
}

Spark 任務(wù)提交后通過(guò)對(duì)輸入進(jìn)行 Split,在 RDD 構(gòu)造階段,只是判斷是否可 Split(如果參數(shù)異常一定在此階段報(bào)出異常),并且 Split 后每個(gè) InputSplit 都是一個(gè)分區(qū)。只有在Action 算子提交后,才真正用 getSplits 返回的 InputSplit 通過(guò) createRecordReader 獲得每個(gè) Partition 的連接。

然后通過(guò) RecordReader 的 next() 遍歷分區(qū)內(nèi)的數(shù)據(jù)。

Spark RDD 轉(zhuǎn)換函數(shù)和提交函數(shù)

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的


Spark RDD 的眾多函數(shù)可分為兩大類(lèi)Transformation 與 Action。Transformation 與 Action 的區(qū)別在于,對(duì) RDD 進(jìn)行 Transformation 并不會(huì)觸發(fā)計(jì)算:Transformation 方法所產(chǎn)生的 RDD 對(duì)象只會(huì)記錄住該 RDD 所依賴(lài)的 RDD 以及計(jì)算產(chǎn)生該 RDD 的數(shù)據(jù)的方式;只有在用戶(hù)進(jìn)行 Action 操作時(shí),Spark 才會(huì)調(diào)度 RDD 計(jì)算任務(wù),依次為各個(gè) RDD 計(jì)算數(shù)據(jù)。這就是 Spark RDD 內(nèi)函數(shù)的“懶加載”特性。

二、基于Spark RDD數(shù)據(jù)開(kāi)發(fā)的不足

由于MapReduce的shuffle過(guò)程需寫(xiě)磁盤(pán),比較影響性能;而Spark利用RDD技術(shù),計(jì)算在內(nèi)存中流式進(jìn)行。另外 MapReduce計(jì)算框架(API)比較局限, 使用需要關(guān)注的參數(shù)眾多,而Spark則是中間結(jié)果自動(dòng)推斷,通過(guò)對(duì)數(shù)據(jù)集上鏈?zhǔn)綀?zhí)行函數(shù)具備一定的靈活性。

即使 SparkRDD 相對(duì)于 MapReduce 提高很大的便利性,但在使用上仍然有許多問(wèn)題。體現(xiàn)在一下幾個(gè)方面:

  1. RDD 函數(shù)眾多,開(kāi)發(fā)者不容易掌握,部分函數(shù)使用不當(dāng) shuffle時(shí)造成數(shù)據(jù)傾斜影響性能;

  2. RDD 關(guān)注點(diǎn)仍然是Spark太底層的 API,基于 Spark RDD的開(kāi)發(fā)是基于特定語(yǔ)言(Scala,Python,Java)的函數(shù)開(kāi)發(fā),無(wú)法以數(shù)據(jù)的視界來(lái)開(kāi)發(fā)數(shù)據(jù);

  3. 對(duì) RDD 轉(zhuǎn)換算子函數(shù)內(nèi)部分常量、變量、廣播變量使用不當(dāng),會(huì)造成不可控的異常;

  4. 對(duì)多種數(shù)據(jù)開(kāi)發(fā),需各自開(kāi)發(fā)RDD的轉(zhuǎn)換,樣板代碼較多,無(wú)法有效重利用;

  5. 其它在運(yùn)行期可能發(fā)生的異常。如:對(duì)象無(wú)法序列化等運(yùn)行期才能發(fā)現(xiàn)的異常。

三、SparkSQL

Spark 從 1.3 版本開(kāi)始原有 SchemaRDD 的基礎(chǔ)上提供了類(lèi)似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開(kāi)發(fā)者的學(xué)習(xí)門(mén)檻,同時(shí)還支持Scala、Java與Python三種語(yǔ)言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場(chǎng)景。

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

一般的數(shù)據(jù)處理步驟:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理 -> 分析結(jié)果  -> 寫(xiě)入結(jié)果

SparkSQL 結(jié)構(gòu)化數(shù)據(jù)

  • 處理結(jié)構(gòu)化數(shù)據(jù)(如 CSV,JSON,Parquet 等);

  • 把已經(jīng)結(jié)構(gòu)化數(shù)據(jù)抽象成 DataFrame (HiveTable);

  • 非結(jié)構(gòu)化數(shù)據(jù)通過(guò) RDD.map.filter 轉(zhuǎn)換成結(jié)構(gòu)化進(jìn)行處理;

  • 按照列式數(shù)據(jù)庫(kù),只加載非結(jié)構(gòu)化中可結(jié)構(gòu)化的部分列(Hbase,MongoDB);

處理非結(jié)構(gòu)化數(shù)據(jù),不能簡(jiǎn)單的用 DataFrame 裝載。而是要用 SparkRDD 把數(shù)據(jù)讀入,在通過(guò)一系列的 Transformer Method 把非結(jié)構(gòu)化的數(shù)據(jù)加工為結(jié)構(gòu)化,或者過(guò)濾到不合法的數(shù)據(jù)。

SparkSQL DataFrame

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類(lèi)似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱(chēng)和類(lèi)型。如果熟悉 Python Pandas 庫(kù)中的 DataFrame 結(jié)構(gòu),則會(huì)對(duì) SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

import.org.apache.spark.sql._
//定義數(shù)據(jù)的列名稱(chēng)和類(lèi)型
valdt=StructType(List(id:String,name:String,gender:String,age:Int))

//導(dǎo)入user_info.csv文件并指定分隔符
vallines = sc.textFile("/path/user_info.csv").map(_.split(","))

//將表結(jié)構(gòu)和數(shù)據(jù)關(guān)聯(lián)起來(lái),把讀入的數(shù)據(jù)user.csv映射成行,構(gòu)成數(shù)據(jù)集
valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))

//通過(guò)SparkSession.createDataFrame()創(chuàng)建表,并且數(shù)據(jù)表表頭
val df= spark.createDataFrame(rowRDD, dt)

讀取規(guī)則數(shù)據(jù)文件作為DataFrame

SparkSession.Builder builder = SparkSession.builder()
Builder.setMaster("local").setAppName("TestSparkSQLApp")
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();

# 讀取 JSON 數(shù)據(jù),path 可為文件或者目錄
valdf=sqlContext.read().json(path);

# 讀取 HadoopParquet 文件
vardf=sqlContext.read().parquet(path);

# 讀取 HadoopORC 文件
vardf=sqlContext.read().orc(path);

JSON 文件為每行一個(gè) JSON 對(duì)象的文件類(lèi)型,行尾無(wú)須逗號(hào)。文件頭也無(wú)須[]指定為數(shù)組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;

Parquet文件

Configurationconfig = new Configuration();
ParquetFileReaderreader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));
Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();
String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata");

allFiedls 的值就是各字段的名稱(chēng)和具體的類(lèi)型,整體是一個(gè)json格式進(jìn)行展示。

讀取 Hive 表作為 DataFrame

Spark2 API 推薦通過(guò) SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。

在Spark1.6中有兩個(gè)核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動(dòng)態(tài)注冊(cè)的表,HiveContext 用于處理 Hive 中的表。

從Spark2.0以上的版本開(kāi)始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內(nèi)部注冊(cè)的表;

在需要執(zhí)行 Hive 表時(shí),只需要在 SparkSession.Builder 中開(kāi)啟 Hive 支持即可(enableHiveSupport())。

SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();

// db 指 Hive 庫(kù)中的數(shù)據(jù)庫(kù)名,如果不寫(xiě)默認(rèn)為 default

// tableName 指 hive 庫(kù)的數(shù)據(jù)表名

sqlContext.sql(“select * from db.tableName”)

SparkSQL ThriftServer

//首先打開(kāi) Hive 的 Metastore服務(wù)

hive$bin/hive –-service metastore –p 8093

//把 Spark 的相關(guān) jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴(lài) jar

spark$hadoop fs –put jars/*.jar /lib/spark2

// 啟動(dòng) spark thriftserver 服務(wù)

spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf 
spark.yarn.jars=hdfs:///lib/spark2/*.jar

當(dāng)hdfs 上傳了spark 依賴(lài) jar 時(shí),通過(guò)spark.yarn.jars 可看到日志 spark 無(wú)須每個(gè)job 都上傳jar,可節(jié)省啟動(dòng)時(shí)間

19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar

//通過(guò) spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)

bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop

-u 是指定 beeline 的執(zhí)行驅(qū)動(dòng)地址;

-n 是指定登陸到 spark Session 上的用戶(hù)名稱(chēng);

Beeline 還支持傳入-e 可傳入一行 SQL, 

-e <query>                      query that should be executed

也可通過(guò) –f 指定一個(gè) SQL File,內(nèi)部可用逗號(hào)分隔的多個(gè) SQL(存儲(chǔ)過(guò)程)

-f <exec file>                  script file that should be executed

SparkSQL Beeline 的執(zhí)行效果展示

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

SparkSQL ThriftServer

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

對(duì)于 SparkSQL ThriftServer 服務(wù),每個(gè)登陸的用戶(hù)都有創(chuàng)建的 SparkSession,并且執(zhí)行的對(duì)個(gè) SQL 會(huì)通過(guò)時(shí)間順序列表展示。

SparkSQL ThriftServer 服務(wù)可用于其他支持的數(shù)據(jù)庫(kù)工具創(chuàng)建查詢(xún),也用于第三方的 BI 工具,如 tableau。

四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 為基礎(chǔ),開(kāi)發(fā)的統(tǒng)一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個(gè) Flow。下文開(kāi)始 SparkSQL Flow 的介紹:

SparkSQL Flow 是基于 SparkSQL 開(kāi)發(fā)的一種基于 XML 配置化的 SQL 數(shù)據(jù)流轉(zhuǎn)處理模型。該模型簡(jiǎn)化了 SparkSQL 、Spark RDD的開(kāi)發(fā),并且降低開(kāi)發(fā)了難度,適合了解數(shù)據(jù)業(yè)務(wù)但無(wú)法駕馭大數(shù)據(jù)以及 Spark 技術(shù)的開(kāi)發(fā)者。

  • 一個(gè)由普元技術(shù)部提供的基于 SparkSQL 的開(kāi)發(fā)模型;

  • 一個(gè)可二次定制開(kāi)發(fā)的大數(shù)據(jù)開(kāi)發(fā)框架,提供了靈活的可擴(kuò)展 API;

  • 一個(gè)提供了 對(duì)文件,數(shù)據(jù)庫(kù),NoSQL 等統(tǒng)一的數(shù)據(jù)開(kāi)發(fā)視界語(yǔ)義;

  • 基于 SQL 的開(kāi)發(fā)語(yǔ)言和 XML 的模板配置,支持 Spark UDF 的擴(kuò)展管理;

  • 支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺(tái);

  • 支持開(kāi)源、華為、星環(huán)等平臺(tái)統(tǒng)一認(rèn)證。

SparkSQL Flow 適合的場(chǎng)景:

  1. 批量 ETL;

  2. 非實(shí)時(shí)分析服務(wù);

SparkSQL Flow XML 概覽

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

  1. Properties 內(nèi)定義一組變量,可用于宏替換;

  2. Methods 內(nèi)可注冊(cè) udf 和 udaf 兩種函數(shù);

  3. Prepare 內(nèi)可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;

  4. Sources 內(nèi)定義一個(gè)到多個(gè)數(shù)據(jù)表視圖;

  5. Transformer 內(nèi)可定義 0 到多個(gè)基于 SQL 的數(shù)據(jù)轉(zhuǎn)換操作(支持 join);

  6. Targets 用于定義 1 到多個(gè)數(shù)據(jù)輸出;

  7. After 可定義 0到多個(gè)任務(wù)日志;

如你所見(jiàn),source 的 type 參數(shù)用于區(qū)分 source 的類(lèi)型,source 支持的種類(lèi)直接決定SparkSQL Flow 的數(shù)據(jù)源加載廣度;并且,根據(jù) type 不同,source 也需要配置不同的參數(shù),如數(shù)據(jù)庫(kù)還需要 driver,url,user和 password 參數(shù)。

Transformer 是基于 source 定的數(shù)據(jù)視圖可執(zhí)行的一組轉(zhuǎn)換 SQL,該 SQL 符合 SparkSQL 的語(yǔ)法(SQL99)。Transform 的 SQL 的執(zhí)行結(jié)果被作為中間表命名為 table_name 指定的值。

Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

SparkSQL Flow 支持的Sourse

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

  • 支持從 Hive 獲得數(shù)據(jù);

  • 支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile

  • 支持RDBMS數(shù)據(jù)庫(kù):PostgreSQL, MySQL,Oracle

  • 支持 NOSQL 數(shù)據(jù)庫(kù):Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 為讀取文本文件,把文本文件每行按照 delimiter 指定的字符進(jìn)行切分,切分不夠的列使用 null 填充。

<source type="textfile" table_name="et_rel_pty_cong"
              fields="cust_id,name1,gender1,age1:int" 
              delimiter=","
              path="file:///Users/zhenqin/software/hive/user.txt"/>
  1. Tablename 為該文件映射的數(shù)據(jù)表名,可理解為數(shù)據(jù)的視圖;

  2. Fields 為切分后的字段,使用逗號(hào)分隔,字段后可緊跟該字段的類(lèi)型,使用冒號(hào)分隔;

  3. Delimiter 為每行的分隔符;

  4. Path 用于指定文件地址,可以是文件,也可是文件夾;

  5. Path 指定地址需要使用協(xié)議,如:file:// 、 hdfs://,否則跟 core-site.xml 配置密切相關(guān);

SparkSQL Flow DB Source

<source type="mysql" table_name="et_rel_pty_cong"
                table="user"
                url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8"
                driver="com.mysql.jdbc.Driver"
                user="root" password="123456"/>

RDBMS 是從數(shù)據(jù)庫(kù)使用 JDBC讀取 數(shù)據(jù)集。支持 type 為:db、mysql、oracle、postgres、mssql;

  1. tablename 為該數(shù)據(jù)表的抽象 table 名稱(chēng)(視圖);

  2. url、driver、user,password 為數(shù)據(jù)庫(kù) JDBC 驅(qū)動(dòng)信息,為必須字段;

  3. SparkSQL 會(huì)加載該表的全表數(shù)據(jù),無(wú)法使用 where 條件。

SparkSQL Flow Transformer

<transform type="sql" table_name="cust_id_agmt_id_t" cached="true">
            SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids
            FROM user_concat_testx
            group by c_phone,c_type,c_num
</transform>

Transform 支持 cached 屬性,默認(rèn)為 false;如果設(shè)置為 true,相當(dāng)于把該結(jié)果緩存到內(nèi)存中,緩存到內(nèi)存中的數(shù)據(jù)在后續(xù)其它 Transform 中使用能提高計(jì)算效率。但是需使用大量?jī)?nèi)存,開(kāi)發(fā)者需要評(píng)估該數(shù)據(jù)集能否放到內(nèi)存中,防止出現(xiàn) OutofMemory 的異常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持輸出數(shù)據(jù)到一個(gè)或者多個(gè)目標(biāo)。這些目標(biāo),基本覆蓋了 Source 包含的外部系統(tǒng)。下面以 Hive 舉例說(shuō)明:

<target type="hive"
 table_name="cust_id_agmt_id_t" 
 savemode=”append”
target_table_name="cust_id_agmt_id_h"/>
  1. table_name 為 source 或者 Transform 定義的表名稱(chēng);

  2. target_table_name 為 hive 中的表結(jié)果,Hive 表可不存在也可存在,sparksql 會(huì)根據(jù) DataFrame 的數(shù)據(jù)類(lèi)型自動(dòng)創(chuàng)建表;

  3. savemode 默認(rèn)為 overwrite 覆蓋寫(xiě)入,當(dāng)寫(xiě)入目標(biāo)已存在時(shí)刪除源表再寫(xiě)入;支持 append 模式, 可增量寫(xiě)入。

Target 有一個(gè)特殊的 show 類(lèi)型的 target。用于直接在控制臺(tái)輸出一個(gè) DataFrame 的結(jié)果到控制臺(tái)(print),該 target 用于開(kāi)發(fā)和測(cè)試。

<target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/>

Rows 用于控制輸出多少行數(shù)據(jù)。

SparkSQL Around

After 用于 Flow 在運(yùn)行結(jié)束后執(zhí)行的一個(gè)環(huán)繞,用于記錄日志和寫(xiě)入狀態(tài)。類(lèi)似 Java 的 try {} finally{ round.execute() }

多個(gè) round 一定會(huì)執(zhí)行,round 異常不會(huì)導(dǎo)致任務(wù)失敗。 

<prepare>
        <round type="mysql"
               sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at)
               values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())"
               url="${jdbc.url}" .../>
</prepare>
<after>
        <round type="mysql"
               sql="update cpic_task_history set
               end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}"
               url="${jdbc.url}”…/>
</after>

Prepare round 和 after round 配合使用可用于記錄 SparkSQL Flow 任務(wù)的運(yùn)行日志。

SparkSQL Around可使用的變量

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

SparkSQL Around的執(zhí)行效果

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

Prepare round 可做插入(insert)動(dòng)作,after round 可做更新 (update)動(dòng)作,相當(dāng)于在數(shù)據(jù)庫(kù)表中從執(zhí)行開(kāi)始到結(jié)束有了完整的日志記錄。SparkSQL Flow 會(huì)保證round 一定能被執(zhí)行,而且 round 的執(zhí)行不影響任務(wù)的狀態(tài)。

SparkSQL Flow 提交

bin/spark-submit --master yarn-client --driver-memory 1G \
--num-executors 10 --executor-memory 2G \
--jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar \
--conf spark.yarn.jars=hdfs:///lib/spark2/*.jar \
--queue default --name FlowTest \
etl-flow-0.2.0.jar -f hive-flow-test.xml

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

接收必須的參數(shù) –f,可選的參數(shù)為支持 Kerberos 認(rèn)證的租戶(hù)名稱(chēng)principal,和其認(rèn)證需要的密鑰文件。

usage: spark-submit --jars etl-flow.jar --class
                    com.yiidata.etl.flow.source.FlowRunner
 -f,--xml-file <arg>     Flow XML File Path
    --keytabFile <arg>   keytab File Path(Huawei)
    --krb5File <arg>     krb5 File Path(Huawei)
    --principal <arg>    principal for hadoop(Huawei)

SparkSQL Execution Plan

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

每個(gè)Spark Flow 任務(wù)本質(zhì)上是一連串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的數(shù)據(jù)表操作。

regiserDataFrameAsTable 是每個(gè) source 和 Transform 的數(shù)據(jù)在 SparkSQL 中的數(shù)據(jù)視圖,每個(gè)視圖都會(huì)在 SparkContex 中注冊(cè)一次。

對(duì)RegisterDataFrameAsTable的分析

基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的

通過(guò)單個(gè) regiserDataFrameAsTable 項(xiàng)進(jìn)行分析,SparkSQL 并不是把source 的數(shù)據(jù)立即計(jì)算把數(shù)據(jù)放到內(nèi)存,而是每次執(zhí)行 source 時(shí)只是生成了一個(gè) Logical Plan,只有遇到需要提交的算子(Action),SparkSQL 才會(huì)觸發(fā)前面所依賴(lài)的的 plan 執(zhí)行。

總結(jié)

這是一個(gè)開(kāi)發(fā)框架,不是一個(gè)成熟的產(chǎn)品,也不是一種架構(gòu)。他只是基于 SparkSQL 整合了大多數(shù)的外部系統(tǒng),能通過(guò) XML 的模板配置完成數(shù)據(jù)開(kāi)發(fā)。面向的是理解數(shù)據(jù)業(yè)務(wù)但不了解 Spark 的數(shù)據(jù)開(kāi)發(fā)人員。整個(gè)框架完成了大多數(shù)的外部系統(tǒng)對(duì)接,開(kāi)發(fā)者只需要使用 type 獲得數(shù)據(jù),完成數(shù)據(jù)開(kāi)發(fā)后通過(guò) target 回寫(xiě)到目標(biāo)系統(tǒng)中。整個(gè)過(guò)程基本無(wú)須程序開(kāi)發(fā),除非當(dāng)前的 SQL 函數(shù)無(wú)法滿(mǎn)足使用的情況下,需要自行開(kāi)發(fā)一下特定的 UDF。因此本框架在對(duì) SparkSQL 做了二次開(kāi)發(fā)基礎(chǔ)上,大大簡(jiǎn)化了 Spark 的開(kāi)發(fā),可降低了開(kāi)發(fā)者使用難度。

精選提問(wèn):

問(wèn)1:和Fink平臺(tái)有什么優(yōu)勢(shì)么?

答:Flink 應(yīng)該對(duì)標(biāo) Spark Streaming 的解決方案,是另一種可選流數(shù)據(jù)引擎。Flink 也采用了 Scala 語(yǔ)言,內(nèi)部原理和操作數(shù)據(jù)方式頗有相似之處,是 SparkStreaming 之外流數(shù)據(jù)處理一種選型?;?SparkSQL Flow 的架構(gòu)主要側(cè)重批量數(shù)據(jù)分析,非實(shí)時(shí) ETL 方面。

問(wèn)2:這些應(yīng)該是源數(shù)據(jù)庫(kù)吧,請(qǐng)問(wèn)目標(biāo)數(shù)據(jù)庫(kù)支持哪些?

答:目前的實(shí)現(xiàn)目標(biāo)數(shù)據(jù)基本支持所有的源。

問(wèn)3:你們產(chǎn)品是軟件開(kāi)發(fā)平臺(tái),spark和你們開(kāi)發(fā)平臺(tái)啥關(guān)系?

答:普元針對(duì)部分成熟場(chǎng)景提供了一些開(kāi)發(fā)平臺(tái)和工具,也在參與了一些大數(shù)據(jù)項(xiàng)目建設(shè)。對(duì)于大規(guī)模數(shù)據(jù)的數(shù)據(jù)報(bào)表,數(shù)據(jù)質(zhì)量分析也需要適應(yīng)大數(shù)據(jù)的技術(shù)場(chǎng)景,Spark 作為Hadoop 內(nèi)比較成熟的解決方案,因此作為主要的選型工具。在參與部分項(xiàng)目實(shí)施過(guò)程中,通過(guò)對(duì)一些開(kāi)發(fā)中的痛點(diǎn)針對(duì)性的提取了應(yīng)用框架。

問(wèn)4:對(duì)于ETL中存在的merge、update的數(shù)據(jù)匹配、整合處理,Spark SQL Flow有沒(méi)有好的解決方法?

答:merge 和 update 在數(shù)據(jù)開(kāi)發(fā)過(guò)程不可避免,往往對(duì)數(shù)據(jù)庫(kù)造成較大壓力。大數(shù)據(jù)場(chǎng)景下不建議逐條對(duì)數(shù)據(jù)做 update 操作,更好的辦法是在數(shù)據(jù)處理階段通過(guò) join 把結(jié)果集在寫(xiě)入目標(biāo)前準(zhǔn)備好,統(tǒng)一一次性寫(xiě)入到目標(biāo)數(shù)據(jù)庫(kù)。查詢(xún)操作通過(guò)換庫(kù)使用新庫(kù),這中操作一般適合數(shù)據(jù)量比較大,數(shù)據(jù)更新頻率較低的情況。如果目標(biāo)庫(kù)是 HBase 或者其他 MPP 類(lèi)基于列式的數(shù)據(jù)庫(kù),適當(dāng)?shù)目梢愿隆5钱?dāng)每天有 60% 以上的數(shù)據(jù)都需要更新時(shí),建議還是一次性生成新表。

問(wèn)5: blink和flink 應(yīng)該如何選???

答:blink 是阿里巴巴在 flink 基礎(chǔ)上做了部分場(chǎng)景優(yōu)化(只是部分社區(qū)有介紹,并不明確)并且開(kāi)源,但是考慮到國(guó)內(nèi)這些機(jī)構(gòu)開(kāi)源往往是沒(méi)有持久動(dòng)力的。要看采用 Blink 是否用了比較關(guān)鍵的特性。也有消息說(shuō) Blink 和 Flink 會(huì)合并,畢竟阿里 Dubbo 前期自己發(fā)展,后期還是捐給了 Apache,因此兩者合并也是有可能。建議選型 Flink。

問(wèn)6:etl 同步數(shù)據(jù)中主要用哪些工具?

答:這個(gè)要區(qū)分場(chǎng)景。傳統(tǒng)數(shù)據(jù)庫(kù)之間,可采用日志同步,也有部分成熟的工具;

傳統(tǒng)數(shù)據(jù)庫(kù)和Hadoop 生態(tài)內(nèi)(HBase,HIVE) 同步可使用 apache sqoop。 SparkSQL Flow 可以作為數(shù)據(jù)同步的另一種方案,可用在實(shí)時(shí)性不高的場(chǎng)景。SparkSQL Flow 更側(cè)重大數(shù)據(jù)工具,偏向數(shù)據(jù)分析和非實(shí)時(shí) ETL。

看完上述內(nèi)容,你們對(duì)基于 Spark 的數(shù)據(jù)分析實(shí)踐是怎樣進(jìn)行的有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI