溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的

發(fā)布時間:2021-12-17 09:42:43 來源:億速云 閱讀:148 作者:柒染 欄目:大數(shù)據(jù)

大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

1.五個基本Properties

  • A list of partitions

  • A function for computing each split

  • A list of dependencies on other RDDs

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

這是RDD的源碼中注釋中寫到的,下面介紹這五種特征屬性

1.1 分區(qū)

一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決 定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認(rèn)值

1.2 計算的函數(shù)

一個對分區(qū)數(shù)據(jù)進行計算的函數(shù)。Spark中RDD的計算是以分片為單位的,每個RDD都會實現(xiàn) compute 函數(shù)以 達到該目的。compute函數(shù)會對迭代器進行組合,不需要保存每次計算的結(jié)果

1.3 依賴關(guān)系

RDD之間的存在依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,RDD之間形成類似于流水線一樣的前后依 賴關(guān)系(lineage)。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是 對RDD的所有分區(qū)進行重新計算

1.4 分區(qū)器

對于 key-value 的RDD而言,可能存在分區(qū)器(Partitioner)。Spark 實現(xiàn)了兩種類型的分片函數(shù),一個是基于 哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有 key-value 的RDD,才可能有 Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)決定了RDD本身的分片數(shù)量,也 決定了parent RDD Shuffle輸出時的分片數(shù)量

1.5 優(yōu)先存儲位置

一個列表,存儲存儲每個Partition的優(yōu)先位置(preferred location)。對于一個HDFS文件來說,這個列表保 存的就是每個Partition所在的塊的位置。按照“移動數(shù)據(jù)不移動計算”的理念,Spark在任務(wù)調(diào)度的時候,會盡可 能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置

2. RDD轉(zhuǎn)換之間的常見算子

從前面的RDD的基本特征入手,在工作中常編寫的程序是,創(chuàng)建RDD,RDD的轉(zhuǎn)換,RDD的算子的執(zhí)行,創(chuàng)建對應(yīng)著外部系統(tǒng)的數(shù)據(jù)流入Spark集群的必選步驟,至于之間從集合創(chuàng)建的數(shù)據(jù),一般在測試時候使用,所以不細(xì)述,RDD的轉(zhuǎn)換對應(yīng)一個專門的算子叫Transformation其是惰性加載使用的, 而行動對應(yīng)著觸發(fā)Transformation執(zhí)行的操作,一般是輸出到集合,或者打印出來,或者返回一個值,另外就是從集群輸出到別的系統(tǒng),這有一個專業(yè)詞叫Action.

2.1 常見轉(zhuǎn)換算子

轉(zhuǎn)換算子,即從一個RDD到另外一個RDD的轉(zhuǎn)換操作,對應(yīng)一些內(nèi)置的Compute函數(shù),但是這些函數(shù)被有沒有shuffle來分為寬依賴算子和窄依賴算子

2.1.1 寬依賴和窄依賴的區(qū)別

一般網(wǎng)上文章有兩種,一種是搬運定義的,即是否一個父RDD分區(qū)會被多個子分區(qū)依賴,另外一種是看有沒有Shuffle,有Shuffle就是寬依賴,沒有則是窄依賴,第一種還靠譜點,第二種就是拿本身來說本身,所以沒有參考價值,2.1.3 如何區(qū)別寬依賴和窄依賴,可以之間看這個

2.1.2 寬依賴和窄依賴的常見算子

窄依賴常見算子

map(func):對數(shù)據(jù)集中的每個元素都使用func,然后返回一個新的RDD filter(func):對數(shù)據(jù)集中的每個元素都使用func,然后返回一個包含使func為true的元素構(gòu)成的RDD flatMap(func):與 map 類似,每個輸入元素被映射為0或多個輸出元素 mapPartitions(func):和map很像,但是map是將func作用在每個元素上,而mapPartitions是func作用在整個分 區(qū)上。假設(shè)一個RDD有N個元素,M個分區(qū)(N >> M),那么map的函數(shù)將被調(diào)用N次,而mapPartitions中的函數(shù) 僅被調(diào)用M次,一次處理一個分區(qū)中的所有元素 mapPartitionsWithIndex(func):與 mapPartitions 類似,多了分區(qū)的索引值的信息

glom():將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型 RDD[Array[T]] sample(withReplacement, fraction, seed):采樣算子。以指定的隨機種子(seed)隨機抽樣出數(shù)量為fraction的數(shù) 據(jù),withReplacement表示是抽出的數(shù)據(jù)是否放回,true為有放回的抽樣,false為無放回的抽樣

coalesce(numPartitions,false):無shuffle,一般用來減少分區(qū)

union(otherRDD) : 求兩個RDD的并集

cartesian(otherRDD):笛卡爾積

zip(otherRDD):將兩個RDD組合成 key-value 形式的RDD,默認(rèn)兩個RDD的partition數(shù)量以及元素數(shù)量都相同,否 則會拋出異常。

