溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RDD行動操作方法是什么

發(fā)布時間:2021-12-20 14:19:36 來源:億速云 閱讀:179 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“RDD行動操作方法是什么”,在日常操作中,相信很多人在RDD行動操作方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RDD行動操作方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

行動操作是真正觸發(fā)計算的地方。Spark程序執(zhí)行到行動操作時,才會執(zhí)行真正的計算,從文件中加載數(shù)據(jù),完成一次又一次轉(zhuǎn)換操作,最終,完成行動操作得到結(jié)果。

操作說明
count()返回數(shù)據(jù)集中的元素個數(shù)
collect()以數(shù)組的形式返回數(shù)據(jù)集中的所有元素
first()返回數(shù)據(jù)集中的第一個元素
take(n)以數(shù)組的形式返回數(shù)據(jù)集中的前n個元素
reduce(func)通過函數(shù)func(輸入兩個參數(shù)并返回一個值)聚合數(shù)據(jù)集中的元素
foreach(func)將數(shù)據(jù)集中的每個元素傳遞到函數(shù)func中運行

####惰性機制
在當(dāng)前的spark目錄下面創(chuàng)建input目錄

cd $SPARK_HOMEmkdir inputvim word.txthello worldhello sparkhello hadoophello scala

由于textFile()方法只是一個轉(zhuǎn)換操作,因此,這行代碼執(zhí)行后,不會立即把data.txt文件加載到內(nèi)存中,這時的lines只是一個指向這個文件的指針。

scala> val lines = sc.textFile("word.txt")lines: org.apache.spark.rdd.RDD[String] = word.txt MapPartitionsRDD[13] at textFile at <console>:24

下面代碼用來計算每行的長度(即每行包含多少個單詞),同樣,由于map()方法只是一個轉(zhuǎn)換操作,這行代碼執(zhí)行后,不會立即計算每行的長度。

scala> val lineLengths = lines.map(s=>s.length)lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at map at <console>:25

reduce()方法是一個“動作”類型的操作,這時,就會觸發(fā)真正的計算。這時,Spark會把計算分解成多個任務(wù)在不同的機器上執(zhí)行,每臺機器運行位于屬于它自己的map和reduce,最后把結(jié)果返回給Driver Program。

scala> val totalLength = lineLengths.reduce((a,b)=> a+b)totalLength: Int = 45

####count
lines就是一個RDD。lines.filter()會遍歷lines中的每行文本,并對每行文本執(zhí)行括號中的匿名函數(shù),也就是執(zhí)行Lamda表達(dá)式:line => line.contains(“spark”),在執(zhí)行Lamda表達(dá)式時,會把當(dāng)前遍歷到的這行文本內(nèi)容賦值給參數(shù)line,然后,執(zhí)行處理邏輯line.contains(“spark”),也就是只有當(dāng)改行文本包含“spark”才滿足條件,才會被放入到結(jié)果集中。最后,等到lines集合遍歷結(jié)束后,就會得到一個結(jié)果集,這個結(jié)果集中包含了所有包含“Spark”的行。最后,對這個結(jié)果集調(diào)用count(),這是一個行動操作,會計算出結(jié)果集中的元素個數(shù)。

##持久化
在Spark中,RDD采用惰性求值的機制,每次遇到行動操作,都會從頭開始執(zhí)行計算。如果整個Spark程序中只有一次行動操作,這當(dāng)然不會有什么問題。但是,在一些情形下,我們需要多次調(diào)用不同的行動操作,這就意味著,每次調(diào)用行動操作,都會觸發(fā)一次從頭開始的計算。這對于迭代計算而言,代價是很大的,迭代計算經(jīng)常需要多次重復(fù)使用同一組數(shù)據(jù)。

前后共觸發(fā)了兩次從頭到尾的計算。
實際上,可以通過持久化(緩存)機制避免這種重復(fù)計算的開銷??梢允褂胮ersist()方法對一個RDD標(biāo)記為持久化,之所以說“標(biāo)記為持久化”,是因為出現(xiàn)persist()語句的地方,并不會馬上計算生成RDD并把它持久化,而是要等到遇到第一個行動操作觸發(fā)真正計算以后,才會把計算結(jié)果進(jìn)行持久化,持久化后的RDD將會被保留在計算節(jié)點的內(nèi)存中被后面的行動操作重復(fù)使用。
persist()的圓括號中包含的是持久化級別參數(shù),
persist(MEMORY_ONLY)表示將RDD作為反序列化的對象存儲于JVM中,如果內(nèi)存不足,就要按照LRU原則替換緩存中的內(nèi)容。
persist(MEMORY_AND_DISK)表示將RDD作為反序列化的對象存儲在JVM中,如果內(nèi)存不足,超出的分區(qū)將會被存放在硬盤上。
一般而言,使用cache()方法時,會調(diào)用persist(MEMORY_ONLY)。

可以使用unpersist()方法手動地把持久化的RDD從緩存中移除。

分區(qū)

RDD是彈性分布式數(shù)據(jù)集,通常RDD很大,會被分成很多個分區(qū),分別保存在不同的節(jié)點上。RDD分區(qū)的一個分區(qū)原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目。
對于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過設(shè)置spark.default.parallelism這個參數(shù)的值,來配置默認(rèn)的分區(qū)數(shù)目,一般而言:
*本地模式:默認(rèn)為本地機器的CPU數(shù)目,若設(shè)置了local[N],則默認(rèn)為N;
*Apache Mesos:默認(rèn)的分區(qū)數(shù)為8;
*Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和“2”二者中取較大值作為默認(rèn)值;
因此,對于parallelize而言,如果沒有在方法中指定分區(qū)數(shù),則默認(rèn)為spark.default.parallelism,比如:

對于textFile而言,如果沒有在方法中指定分區(qū)數(shù),則默認(rèn)為min(defaultParallelism,2),其中,defaultParallelism對應(yīng)的就是spark.default.parallelism。
如果是從HDFS中讀取文件,則分區(qū)數(shù)為文件分片數(shù)(比如,128MB/片)。

到此,關(guān)于“RDD行動操作方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

rdd
AI