溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark中foreachRDD、foreachPartition和foreach的區(qū)別是什么

發(fā)布時(shí)間:2021-12-09 16:13:42 來源:億速云 閱讀:196 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“Spark中foreachRDD、foreachPartition和foreach的區(qū)別是什么”,在日常操作中,相信很多人在Spark中foreachRDD、foreachPartition和foreach的區(qū)別是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark中foreachRDD、foreachPartition和foreach的區(qū)別是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

區(qū)別

最近有不少同學(xué)問我,Spark 中 foreachRDD、foreachPartition和foreach 的區(qū)別,工作中經(jīng)常會(huì)用錯(cuò)或不知道怎么用,今天簡單聊聊它們之間的區(qū)別:

其實(shí)區(qū)別它們很簡單,首先是作用范圍不同,foreachRDD 作用于 DStream中每一個(gè)時(shí)間間隔的 RDD,foreachPartition 作用于每一個(gè)時(shí)間間隔的RDD中的每一個(gè) partition,foreach 作用于每一個(gè)時(shí)間間隔的 RDD 中的每一個(gè)元素。

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

SparkStreaming 中對 foreachRDD的說明。

foreach 與 foreachPartition都是在每個(gè)partition中對iterator進(jìn)行操作,不同的是,foreach是直接在每個(gè)partition中直接對iterator執(zhí)行foreach操作,而傳入的function只是在foreach內(nèi)部使用,而foreachPartition是在每個(gè)partition中把iterator給傳入的function,讓function自己對iterator進(jìn)行處理(可以避免內(nèi)存溢出)

一個(gè)簡單的例子

在Spark 官網(wǎng)中,foreachRDD被劃分到Output Operations on DStreams中,所有我們首先要明確的是,它是一個(gè)輸出操作的算子,然后再來看官網(wǎng)對它的含義解釋:The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

最常用的輸出操作,需要一個(gè)函數(shù)作為參數(shù),函數(shù)作用于DStream中的每一個(gè)RDD,函數(shù)將RDD中的數(shù)據(jù)輸出到外部系統(tǒng),如文件、數(shù)據(jù)庫,在driver上執(zhí)行

函數(shù)中通常要有action算子,因?yàn)閒oreachRDD本身是transform算子

官網(wǎng)還給出了開發(fā)者常見的錯(cuò)誤:

Often writing data to external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example :(中文解析見代碼下方)

// ① 這種寫法是錯(cuò)誤的 ?dstream.foreachRDD { rdd =>  val connection = createNewConnection()  // executed at the driver  rdd.foreach { record =>    connection.send(record) // executed at the worker  }}

上面說的是我們使用foreachRDD向外部系統(tǒng)輸出數(shù)據(jù)時(shí),通常要?jiǎng)?chuàng)建一個(gè)連接對象,如果像上面的代碼中創(chuàng)建在 driver 上就是錯(cuò)誤的,因?yàn)閒oreach在每個(gè)節(jié)點(diǎn)上執(zhí)行時(shí)節(jié)點(diǎn)上并沒有連接對象。driver節(jié)點(diǎn)就一個(gè),而worker節(jié)點(diǎn)有多個(gè)。

所以,我們改成下面這樣:

// ② 把創(chuàng)建連接寫在 forech 里面,RDD 中的每個(gè)元素都會(huì)創(chuàng)建一個(gè)連接dstream.foreachRDD { rdd =>  rdd.foreach { record =>    val connection = createNewConnection() // executed at the worker    connection.send(record) // executed at the worker    connection.close()  }}

這時(shí)不會(huì)出現(xiàn)計(jì)算節(jié)點(diǎn)沒有連接對象的情況。但是,這樣寫會(huì)在每次循環(huán)RDD的時(shí)候都會(huì)創(chuàng)建一個(gè)連接,創(chuàng)建連接和關(guān)閉連接都很頻繁,造成系統(tǒng)不必要的開銷。

可以通過使用 foreachPartirion 來解決這類問題:

// ③ 使用foreachPartitoin來減少連接的創(chuàng)建,RDD的每個(gè)partition創(chuàng)建一個(gè)鏈接dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

上面這種方式還可以優(yōu)化,雖然連接申請變少了,但是對一每一個(gè)partition來說,連接還是沒有辦法復(fù)用,所以我們可以引入靜態(tài)連接池。官方說明:該連接池必須是靜態(tài)的、懶加載的。

// ④ 使用靜態(tài)連接池,可以增加連接的復(fù)用、減少連接的創(chuàng)建和關(guān)閉。dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    // ConnectionPool is a static, lazily initialized pool of connections    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  }}

這里需要注意的是:使用連接池中的連接應(yīng)按需創(chuàng)建,如果有一段時(shí)間不使用,則應(yīng)超時(shí),這樣實(shí)現(xiàn)了向外部系統(tǒng)最有效地發(fā)送地?cái)?shù)據(jù)。

到此,關(guān)于“Spark中foreachRDD、foreachPartition和foreach的區(qū)別是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI