您好,登錄后才能下訂單哦!
我們先做一個(gè)簡(jiǎn)單的測(cè)試讀取一個(gè)本地文件做一次collect操作:
val rdd=sc.textFile("file:///home/hadoop/data/input.txt")
val rdd=sc.textFile("file:///home/hadoop/data/input.txt")
上面我們進(jìn)行了兩次相同的操作,觀察日志我們發(fā)現(xiàn)這樣一句話Submitting ResultStage 0 (file:///home/hadoop/data/input.txt MapPartitionsRDD[1] at textFile at <console>:25), which has no missing parents
,每次都要去本地讀取input.txt文件,這里大家能想到存在什么問題嗎? 如果我的文件很大,每次都對(duì)相同的RDD進(jìn)行同一個(gè)action操作,那么每次都要到本地讀取文件,得到相同的結(jié)果。不斷進(jìn)行這樣的重復(fù)操作,耗費(fèi)資源浪費(fèi)時(shí)間啊。這時(shí)候我們可能想到能不能把RDD保存在內(nèi)存中呢?答案是可以的,這就是我們所要學(xué)習(xí)的cache。
通過上面的講解我們知道, 有時(shí)候很多地方都會(huì)用到同一個(gè)RDD, 那么每個(gè)地方遇到Action操作的時(shí)候都會(huì)對(duì)同一個(gè)算子計(jì)算多次, 這樣會(huì)造成效率低下的問題。通過cache操作可以把RDD持久化到內(nèi)存或者磁盤。
現(xiàn)在我們利用上面說的例子,把rdd進(jìn)行cache操作
rdd.cache這時(shí)候我們打開192.168.137.130:4040界面查看storage界面中是否有我們的剛才cache的文件,發(fā)現(xiàn)并沒有。這時(shí)候我們進(jìn)行一個(gè)action操作rdd.count。繼續(xù)查看storage是不是有東西了哈
并且給我們列出了很多信息,存儲(chǔ)級(jí)別(后面詳解),大?。〞?huì)發(fā)現(xiàn)要比源文件大,這也是一個(gè)調(diào)優(yōu)點(diǎn))等等。
說到這里小伙伴能能想到什么呢? cacha是一個(gè)Tranformation還是一個(gè)Action呢?相信大伙應(yīng)該知道了。
cache這個(gè)方法也是個(gè)Tranformation,當(dāng)?shù)谝淮斡龅紸ction算子的時(shí)才會(huì)進(jìn)行持久化,所以說我們第一次進(jìn)行了cache操作在ui中并沒有看到結(jié)果,進(jìn)行了count操作才有。
Spark版本:2.2.0
源碼分析
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
從源碼中可以明顯看出cache()調(diào)用了persist(), 想要知道二者的不同還需要看一下persist函數(shù):(這里注釋cache的storage level)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
可以看到persist()內(nèi)部調(diào)用了persist(StorageLevel.MEMORY_ONLY),是不是和上面對(duì)上了哈,這里我們能夠得出cache和persist的區(qū)別了:cache只有一個(gè)默認(rèn)的緩存級(jí)別MEMORY_ONLY ,而persist可以根據(jù)情況設(shè)置其它的緩存級(jí)別。
我相信小伙伴們肯定很好奇這個(gè)緩存級(jí)別到底有多少種呢?我們繼續(xù)懟源碼看看:
/**
* :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
* ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
* to replicate the RDD partitions on multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
@DeveloperApi
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable
我們先來看看存儲(chǔ)類型,源碼中我們可以看出有五個(gè)參數(shù),分別代表:
useDisk
:使用硬盤(外存);
useMemory
:使用內(nèi)存;
useOffHeap
:使用堆外內(nèi)存,這是Java虛擬機(jī)里面的概念,堆外內(nèi)存意味著把內(nèi)存對(duì)象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機(jī))。這樣做的結(jié)果就是能保持一個(gè)較小的堆,以減少垃圾收集對(duì)應(yīng)用的影響。這部分內(nèi)存也會(huì)被頻繁的使用而且也可能導(dǎo)致OOM,它是通過存儲(chǔ)在堆中的DirectByteBuffer對(duì)象進(jìn)行引用,可以避免堆和堆外數(shù)據(jù)進(jìn)行來回復(fù)制;
deserialized
:反序列化,其逆過程序列化(Serialization)是java提供的一種機(jī)制,將對(duì)象表示成一連串的字節(jié);而反序列化就表示將字節(jié)恢復(fù)為對(duì)象的過程。序列化是對(duì)象永久化的一種機(jī)制,可以將對(duì)象及其屬性保存起來,并能在反序列化后直接恢復(fù)這個(gè)對(duì)象;
replication
:備份數(shù)(在多個(gè)節(jié)點(diǎn)上備份,默認(rèn)為1)。
我們接著看看緩存級(jí)別:
/**
* Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
* new storage levels.
*/
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
可以看到這里列出了12種緩存級(jí)別,但這些有什么區(qū)別呢?可以看到每個(gè)緩存級(jí)別后面都跟了一個(gè)StorageLevel的構(gòu)造函數(shù),里面包含了4個(gè)或5個(gè)參數(shù),和上面說的存儲(chǔ)類型是相對(duì)應(yīng)的,四個(gè)參數(shù)是因?yàn)橛幸粋€(gè)是有默認(rèn)值的。
好吧這里我又想問小伙伴們一個(gè)問題了,這幾種存儲(chǔ)方式什么意思呢?該如何選擇呢?
官網(wǎng)上進(jìn)行了詳細(xì)的解釋。我這里介紹一個(gè)有興趣的同學(xué)可以去官網(wǎng)看看哈。
MEMORY_ONLY
使用反序列化的Java對(duì)象格式,將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),某些分區(qū)將不會(huì)被緩存,并且將在需要時(shí)重新計(jì)算。這是默認(rèn)級(jí)別。
MEMORY_AND_DISK
使用反序列化的Java對(duì)象格式,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),會(huì)將數(shù)據(jù)寫入磁盤文件中,下次對(duì)這個(gè)RDD執(zhí)行算子時(shí),持久化在磁盤文件中的數(shù)據(jù)會(huì)被讀取出來使用。
MEMORY_ONLY_SER((Java and Scala))
基本含義同MEMORY_ONLY。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存,但是會(huì)加大cpu負(fù)擔(dān)。
一個(gè)簡(jiǎn)單的案例感官行的認(rèn)識(shí)存儲(chǔ)級(jí)別的差別:
19M page_views.dat
val rdd1=sc.textFile("file:///home/hadoop/data/page_views.dat")
rdd1.persist().count
ui查看緩存大?。?/p>
是不是明顯變大了,我們先刪除緩存rdd1.unpersist()
使用MEMORY_ONLY_SER級(jí)別
import org.apache.spark.storage.StorageLevel
rdd1.persist(StorageLevel.MEMORY_ONLY_SER)
rdd1.count
這里我就用這兩種方式進(jìn)行對(duì)比,大家可以試試其他方式。
那如何選擇呢?哈哈官網(wǎng)也說了。
你可以在內(nèi)存使用和CPU效率之間來做出不同的選擇不同的權(quán)衡。
默認(rèn)情況下,性能最高的當(dāng)然是MEMORY_ONLY,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個(gè)RDD的所有數(shù)據(jù)。因?yàn)椴贿M(jìn)行序列化與反序列化操作,就避免了這部分的性能開銷;對(duì)這個(gè)RDD的后續(xù)算子操作,都是基于純內(nèi)存中的數(shù)據(jù)的操作,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高;而且不需要復(fù)制一份數(shù)據(jù)副本,并遠(yuǎn)程傳送到其他節(jié)點(diǎn)上。但是這里必須要注意的是,在實(shí)際的生產(chǎn)環(huán)境中,恐怕能夠直接用這種策略的場(chǎng)景還是有限的,如果RDD中數(shù)據(jù)比較多時(shí)(比如幾十億),直接用這種持久化級(jí)別,會(huì)導(dǎo)致JVM的OOM內(nèi)存溢出異常。
如果使用MEMORY_ONLY級(jí)別時(shí)發(fā)生了內(nèi)存溢出,那么建議嘗試使用MEMORY_ONLY_SER級(jí)別。該級(jí)別會(huì)將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時(shí)每個(gè)partition僅僅是一個(gè)字節(jié)數(shù)組而已,大大減少了對(duì)象數(shù)量,并降低了內(nèi)存占用。這種級(jí)別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續(xù)算子可以基于純內(nèi)存進(jìn)行操作,因此性能總體還是比較高的。此外,可能發(fā)生的問題同上,如果RDD中的數(shù)據(jù)量過多的話,還是可能會(huì)導(dǎo)致OOM內(nèi)存溢出的異常。
不要泄漏到磁盤,除非你在內(nèi)存中計(jì)算需要很大的花費(fèi),或者可以過濾大量數(shù)據(jù),保存部分相對(duì)重要的在內(nèi)存中。否則存儲(chǔ)在磁盤中計(jì)算速度會(huì)很慢,性能急劇降低。
后綴為_2的級(jí)別,必須將所有數(shù)據(jù)都復(fù)制一份副本,并發(fā)送到其他節(jié)點(diǎn)上,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會(huì)導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性,否則不建議使用。
spark自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)上的緩存使用,并以最近最少使用的(LRU)方式丟棄舊數(shù)據(jù)分區(qū)。如果您想手動(dòng)刪除RDD,而不是等待它從緩存中掉出來,請(qǐng)使用 RDD.unpersist()方法。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。