溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

spark中RDD算子的示例分析

發(fā)布時(shí)間:2021-12-10 11:49:29 來(lái)源:億速云 閱讀:251 作者:小新 欄目:云計(jì)算

這篇文章主要介紹了spark中RDD算子的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

Value型Transformation算子

處理數(shù)據(jù)類(lèi)型為Value型的Transformation算子可以根據(jù)RDD變換算子的輸入分區(qū)與輸出分區(qū)關(guān)系分為以下幾種類(lèi)型。

1)輸入分區(qū)與輸出分區(qū)一對(duì)一型。

2)輸入分區(qū)與輸出分區(qū)多對(duì)一型。

3)輸入分區(qū)與輸出分區(qū)多對(duì)多型。

4)輸出分區(qū)為輸入分區(qū)子集型。

5)還有一種特殊的輸入與輸出分區(qū)一對(duì)一的算子類(lèi)型:Cache型。Cache算子對(duì)RDD分區(qū)進(jìn)行緩存。

這里的對(duì)應(yīng)指的是分區(qū)依賴(lài)的對(duì)應(yīng)

spark中RDD算子的示例分析

1.輸入分區(qū)與輸出分區(qū)一對(duì)一型

(1)map(func)

map是對(duì)RDD中的每個(gè)元素都執(zhí)行一個(gè)指定的函數(shù)來(lái)產(chǎn)生一個(gè)新的RDD,新RDD叫作MappedRDD(this, sc.clean(f))。任何原RDD中的元素在新RDD中都有且只有一個(gè)元素與之對(duì)應(yīng)。

圖3-4中的每個(gè)方框表示一個(gè)RDD分區(qū),左側(cè)的分區(qū)經(jīng)過(guò)用戶(hù)自定義函數(shù)f:T->U映射為右側(cè)的新的RDD分區(qū)。但是實(shí)際只有等到Action算子觸發(fā)后,這個(gè)f函數(shù)才會(huì)和其他函數(shù)在一個(gè)Stage中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。V1輸入f轉(zhuǎn)換輸出V’1。

spark中RDD算子的示例分析

(2)flatMap(func)

類(lèi)似于map,但是每一個(gè)輸入元素,會(huì)被映射為0到多個(gè)輸出元素(因此,func函數(shù)的返回值是一個(gè)Seq,而不是單一元素)。內(nèi)部創(chuàng)建 FlatMappedRDD(this, sc.clean(f))。

圖3-5中小方框表示RDD的一個(gè)分區(qū),對(duì)分區(qū)進(jìn)行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->U,T和U可以是任意的數(shù)據(jù)類(lèi)型。將分區(qū)中的數(shù)據(jù)通過(guò)用戶(hù)自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個(gè)RDD分區(qū),小方框代表一個(gè)集合。V1、V2、V3在一個(gè)集合作為RDD的一個(gè)數(shù)據(jù)項(xiàng),轉(zhuǎn)換為V’1、V’2、V’3后,將結(jié)合拆散,形成為RDD中的數(shù)據(jù)項(xiàng)。

spark中RDD算子的示例分析

(3)mapPartitions(func)

mapPartitions是map的一個(gè)變種。map的輸入函數(shù)是應(yīng)用于RDD中每個(gè)元素,而mapPartitions的輸入函數(shù)是應(yīng)用于每個(gè)分區(qū),也就是把每個(gè)分區(qū)中的內(nèi)容作為整體來(lái)處理的。

mapPartitions函數(shù)獲取到每個(gè)分區(qū)的迭代器,在函數(shù)中通過(guò)這個(gè)分區(qū)整體的迭代器對(duì)整個(gè)分區(qū)的元素進(jìn)行操作。內(nèi)部實(shí)現(xiàn)是生成MapPartitionsRDD。圖3-6中的方框代表一個(gè)RDD分區(qū)。

圖3-6中,用戶(hù)通過(guò)函數(shù)f (iter )=>iter.filter(_>=3)對(duì)分區(qū)中的所有數(shù)據(jù)進(jìn)行過(guò)濾,>=3的數(shù)據(jù)保留。一個(gè)方塊代表一個(gè)RDD分區(qū),含有1、2、3的分區(qū)過(guò)濾只剩下元素3。

spark中RDD算子的示例分析

(4)glom()

glom函數(shù)將每個(gè)分區(qū)形成一個(gè)數(shù)組,內(nèi)部實(shí)現(xiàn)是返回的GlommedRDD。圖3-7中的每個(gè)方框代表一個(gè)RDD分區(qū)。

圖3-7中的方框代表一個(gè)分區(qū)。該圖表示含有V1、V2、V3的分區(qū)通過(guò)函數(shù)glom形成一個(gè)數(shù)組Array[(V1),(V2),(V3)]。

spark中RDD算子的示例分析

2.輸入分區(qū)與輸出分區(qū)多對(duì)一型

(1)union(otherDataset)

使用union函數(shù)時(shí)需要保證兩個(gè)RDD元素的數(shù)據(jù)類(lèi)型相同,返回的RDD數(shù)據(jù)類(lèi)型和被合并的RDD元素?cái)?shù)據(jù)類(lèi)型相同,并不進(jìn)行去重操作,保存所有元素。如果想去重,可以使用distinct()。++符號(hào)相當(dāng)于uion函數(shù)操作。

圖3-8中左側(cè)的大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。含有V1,V2…U4的RDD和含有V1,V8…U8的RDD合并所有元素形成一個(gè)RDD。V1、V1、V2、V8形成一個(gè)分區(qū),其他元素同理進(jìn)行合并。

spark中RDD算子的示例分析

(2)cartesian(otherDataset)

對(duì)兩個(gè)RDD內(nèi)的所有元素進(jìn)行笛卡爾積操作。操作后,內(nèi)部實(shí)現(xiàn)返回CartesianRDD。

左側(cè)的大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。大方框代表RDD,大方框中的小方框代表RDD分區(qū)。 例如,V1和另一個(gè)RDD中的W1、 W2、 Q5進(jìn)行笛卡爾積運(yùn)算形成(V1,W1)、(V1,W2)、(V1,Q5)。

spark中RDD算子的示例分析

3.輸入分區(qū)與輸出分區(qū)多對(duì)多型

groupBy (func)

將元素通過(guò)函數(shù)生成相應(yīng)的Key,數(shù)據(jù)就轉(zhuǎn)化為Key-Value格式,之后將Key相同的元素分為一組。
圖中,方框代表一個(gè)RDD分區(qū),相同key的元素合并到一個(gè)組。 例如,V1,V2合并為一個(gè)Key-Value對(duì),其中key為“ V” ,Value為“ V1,V2” ,形成V,Seq(V1,V2)。

spark中RDD算子的示例分析

4.輸出分區(qū)為輸入分區(qū)子集型

(1)filter(func)

filter的功能是對(duì)元素進(jìn)行過(guò)濾,對(duì)每個(gè)元素應(yīng)用f函數(shù),返回值為true的元素在RDD中保留,返回為false的將過(guò)濾掉。內(nèi)部實(shí)現(xiàn)相當(dāng)于生成FilteredRDD(this,sc.clean(f))。
圖3-11中的每個(gè)方框代表一個(gè)RDD分區(qū)。T可以是任意的類(lèi)型。通過(guò)用戶(hù)自定義的過(guò)濾函數(shù)f,對(duì)每個(gè)數(shù)據(jù)項(xiàng)進(jìn)行操作,將滿(mǎn)足條件,返回結(jié)果為true的數(shù)據(jù)項(xiàng)保留。例如,過(guò)濾掉V2、V3保留了V1,將區(qū)分命名為V1'。

spark中RDD算子的示例分析

(2)distinct([numTasks]))

distinct將RDD中的元素進(jìn)行去重操作。圖3-12中的方框代表RDD分區(qū)。

圖3-12中的每個(gè)方框代表一個(gè)分區(qū),通過(guò)distinct函數(shù),將數(shù)據(jù)去重。例如,重復(fù)數(shù)據(jù)V1、V1去重后只保留一份V1。

spark中RDD算子的示例分析

(3)subtract(other, numPartitions=None)

subtract相當(dāng)于進(jìn)行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。

圖3-13中左側(cè)的大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。V1在兩個(gè)RDD中均有,根據(jù)差集運(yùn)算規(guī)則,新RDD不保留,V2在第一個(gè)RDD有,第二個(gè)RDD沒(méi)有,則在新RDD元素中包含V2。

spark中RDD算子的示例分析

(4)sample(withReplacement, fraction, seed=None)

sample將RDD這個(gè)集合內(nèi)的元素進(jìn)行采樣,獲取所有元素的子集。用戶(hù)可以設(shè)定是否有放回的抽樣、百分比、隨機(jī)種子,進(jìn)而決定采樣方式。

內(nèi)部實(shí)現(xiàn)是生成SampledRDD(withReplacement, fraction, seed)。

函數(shù)參數(shù)設(shè)置如下。

withReplacement=true,表示有放回的抽樣;

withReplacement=false,表示無(wú)放回的抽樣。

圖3-14中的每個(gè)方框是一個(gè)RDD分區(qū)。通過(guò)sample函數(shù),采樣50%的數(shù)據(jù)。V1、V2、U1、U2、U3、U4采樣出數(shù)據(jù)V1和U1、U2,形成新的RDD。

spark中RDD算子的示例分析

(5)takeSample(withReplacement, num, seed=None)

takeSample()函數(shù)和上面的sample函數(shù)是一個(gè)原理,但是不使用相對(duì)比例采樣,而是按設(shè)定的采樣個(gè)數(shù)進(jìn)行采樣,同時(shí)返回結(jié)果不再是RDD,而是相當(dāng)于對(duì)采樣后的數(shù)據(jù)進(jìn)行Collect(),返回結(jié)果的集合為單機(jī)的數(shù)組。

圖3-15中左側(cè)的方框代表分布式的各個(gè)節(jié)點(diǎn)上的分區(qū),右側(cè)方框代表單機(jī)上返回的結(jié)果數(shù)組。通過(guò)takeSample對(duì)數(shù)據(jù)采樣,設(shè)置為采樣一份數(shù)據(jù),返回結(jié)果為V1。

spark中RDD算子的示例分析

5.Cache型

(1)cache

cache將RDD元素從磁盤(pán)緩存到內(nèi)存,相當(dāng)于persist(MEMORY_ONLY)函數(shù)的功能。圖3-14中的方框代表RDD分區(qū)。

圖3-16中的每個(gè)方框代表一個(gè)RDD分區(qū),左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲(chǔ)在磁盤(pán),通過(guò)cache算子將數(shù)據(jù)緩存在內(nèi)存。

spark中RDD算子的示例分析

(2)persist(storageLevel=StorageLevel(False, True, False, False, 1))

persist函數(shù)對(duì)RDD進(jìn)行緩存操作。數(shù)據(jù)緩存在哪里由StorageLevel枚舉類(lèi)型確定。有以下幾種類(lèi)型的組合(見(jiàn)圖3-15),DISK代表磁盤(pán),MEMORY代表內(nèi)存,SER代表數(shù)據(jù)是否進(jìn)行序列化存儲(chǔ)。

下面為函數(shù)定義,StorageLevel是枚舉類(lèi)型,代表存儲(chǔ)模式,用戶(hù)可以通過(guò)圖3-17按需選擇。

圖3-17中列出persist函數(shù)可以緩存的模式。例如,MEMORY_AND_DISK_SER代表數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤(pán),并且以序列化的方式存儲(chǔ)。其他同理。圖中,方框代表RDD分區(qū)。 disk代表存儲(chǔ)在磁盤(pán),mem代表存儲(chǔ)在內(nèi)存。 數(shù)據(jù)最初全部存儲(chǔ)在磁盤(pán),通過(guò)persist(MEMORY_AND_DISK)將數(shù)據(jù)緩存到內(nèi)存,但是有的分區(qū)無(wú)法容納在內(nèi)存,例如:圖3-18中將含有V1,V2,V3的RDD存儲(chǔ)到磁盤(pán),將含有U1,U2的RDD仍舊存儲(chǔ)在內(nèi)存。spark中RDD算子的示例分析

spark中RDD算子的示例分析

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“spark中RDD算子的示例分析”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI