溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

選擇Parquet for Spark SQL 的 5 大原因分別是什么

發(fā)布時間:2021-12-17 10:59:18 來源:億速云 閱讀:145 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹選擇Parquet for Spark SQL 的 5 大原因分別是什么,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

列式存儲 (columnar storage) 在處理大數(shù)據(jù)的時候可以有效地節(jié)省時間和空間。例如,與使用文本相比,Parquet 讓 Spark SQL  的性能平均提高了 10 倍,這要感謝初級的讀取器過濾器、高效的執(zhí)行計劃,以及 Spark 1.6.0 中經(jīng)過改進的掃描吞吐量!小編將為您詳細(xì)介紹使用  Parquet for Spark SQL 優(yōu)勢的 5 大原因。

為了了解 Parquet 有多么強大,我們從 spark-perf-sql 中挑選了 24 個從 TPC-DS 中衍生的查詢來完成比較(總共有 99  個查詢,一些查詢在 1TB 的縮放比例下無法用于平面的 CSV 數(shù)據(jù)文件。更多內(nèi)容參見下文)。這些查詢代表了 TPC-DS  中的所有類別:報告、即席報告、迭代和數(shù)據(jù)挖掘。我們還要確保包含了短查詢(查詢12 和 91)和長時間運行的查詢(查詢 24a 和 25),以及會使用 100%  CPU 的眾所周知的查詢(查詢 97)。

我們使用了一個 6 節(jié)點的預(yù)置型 Cisco UCS 集群,每個 Cisco  驗證了的設(shè)計都有類似的配置。我們調(diào)優(yōu)了底層硬件,以防在所有測試中遇到網(wǎng)絡(luò)或磁盤 IO 瓶頸。本文的重點是了解在 Spark 1.5.1 和剛發(fā)布的 Spark  1.6.0 中只對文本和 Parquet 存儲格式運行這些查詢會有怎樣的性能差異??偟?Spark 工作存儲為 500GB。TPC-DS 縮放比例為  1TB。

1. Spark SQL 在用于 Parquet 時更快一些!

下圖比較了在 Spark 1.5.1 中運行 24 個查詢的所有執(zhí)行時間的總和。在使用平面的 CVS 文件時,查詢花費了大約 12 個小時才完成,而在使用  Parquet 時,查詢用了不到 1 個小時的時間就完成了,性能提高了 11 倍。

選擇Parquet for Spark SQL 的 5 大原因分別是什么

圖 1. 比較在文本和 Parquet 中花費的總查詢時間(以秒為單位),越小越好。

2. Spark SQL 在使用較大縮放比例時的表現(xiàn)要優(yōu)于 Parquet

存儲格式的選擇不當(dāng)往往會導(dǎo)致難以診斷和難以修復(fù)。例如,在采用 1TB 的縮放比例時,如果使用平面 CSV 文件,在所有可運行的查詢中,至少有 1/3  的查詢無法完成,但在使用 Parquet 時,這些查詢都完成了。

一些錯誤和異常非常神秘。這里有 3 個示例:

錯誤示例 1:

WARN scheduler.TaskSetManager: Lost task 145.0 in stage 4.0 (TID 4988, rhel8.cisco.com): FetchFailed(BlockManagerId(2, rhel4.cisco.com, 49209), shuffleId=13, mapId=47, reduceId=145, message=          org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data6/hadoop/yarn/local/usercache/spark/appcache/application_1447965002296_0142/blockmgr-44627d4c-4a2b-4f53-a471-32085a252cb0/15/shuffle_13_119_0.index (No such file or directory)          at java.io.FileInputStream.open0(Native Method)          at java.io.FileInputStream.open(FileInputStream.java:195)

錯誤示例 2:

WARN scheduler.TaskSetManager: Lost task 1.0 in stage 13.1 (TID 13621, rhel7.cisco.com): FetchFailed(null, shuffleId=9, mapId=-1, reduceId=148, message=             org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 9             at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)             at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)             at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)             at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)             at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

錯誤示例 3:

ERROR cluster.YarnScheduler: Lost executor 59 on rhel4.cisco.com: remote Rpc client disassociated

大多數(shù)查詢的失敗迫使 Spark  通過重新排隊任務(wù)(甚至是重新啟動某個階段)來進行再次嘗試。事情從那時起變得更糟;最終,該應(yīng)用程序失敗了,像是永遠(yuǎn)不會完成。

通過切換到 Parquet,無需更改其他任何 Spark  配置,這些問題就得到了解決。壓縮減小了文件的大小,列式格式允許只讀取選擇的記錄,減少的輸入數(shù)據(jù)直接影響了 Spark DAG  調(diào)度程序關(guān)于執(zhí)行圖的決策(更多的細(xì)節(jié)參見下文)。Parquet 的所有這些優(yōu)勢都對查詢的快速完成至關(guān)重要。

3. 更少的磁盤 IO

采用了壓縮功能的 Parquet 能夠讓數(shù)據(jù)存儲平均減少 75%,也就是說,1TB 壓縮比例的數(shù)據(jù)文件在磁盤上只會占用大約 250 GB  的磁盤空間。這顯著減少了 Spark SQL 應(yīng)用程序所需的輸入數(shù)據(jù)。而且在 Spark 1.6.0 中,Parquet 讀取器使用了下推過濾器來進一步減少磁盤  IO。下推式過濾器允許在將數(shù)據(jù)讀入 Spark 之前就制定數(shù)據(jù)選擇決策。例如,對查詢 97 中的 between 子句的處理如下所示:

select cs_bill_customer_sk customer_sk, cs_item_sk item_sk
from catalog_sales,date_dim
where cs_sold_date_sk = d_date_sk 
and d_month_seq between 1200 and 1200 + 11

Spark SQL 展示了查詢的物理計劃中的以下 scan 語句:

+- Scan ParquetRelation[d_date_sk#141,d_month_seq#144L] InputPaths: hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_SUCCESS, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_common_metadata, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_metadata, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/part-r-00000-4d205b7e-b21d-4e8b-81ac-d2a1f3dd3246.gz.parquet, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/part-r-00001-4d205b7e-b21d-4e8b-81ac-d2a1f3dd3246.gz.parquet, PushedFilters: [GreaterThanOrEqual(d_month_seq,1200), LessThanOrEqual(d_month_seq,1211)]]

其中,PushedFilters 只返回 d_mont_seq 列中范圍 1200 到 1211  的記錄,或者只返回幾個記錄。與平面文件相比較,在使用平面文件時,會讀取整個表(每一列和每一行),如物理計劃中所示:

  1. [                  Scan CsvRelation(hdfs://rhel10.cisco.com/user/spark/hadoopds1000g/date_dim/*,false,|,",null,PERMISSIVE,COMMONS,false,false,StructType(StructField(d_date_sk,IntegerType,false), StructField(d_date_id,StringType,false), StructField(d_date,StringType,true), StructField(d_month_seq,LongType,true), StructField(d_week_seq,LongType,true), StructField(d_quarter_seq,LongType,true), StructField(d_year,LongType,true), StructField(d_dow,LongType,true), StructField(d_moy,LongType,true), StructField(d_dom,LongType,true), StructField(d_qoy,LongType,true), StructField(d_fy_year,LongType,true), StructField(d_fy_quarter_seq,LongType,true), StructField(d_fy_week_seq,LongType,true), StructField(d_day_name,StringType,true), StructField(d_quarter_name,StringType,true), StructField(d_holiday,StringType,true), StructField(d_weekend,StringType,true), StructField(d_following_holiday,StringType,true), StructField(d_first_dom,LongType,true), StructField(d_last_dom,LongType,true), StructField(d_same_day_ly,LongType,true), StructField(d_same_day_lq,LongType,true), StructField(d_current_day,StringType,true), StructField(d_current_week,StringType,true), StructField(d_current_month,StringType,true), StructField(d_current_quarter,StringType,true), StructField(d_current_year,StringType,true)))[d_date_sk#141,d_date_id#142,d_date#143,d_month_seq#144L,d_week_seq#145L,d_quarter_seq#146L,d_year#147L,d_dow#148L,d_moy#149L,d_dom#150L,d_qoy#151L,d_fy_year#152L,d_fy_quarter_seq#153L,d_fy_week_seq#154L,d_day_name#155,d_quarter_name#156,d_holiday#157,d_weekend#158,d_following_holiday#159,d_first_dom#160L,d_last_dom#161L,d_same_day_ly#162L,d_same_day_lq#163L,d_current_day#164,d_current_week#165,d_current_month#166,d_current_quarter#167,d_current_year#168]]

4. Spark 1.6.0 提供了更高的掃描吞吐量

Databricks 的 Spark 1.6.0 發(fā)布博客中曾經(jīng)提到過顯著的平面掃描吞吐量,因為該博客使用到了 “更優(yōu)化的代碼路徑”  一詞。為了在現(xiàn)實世界中說明這一點,我們在 Spark 1.5.1 和 1.6.0 中運行了查詢 97,并捕獲了 nmon 數(shù)據(jù)。改進非常明顯。

首先,查詢響應(yīng)時間減少了一半:查詢 97 在 Spark 1.5.1 中用了 138 秒時間,而在 Spark 1.6.0 中只用了 60 秒。

選擇Parquet for Spark SQL 的 5 大原因分別是什么

圖 2. 使用 Parquet 時查詢 97 所用的時間(以秒為單位)

其次,在 Spark 1.6.0 中,工作節(jié)點上的 CPU 使用率更低一些,這主要歸功于 SPARK-11787:

選擇Parquet for Spark SQL 的 5 大原因分別是什么

圖 3. Spark 1.6.0 中的查詢 97 的 CPU 使用率,***時為 70%

選擇Parquet for Spark SQL 的 5 大原因分別是什么

圖 4. Spark 1.5.1 中的查詢 97 的 CPU 使用率,***時為 100%

與上述數(shù)據(jù)相關(guān),在 Spark 1.6.0 中,磁盤讀取吞吐量要高出 50%:

選擇Parquet for Spark SQL 的 5 大原因分別是什么

圖 5. Spark 1.5.1 和 1.6.0 中的磁盤讀取吞吐量

5. 高效的 Spark 執(zhí)行圖

除了更智能的讀取器(比如 Parquet)之外,數(shù)據(jù)格式也會直接影響 Spark 執(zhí)行圖,因為調(diào)度程序的一個主要輸入是 RDD  計數(shù)。在我們的示例中,我們使用文本和 Parquet 在 Spark 1.5.1 上運行了相同的查詢 97,我們獲得了各個階段的以下執(zhí)行模式。

使用文本 – 有許多長時間運行的階段(請注意,y 軸上使用的單位是毫秒)

選擇Parquet for Spark SQL 的 5 大原因分別是什么

圖 6. 使用文本的執(zhí)行階段

在使用 Parquet 時,雖然有更多的階段,但工作的執(zhí)行速度很快,而且只創(chuàng)建了兩個長時間運行的階段就接近了工作尾聲。這表明 “父-子”  階段的邊界變得更明確,因此需要保存到磁盤和/或通過網(wǎng)絡(luò)節(jié)點的中間數(shù)據(jù)變得更少,這加快了端到端執(zhí)行的速度。

選擇Parquet for Spark SQL 的 5 大原因分別是什么

圖 7. 使用 Parquet 的執(zhí)行階段

Parquet 用于 Spark SQL  時表現(xiàn)非常出色。它不僅提供了更高的壓縮率,還允許通過已選定的列和低級別的讀取器過濾器來只讀取感興趣的記錄。因此,如果需要多次傳遞數(shù)據(jù),那么花費一些時間編碼現(xiàn)有的平面文件可能是值得的。

關(guān)于選擇Parquet for Spark SQL 的 5 大原因分別是什么就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI