您好,登錄后才能下訂單哦!
大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
這是RDD的源碼中注釋中寫到的,下面介紹這五種特征屬性
一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決 定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認(rèn)值
一個對分區(qū)數(shù)據(jù)進行計算的函數(shù)。Spark中RDD的計算是以分片為單位的,每個RDD都會實現(xiàn) compute 函數(shù)以 達到該目的。compute函數(shù)會對迭代器進行組合,不需要保存每次計算的結(jié)果
RDD之間的存在依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,RDD之間形成類似于流水線一樣的前后依 賴關(guān)系(lineage)。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是 對RDD的所有分區(qū)進行重新計算
對于 key-value
的RDD而言,可能存在分區(qū)器(Partitioner
)。Spark 實現(xiàn)了兩種類型的分片函數(shù),一個是基于 哈希的HashPartitioner
,另外一個是基于范圍的RangePartitioner。只有 key-value 的RDD,才可能有 Partitioner
,非key-value的RDD的Parititioner
的值是None。Partitioner
函數(shù)決定了RDD
本身的分片數(shù)量,也 決定了parent RDD Shuffle
輸出時的分片數(shù)量
一個列表,存儲存儲每個Partition
的優(yōu)先位置(preferred location)。對于一個HDFS
文件來說,這個列表保 存的就是每個Partition
所在的塊的位置。按照“移動數(shù)據(jù)不移動計算”的理念,Spark
在任務(wù)調(diào)度的時候,會盡可 能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置
從前面的RDD
的基本特征入手,在工作中常編寫的程序是,創(chuàng)建RDD
,RDD
的轉(zhuǎn)換,RDD
的算子的執(zhí)行,創(chuàng)建對應(yīng)著外部系統(tǒng)的數(shù)據(jù)流入Spark集群的必選步驟,至于之間從集合創(chuàng)建的數(shù)據(jù),一般在測試時候使用,所以不細(xì)述,RDD
的轉(zhuǎn)換對應(yīng)一個專門的算子叫Transformation
其是惰性加載使用的, 而行動對應(yīng)著觸發(fā)Transformation
執(zhí)行的操作,一般是輸出到集合,或者打印出來,或者返回一個值,另外就是從集群輸出到別的系統(tǒng),這有一個專業(yè)詞叫Action
.
轉(zhuǎn)換算子,即從一個RDD到另外一個RDD的轉(zhuǎn)換操作,對應(yīng)一些內(nèi)置的Compute函數(shù),但是這些函數(shù)被有沒有shuffle來分為寬依賴算子和窄依賴算子
一般網(wǎng)上文章有兩種,一種是搬運定義的,即是否一個父RDD
分區(qū)會被多個子分區(qū)依賴,另外一種是看有沒有Shuffle
,有Shuffle
就是寬依賴,沒有則是窄依賴,第一種還靠譜點,第二種就是拿本身來說本身,所以沒有參考價值,2.1.3 如何區(qū)別寬依賴和窄依賴,可以之間看這個
map(func)
:對數(shù)據(jù)集中的每個元素都使用func,然后返回一個新的RDD filter(func)
:對數(shù)據(jù)集中的每個元素都使用func,然后返回一個包含使func為true的元素構(gòu)成的RDD flatMap(func)
:與 map 類似,每個輸入元素被映射為0或多個輸出元素 mapPartitions(func)
:和map很像,但是map是將func作用在每個元素上,而mapPartitions是func作用在整個分 區(qū)上。假設(shè)一個RDD有N個元素,M個分區(qū)(N >> M),那么map的函數(shù)將被調(diào)用N次,而mapPartitions中的函數(shù) 僅被調(diào)用M次,一次處理一個分區(qū)中的所有元素 mapPartitionsWithIndex(func)
:與 mapPartitions 類似,多了分區(qū)的索引值的信息
glom()
:將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型 RDD[Array[T]] sample(withReplacement, fraction, seed)
:采樣算子。以指定的隨機種子(seed)隨機抽樣出數(shù)量為fraction的數(shù) 據(jù),withReplacement表示是抽出的數(shù)據(jù)是否放回,true為有放回的抽樣,false為無放回的抽樣
coalesce(numPartitions,false)
:無shuffle,一般用來減少分區(qū)
union(otherRDD)
: 求兩個RDD的并集
cartesian(otherRDD)
:笛卡爾積
zip(otherRDD)
:將兩個RDD組合成 key-value 形式的RDD,默認(rèn)兩個RDD的partition數(shù)量以及元素數(shù)量都相同,否 則會拋出異常。
map 與 mapPartitions 的區(qū)別 map
:每次處理一條數(shù)據(jù) mapPartitions
:每次處理一個分區(qū)的數(shù)據(jù),分區(qū)的數(shù)據(jù)處理完成后,數(shù)據(jù)才能釋放,資源不足時容易導(dǎo)致 OOM 最佳實踐:當(dāng)內(nèi)存資源充足時,建議使用mapPartitions
,以提高處理效率
groupBy(func)
:按照傳入函數(shù)的返回值進行分組。將key相同的值放入一個迭代器
distinct([numTasks]))
:對RDD元素去重后,返回一個新的RDD??蓚魅雗umTasks參數(shù)改變RDD分區(qū)數(shù)
coalesce(numPartitions, true)
:有shuffle,無論增加分區(qū)還是減少分區(qū),一般用repartition來代替
repartition(numPartitions)
:增加或減少分區(qū)數(shù),有shuffle
sortBy(func, [ascending], [numTasks])
:使用 func 對數(shù)據(jù)進行處理,對處理后的結(jié)果進行排序
intersection(otherRDD)
: 求兩個RDD的交集
subtract (otherRDD)
: 求兩個RDD的差集
這里我建議理解不了的算子,直接從Spark
的history
的依賴圖來看,有沒有劃分Stage
,如果劃分了就是寬依賴,沒有劃分就是窄依賴,當(dāng)然這是實戰(zhàn)派的做法,可以在同事或者同學(xué)說明問題的時候,show your code
給他,然后把依賴圖拿給他 ,當(dāng)然作為理論加實踐的并行者,我這里再拿一種來判別,是從理解定義開始的,定義說是父RDD分區(qū)有沒有被多個子分區(qū)依賴,那可以從這個角度想一下,父分區(qū)單個分區(qū)數(shù)據(jù),有沒有可能流向不同的子RDD的分區(qū),比如想一想distinct算子,或者sortBy算子,全局去重和全局排序,假設(shè)剛開始1,2,3在一個分區(qū),經(jīng)過map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
去重后,雖然分區(qū)數(shù)量沒有變,但是每個分區(qū)數(shù)據(jù)必然要看別的分區(qū)的數(shù)據(jù),才能知道最后自己要不要保留,從輸入分區(qū),到輸出分區(qū),必然經(jīng)過匯合重組,所以必然有shuffle
的。sortBy
同理。
Action觸發(fā)Job。一個Spark程序(Driver程序)包含了多少 Action 算子,那么就有多少Job; 典型的Action算子: collect / count collect() => sc.runJob() => ... => dagScheduler.runJob() => 觸發(fā)了Job
collect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)
first()
:Return the first element in this RDD take(n)
:Take the first num elements of the RDD top(n)
:按照默認(rèn)(降序)或者指定的排序規(guī)則,返回前num個元素。 takeSample(withReplacement, num, [seed])
:返回采樣的數(shù)據(jù) foreach(func) / foreachPartition(func)
:與map、mapPartitions類似,區(qū)別是 foreach 是 Action saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)
RDD整體上分為 Value 類型和 Key-Value 類型。 前面介紹的是 Value 類型的RDD的操作,實際使用更多的是 key-value 類型的RDD,也稱為 PairRDD。 Value 類型RDD的操作基本集中在 RDD.scala 中; key-value 類型的RDD操作集中在 PairRDDFunctions.scala 中;
前面介紹的大多數(shù)算子對 Pair RDD 都是有效的,RDD的值為key-value的時候即可隱式轉(zhuǎn)換為PairRDD, Pair RDD還有屬于自己的 Transformation、Action 算子;
mapValues / flatMapValues / keys / values,這些操作都可以使用 map 操作實現(xiàn),是簡化操作。
PariRDD(k, v)使用范圍廣,聚合 groupByKey / reduceByKey / foldByKey / aggregateByKey combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底層實現(xiàn) subtractByKey:類似于subtract,刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素
結(jié)論:效率相等用最熟悉的方法;groupByKey在一般情況下效率低,盡量少用
sortByKey:sortByKey函數(shù)作用于PairRDD,對Key進行排序
cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin
val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink"))) val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"趙六"), (6,"馮七"))) val rdd3 = rdd1.cogroup(rdd2) rdd3.collect.foreach(println) rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect // 仿照源碼實現(xiàn)join操作 rdd3.flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java"))) val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K"))) rdd1.join(rdd2).collect rdd1.leftOuterJoin(rdd2).collect rdd1.rightOuterJoin(rdd2).collect rdd1.fullOuterJoin(rdd2).collect
collectAsMap / countByKey / lookup(key)
lookup(key)
:高效的查找方法,只查找對應(yīng)分區(qū)的數(shù)據(jù)
關(guān)于大數(shù)據(jù)開發(fā)中Spark常見RDD是怎樣的問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。