您好,登錄后才能下訂單哦!
大數(shù)據(jù)開(kāi)發(fā)中Spark-RDD的持久化和緩存該如何實(shí)現(xiàn),相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。
Spark 速度非常快的一個(gè)原因是 RDD 支持緩存。成功緩存后,如果之后的操作使用到了該數(shù)據(jù)集,則直接從緩存中獲取。雖然緩存也有丟失的風(fēng)險(xiǎn),但是由于 RDD 之間的依賴關(guān)系,如果某個(gè)分區(qū)的緩存數(shù)據(jù)丟失,只需要重新計(jì)算該分區(qū)即可。
涉及到的算子:persist、cache、unpersist;都是 Transformation
緩存是將計(jì)算結(jié)果寫(xiě)入不同的介質(zhì),用戶定義可定義存儲(chǔ)級(jí)別(存儲(chǔ)級(jí)別定義了緩存存儲(chǔ)的介質(zhì),目前支持內(nèi)存、堆 外內(nèi)存、磁盤);
通過(guò)緩存,Spark避免了RDD上的重復(fù)計(jì)算,能夠極大地提升計(jì)算速度; RDD持久化或緩存,是Spark最重要的特征之一??梢哉f(shuō),緩存是Spark構(gòu)建迭代式算法和快速交互式查詢的關(guān)鍵因 素;
Spark速度非??斓脑蛑唬褪窃趦?nèi)存中持久化(或緩存)一個(gè)數(shù)據(jù)集。當(dāng)持久化一個(gè)RDD后,每一個(gè)節(jié)點(diǎn)都將 把計(jì)算的分片結(jié)果保存在內(nèi)存中,并在對(duì)此數(shù)據(jù)集(或者衍生出的數(shù)據(jù)集)進(jìn)行的其他動(dòng)作(Action)中重用。這使 得后續(xù)的動(dòng)作變得更加迅速;使用persist()方法對(duì)一個(gè)RDD標(biāo)記為持久化。之所以說(shuō)“標(biāo)記為持久化”,是因?yàn)槌霈F(xiàn)persist()語(yǔ)句的地方,并不會(huì)馬 上計(jì)算生成RDD并把它持久化,而是要等到遇到第一個(gè)行動(dòng)操作觸發(fā)真正計(jì)算以后,才會(huì)把計(jì)算結(jié)果進(jìn)行持久化;通過(guò)persist()或cache()方法可以標(biāo)記一個(gè)要被持久化的RDD,持久化被觸發(fā),RDD將會(huì)被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中 并重用;
什么時(shí)候緩存數(shù)據(jù),需要對(duì)空間和速度進(jìn)行權(quán)衡。一般情況下,如果多個(gè)動(dòng)作需要用到某個(gè) RDD,而它的計(jì)算代價(jià) 又很高,那么就應(yīng)該把這個(gè) RDD 緩存起來(lái);
緩存有可能丟失,或者存儲(chǔ)于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除。RDD的緩存的容錯(cuò)機(jī)制保證了即使緩存丟失也能保 證計(jì)算的正確執(zhí)行。通過(guò)基于RDD的一系列的轉(zhuǎn)換,丟失的數(shù)據(jù)會(huì)被重算。RDD的各個(gè)Partition是相對(duì)獨(dú)立的,因此 只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。
啟動(dòng)堆外內(nèi)存需要配置兩個(gè)參數(shù):
spark.memory.offHeap.enabled :是否開(kāi)啟堆外內(nèi)存,默認(rèn)值為 false,需要設(shè)置為 true;
spark.memory.offHeap.size : 堆外內(nèi)存空間的大小,默認(rèn)值為 0,需要設(shè)置為正值。
Spark 速度非常快的一個(gè)原因是 RDD 支持緩存。成功緩存后,如果之后的操作使用到了該數(shù)據(jù)集,則直接從緩存中獲取。雖然緩存也有丟失的風(fēng)險(xiǎn),但是由于 RDD 之間的依賴關(guān)系,如果某個(gè)分區(qū)的緩存數(shù)據(jù)丟失,只需要重新計(jì)算該分區(qū)即可。
Spark 支持多種緩存級(jí)別 :
Storage Level(存儲(chǔ)級(jí)別) | Meaning(含義) |
---|---|
MEMORY_ONLY | 默認(rèn)的緩存級(jí)別,將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ)在 JVM 中。如果內(nèi)存空間不夠,則部分分區(qū)數(shù)據(jù)將不再緩存。 |
MEMORY_AND_DISK | 將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ) JVM 中。如果內(nèi)存空間不夠,將未緩存的分區(qū)數(shù)據(jù)存儲(chǔ)到磁盤,在需要使用這些分區(qū)時(shí)從磁盤讀取。 |
MEMORY_ONLY_SER | 將 RDD 以序列化的 Java 對(duì)象的形式進(jìn)行存儲(chǔ)(每個(gè)分區(qū)為一個(gè) byte 數(shù)組)。這種方式比反序列化對(duì)象節(jié)省存儲(chǔ)空間,但在讀取時(shí)會(huì)增加 CPU 的計(jì)算負(fù)擔(dān)。僅支持 Java 和 Scala 。 |
MEMORY_AND_DISK_SER | 類似于 MEMORY_ONLY_SER ,但是溢出的分區(qū)數(shù)據(jù)會(huì)存儲(chǔ)到磁盤,而不是在用到它們時(shí)重新計(jì)算。僅支持 Java 和 Scala。 |
DISK_ONLY | 只在磁盤上緩存 RDD |
MEMORY_ONLY_2 , MEMORY_AND_DISK_2 | 與上面的對(duì)應(yīng)級(jí)別功能相同,但是會(huì)為每個(gè)分區(qū)在集群中的兩個(gè)節(jié)點(diǎn)上建立副本。 |
OFF_HEAP | 與 MEMORY_ONLY_SER 類似,但將數(shù)據(jù)存儲(chǔ)在堆外內(nèi)存中。這需要啟用堆外內(nèi)存。 |
啟動(dòng)堆外內(nèi)存需要配置兩個(gè)參數(shù):
spark.memory.offHeap.enabled :是否開(kāi)啟堆外內(nèi)存,默認(rèn)值為 false,需要設(shè)置為 true;
spark.memory.offHeap.size : 堆外內(nèi)存空間的大小,默認(rèn)值為 0,需要設(shè)置為正值。
緩存數(shù)據(jù)的方法有兩個(gè):persist
和 cache
。cache
內(nèi)部調(diào)用的也是 persist
,它是 persist
的特殊化形式,等價(jià)于 persist(StorageLevel.MEMORY_ONLY)
。示例如下:
// 所有存儲(chǔ)級(jí)別均定義在 StorageLevel 對(duì)象中 fileRDD.persist(StorageLevel.MEMORY_AND_DISK) fileRDD.cache()
被緩存的RDD在DAG圖中有一個(gè)綠色的圓點(diǎn)。
Spark 會(huì)自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)上的緩存使用情況,并按照最近最少使用(LRU)的規(guī)則刪除舊數(shù)據(jù)分區(qū)。當(dāng)然,你也可以使用 RDD.unpersist()
方法進(jìn)行手動(dòng)刪除。
Spark中對(duì)于數(shù)據(jù)的保存除了持久化操作之外,還提供了檢查點(diǎn)的機(jī)制;檢查點(diǎn)本質(zhì)是通過(guò)將RDD寫(xiě)入高可靠的磁盤,主要目的是為了容錯(cuò)。檢查點(diǎn)通過(guò)將數(shù)據(jù)寫(xiě)入到HDFS文件系統(tǒng)實(shí)現(xiàn)了
RDD的檢查點(diǎn)功能。Lineage過(guò)長(zhǎng)會(huì)造成容錯(cuò)成本過(guò)高,這樣就不如在中間階段做檢查點(diǎn)容錯(cuò),如果之后有節(jié)點(diǎn)出現(xiàn)問(wèn)題而丟失分區(qū),從
做檢查點(diǎn)的RDD開(kāi)始重做Lineage,就會(huì)減少開(kāi)銷。
cache 和 checkpoint 是有顯著區(qū)別的,緩存把 RDD 計(jì)算出來(lái)然后放在內(nèi)存中,但是 RDD 的依賴鏈不能丟掉, 當(dāng)某個(gè)點(diǎn)某個(gè) executor 宕了,上面 cache 的RDD就會(huì)丟掉, 需要通過(guò)依賴鏈重放計(jì)算。不同的是,checkpoint 是把
RDD 保存在 HDFS中,是多副本可靠存儲(chǔ),此時(shí)依賴鏈可以丟掉,所以斬?cái)嗔艘蕾囨湣?/p>
以下場(chǎng)景適合使用檢查點(diǎn)機(jī)制:
DAG中的Lineage過(guò)長(zhǎng),如果重算,則開(kāi)銷太大
在寬依賴上做 Checkpoint 獲得的收益更大
與cache類似 checkpoint 也是 lazy 的。
val rdd1 = sc.parallelize(1 to 100000) // 設(shè)置檢查點(diǎn)目錄 sc.setCheckpointDir("/tmp/checkpoint") val rdd2 = rdd1.map(_*2) rdd2.checkpoint // checkpoint是lazy操作 rdd2.isCheckpointed // checkpoint之前的rdd依賴關(guān)系 rdd2.dependencies(0).rdd rdd2.dependencies(0).rdd.collect // 執(zhí)行一次action,觸發(fā)checkpoint的執(zhí)行 rdd2.count rdd2.isCheckpointed // 再次查看RDD的依賴關(guān)系。可以看到checkpoint后,RDD的lineage被截?cái)?,變成從checkpointRDD開(kāi)始 rdd2.dependencies(0).rdd rdd2.dependencies(0).rdd.collect //查看RDD所依賴的checkpoint文件 rdd2.getCheckpointFile
看完上述內(nèi)容,你們掌握大數(shù)據(jù)開(kāi)發(fā)中Spark-RDD的持久化和緩存該如何實(shí)現(xiàn)的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。