map 與 mapPartitions 的區(qū)別 map:每次處理一條數(shù)據(jù) mapPartitions:每次處理一個分區(qū)的數(shù)據(jù),分區(qū)的數(shù)據(jù)處理完成后,數(shù)據(jù)才能釋放,資源不足時容易導(dǎo)致 OOM 最佳實踐:當(dāng)內(nèi)存資源充足時,建議使用mapPartitions,以提高處理效率

寬依賴常見算子

groupBy(func):按照傳入函數(shù)的返回值進行分組。將key相同的值放入一個迭代器

distinct([numTasks])):對RDD元素去重后,返回一個新的RDD??蓚魅雗umTasks參數(shù)改變RDD分區(qū)數(shù)

coalesce(numPartitions, true):有shuffle,無論增加分區(qū)還是減少分區(qū),一般用repartition來代替

repartition(numPartitions):增加或減少分區(qū)數(shù),有shuffle

sortBy(func, [ascending], [numTasks]):使用 func 對數(shù)據(jù)進行處理,對處理后的結(jié)果進行排序

intersection(otherRDD) : 求兩個RDD的交集

subtract (otherRDD) : 求兩個RDD的差集

2.1.3 如何區(qū)別寬依賴和窄依賴

這里我建議理解不了的算子,直接從Sparkhistory的依賴圖來看,有沒有劃分Stage,如果劃分了就是寬依賴,沒有劃分就是窄依賴,當(dāng)然這是實戰(zhàn)派的做法,可以在同事或者同學(xué)說明問題的時候,show your code 給他,然后把依賴圖拿給他 ,當(dāng)然作為理論加實踐的并行者,我這里再拿一種來判別,是從理解定義開始的,定義說是父RDD分區(qū)有沒有被多個子分區(qū)依賴,那可以從這個角度想一下,父分區(qū)單個分區(qū)數(shù)據(jù),有沒有可能流向不同的子RDD的分區(qū),比如想一想distinct算子,或者sortBy算子,全局去重和全局排序,假設(shè)剛開始1,2,3在一個分區(qū),經(jīng)過map(x => (x, null)).reduceByKey((x, y) => x).map(_._1) 去重后,雖然分區(qū)數(shù)量沒有變,但是每個分區(qū)數(shù)據(jù)必然要看別的分區(qū)的數(shù)據(jù),才能知道最后自己要不要保留,從輸入分區(qū),到輸出分區(qū),必然經(jīng)過匯合重組,所以必然有shuffle的。sortBy同理。

2.2 常見行動算子

Action觸發(fā)Job。一個Spark程序(Driver程序)包含了多少 Action 算子,那么就有多少Job; 典型的Action算子: collect / count collect() => sc.runJob() => ... => dagScheduler.runJob() => 觸發(fā)了Job

collect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)

first():Return the first element in this RDD take(n):Take the first num elements of the RDD top(n):按照默認(rèn)(降序)或者指定的排序規(guī)則,返回前num個元素。 takeSample(withReplacement, num, [seed]):返回采樣的數(shù)據(jù) foreach(func) / foreachPartition(func):與map、mapPartitions類似,區(qū)別是 foreach 是 Action saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)

3. PairRDD常見操作

RDD整體上分為 Value 類型和 Key-Value 類型。 前面介紹的是 Value 類型的RDD的操作,實際使用更多的是 key-value 類型的RDD,也稱為 PairRDD。 Value 類型RDD的操作基本集中在 RDD.scala 中; key-value 類型的RDD操作集中在 PairRDDFunctions.scala 中;

前面介紹的大多數(shù)算子對 Pair RDD 都是有效的,RDD的值為key-value的時候即可隱式轉(zhuǎn)換為PairRDD, Pair RDD還有屬于自己的 Transformation、Action 算子;

大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的

3.1 常見PairRDD的Transformation操作

3.1.1 類似 map 操作

mapValues / flatMapValues / keys / values,這些操作都可以使用 map 操作實現(xiàn),是簡化操作。

3.1.2 聚合操作【重要、難點】

PariRDD(k, v)使用范圍廣,聚合 groupByKey / reduceByKey / foldByKey / aggregateByKey combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底層實現(xiàn) subtractByKey:類似于subtract,刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素

結(jié)論:效率相等用最熟悉的方法;groupByKey在一般情況下效率低,盡量少用

3.1.3 排序操作

sortByKey:sortByKey函數(shù)作用于PairRDD,對Key進行排序

3.1.4 join操作

cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin

大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"趙六"), (6,"馮七")))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)
rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect
// 仿照源碼實現(xiàn)join操作
rdd3.flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect

3.1.5 Action操作

collectAsMap / countByKey / lookup(key)

大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的

lookup(key):高效的查找方法,只查找對應(yīng)分區(qū)的數(shù)據(jù)

關(guān)于大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI