溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么分析spark計算框架

發(fā)布時間:2021-12-16 20:41:54 來源:億速云 閱讀:132 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)怎么分析spark計算框架,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

首先明確一點:學(xué)計算框架主要就是學(xué)2部分:1.資源調(diào)度 2.任務(wù)調(diào)度

寫一個spark程序包含加載配置文件,創(chuàng)建上下文,創(chuàng)建RDD , 調(diào)用RDD的算子,用戶在算子中自定義的函數(shù)

map端:狹窄的理解是MapReduce中的map端,本質(zhì)就是將數(shù)據(jù)變成你想要的形式,例如:按照空格切分,乘2等等操作。

shuffle : 分為shuffle write(臨時存到本地磁盤)和shuffle read(從磁盤拉數(shù)據(jù),同一個分區(qū)的拉到一個partition上)階段,本質(zhì)就是數(shù)據(jù)的規(guī)整,例如同一個分區(qū)的拉到一塊。

reduce端:狹窄的理解是MapReduce中的reduce端,本質(zhì)就是數(shù)據(jù)的聚合

寬泛的理解2個stage之間,前面的可以說是map端,后面的stage可以理解為reduce端,中間正好需要shuffle過程,且shuffle過程需要再shuffle write階段將數(shù)據(jù)暫時存到本地磁盤上。

spark專業(yè)術(shù)語:

任務(wù)相關(guān)的專業(yè)術(shù)語:

1.application:用戶寫的應(yīng)用程序(包含2部分:Driver Program(運(yùn)行應(yīng)用的main()方法,創(chuàng)建spark上下文 )和Executor Program(用戶在算子中自定義的函數(shù)))

2.job:一個action類算子觸發(fā)執(zhí)行的操作,有多少個action類算子就有多少個job,一個應(yīng)用程序可以有多個job.

3.stage(階段):一組任務(wù)(task)就是一個stage,例如MapReduce中一組的map task(一個切片對應(yīng)一個map task),一個job中可以有有多個stage(根據(jù)寬依賴為分界線來劃分的)

.4.task(任務(wù):底層就是一個thread(線程)):在集群運(yùn)行時最小的執(zhí)行單元

集群相關(guān)的專業(yè)術(shù)語:

Master:資源管理的主節(jié)點

Worker:資源管理的從節(jié)點

Executor:執(zhí)行任務(wù)的進(jìn)程,運(yùn)行在worker節(jié)點上,負(fù)責(zé)運(yùn)行task,負(fù)責(zé)將數(shù)據(jù)存儲到內(nèi)存或磁盤,每個application有多個獨立的Executors

ThreadPool:線程池,存在與Executor進(jìn)程中,task在線程池中運(yùn)行

RDD的依賴關(guān)系

RDD有5大特性:

1.一個RDD有多個partition組成。

2.每個算子實質(zhì)上作用于每個partition上。

3.每個RDD依賴其父RDD.

4.可選項 :分區(qū)器是作用于KV格式的RDD上

5.可選項:RDD會提供一系列的最佳的計算位置

父RDD不知道其子RDD,但是子RDD知道的的所有父RDD

1.窄依賴:父RDD與子RDD,partition的關(guān)系是一對一,這種情況并沒有shuffle過程

例如:map(x=>x.split(" "))

2.寬依賴 : 父RDD與子RDD,partition之間的關(guān)系是一對多,這種情況下一般都會導(dǎo)致shuffle數(shù)據(jù)規(guī)整的過程

例如:groupByKey()->相同key的二元組一定在同一個分區(qū)中,無參的情況下子RDD的分區(qū)數(shù)等于父RDD的分區(qū)數(shù)(也就是會先計算key的hash函數(shù)再與父RDD的分區(qū)數(shù)求余,所以最終的數(shù)據(jù)一定會散落在這幾個分區(qū)中),當(dāng)然你可以傳入?yún)?shù),這個參數(shù)用于鎖定該子RDD有多少個分區(qū),后面調(diào)優(yōu)的時候會用到。

groupBy:根據(jù)指定的作為分組依據(jù),同sortBy和sortByKey

寬窄依賴的作用是:將job切割成多個stage.從祖先RDD開始找,如果是窄依賴?yán)^續(xù)往下找,以寬依賴為切割點,分為2個stage

那么為什么要劃分出stage呢?因為每個stage中的RDD都是窄依賴,沒有shuffle過程,且每個partition都是一對一的關(guān)系,所以可以在后面以管道的形式使每個partition上的task并行處理 (簡單說就是為了是每個task以管道的形式進(jìn)行計算)

關(guān)于stage的一個結(jié)論:stage與stage之間是寬依賴,stage內(nèi)部都是窄依賴

形成一個DAG(有向無環(huán)圖)需要從最后一個RDD往前回溯:因為子RDD知道父RDD,但是父RDD不知道子RDD

怎么分析spark計算框架

RDD中不是存儲的真實數(shù)據(jù),而是存儲的對數(shù)據(jù)處理的邏輯過程

對于KV格式的RDD應(yīng)該說:存儲的邏輯過程的返回類型是二元組類型我們稱為是KV格式的RDD

怎么分析spark計算框架

每個task作用于partition所在的block或副本所在的節(jié)點上(計算向數(shù)據(jù)移動,本地化可以大大減少網(wǎng)絡(luò)傳輸),這里task的計算邏輯(也就是這個展開式),處理的結(jié)果并沒有落地(存到磁盤的意思),而是以管道的模式,一條一條數(shù)據(jù)的從partition(邏輯上的,數(shù)據(jù)存在block上)中讀到內(nèi)存,在內(nèi)存中一直連續(xù)的執(zhí)行,直到最后執(zhí)行完這個task才可能會落地,一條接著一條的流式處理,一個task中的數(shù)據(jù)像流水線一樣,多個task是并行計算的。

偽代碼中的輸出:一條filter的輸出,一條map的輸出,交替出現(xiàn),而不是先將filter中的所有數(shù)據(jù)都打印出來,再打印map的數(shù)據(jù)。

從這里就能明顯感覺到spark計算框架比MapReduce計算框架的優(yōu)勢:基于內(nèi)存迭代,不需要落地,不需要存儲到磁盤,減少了磁盤IO,大大提高了效率。

幾個問題:

1.stage中的task(管道模式)并行計算,什么時候會落地磁盤呢?

①如果是stage后面是action類算子

collect:將每個管道中的輸出結(jié)果收集到driver端的內(nèi)存中

saveAsTextFile:將每個管道中的輸出結(jié)果保存到指定目錄,可以是本地磁盤,也可以是hdfs中

count:將管道的計算結(jié)果統(tǒng)計記錄數(shù),返回給Driver

②如果是stage后面是stage

在shuffle write節(jié)點會寫到本地磁盤暫時存儲,因為內(nèi)存中的數(shù)據(jù)不夠穩(wěn)定,為了防止reduce task拉取數(shù)據(jù)失敗

2.spark在計算過程中,是不是非常消耗內(nèi)存?

不是,正常使用,因為管道是很細(xì)的不會導(dǎo)致內(nèi)存過大,多個task并行運(yùn)算,也是正常使用,但是如果使用控制類算子的 cache,就會消耗大量內(nèi)存,因為如果一個rdd調(diào)用cache(),會將這個管道,開一個口,將數(shù)據(jù)復(fù)制一份放到內(nèi)存中存儲,方便下次運(yùn)行,但是非常消耗內(nèi)存。

3.RDD彈性分布式數(shù)據(jù)集,為什么不存儲數(shù)據(jù),還依然叫數(shù)據(jù)集?

因為它有處理數(shù)據(jù)的能力,可以通過生活的例子來舉例說明:例如:滴滴雖然每年一直虧損,但是市值依然很高,因為他雖然沒錢,但有創(chuàng)造錢的能力

對比一下spark和MapReduce的計算模式的差異:

mapreduce是1+1=2 2+1=3

spark是1+1+1=3

spark的任務(wù)調(diào)度過程:

怎么分析spark計算框架

1.首先編寫一個Application(上面的這個程序缺少一個action算子),一個spark應(yīng)用程序是基于RDD來操作的,會先創(chuàng)建出相應(yīng)的RDD對象,然后建立一個系統(tǒng)DAG(有向無環(huán)圖)

2.DAGScheduler(有向無環(huán)圖調(diào)度器)分割這個DAG,將其分割成多個stage,每個stage中有一組的task,所以也叫TaskSet(任務(wù)集合),一個stage就是一個TaskSet

3.將TaskSet提交給TaskScheduler(任務(wù)調(diào)度器),經(jīng)由集群管理者發(fā)送任務(wù)到worker節(jié)點運(yùn)行,監(jiān)控task,會重試失敗的task和掉隊的task,不可能無限重試,所以限制重試次數(shù)為3次,默認(rèn)最大失敗次數(shù)為4次,如果重試了3次還是失敗,此時TaskScheduler會向DAGScheduler匯報當(dāng)前失敗的task所在的stage失敗,此時DAGScheduler收到匯報也會重試該stage,重試次數(shù)默認(rèn)為4次,注意此時已經(jīng)成功執(zhí)行的task不需要再重新執(zhí)行了,只需要提交失敗的task就行,如果stage重試4次失敗,說明這個job就徹底失敗了,job沒有重試。

那么問題是發(fā)送到哪個work節(jié)點呢?最好是存儲節(jié)點(HDFS)包含計算節(jié)點(這里是spark集群),因為這樣為了數(shù)據(jù)本地化。根據(jù)文件名就可以獲得該文件的所有信息,根據(jù)文件名可以獲得每一個block的位置,以及block所在節(jié)點的ip等,然后就將task發(fā)送到該節(jié)點運(yùn)行就行。

4.task放到work節(jié)點的executor進(jìn)程中的線程池中運(yùn)行

怎么分析spark計算框架

spark資源調(diào)度的方式

粗粒度的資源調(diào)度

在任務(wù)執(zhí)行前申請到所需的所有資源,當(dāng)所有 task 執(zhí)行完畢后再釋放資源

優(yōu)點:task 直接使用已經(jīng)申請好的資源,執(zhí)行效率高

缺點:所有的 task 執(zhí)行完畢才釋放資源,可能導(dǎo)致集群資源浪費,例如只剩一個 task 遲遲不能結(jié)束,那么大量資源將被閑置

細(xì)粒度的資源調(diào)度

任務(wù)執(zhí)行時,task 自己去申請資源,執(zhí)行完畢后釋放資源

優(yōu)點:使集群資源得以充分利用

缺點:task 需要自己申請資源,執(zhí)行效率低

spark on standalone 執(zhí)行流程

1> worker 節(jié)點啟動,向 master 匯報信息,該信息被存儲在 workers 對象中,workers 底層使用 HashSet 數(shù)據(jù)結(jié)構(gòu),為了防止同一臺 worker 節(jié)點在 master 中注冊兩次(worker 節(jié)點掛掉但是迅速恢復(fù)可能會導(dǎo)致此問題)

2> 在客戶端提交任務(wù),這里以客戶端提交方式為例,首先客戶端會啟動 driver 進(jìn)程,然后構(gòu)建Spark Application的運(yùn)行環(huán)境,創(chuàng)建 SparkContext 對象,這會創(chuàng)建并初始化 TaskScheduler 和 DAGScheduler 兩個對象

3> 當(dāng)兩個對象創(chuàng)建完成后,TaskScheduler 會向 master 為 Application 申請資源, Application 的信息會注冊在 master 上的 waitingApps 對象中,waitingApps 使用 ArrayBuffer 存儲數(shù)據(jù)

4> 當(dāng) waitingApps 集合中的元素發(fā)生變化時會回調(diào) schedule() 方法,這時 master 就知道有 Appliacation 在請求執(zhí)行。master 會去讀取 workers 來獲取自己掌握的 worker 節(jié)點,然后在資源充足的 worker 節(jié)點上為 Appliacation 分配資源 -> 通知 worker 節(jié)點啟動Executor 進(jìn)程,Executor 進(jìn)程啟動時會在內(nèi)部初始化一個線程池,用來執(zhí)行 task

–master 采用輪循方式分配資源,確保整個集群的資源得到充分利用,并有利于后面分發(fā) task 時實現(xiàn)數(shù)據(jù)本地化–每一個 worker 節(jié)點上默認(rèn)為 Applacation 啟動 1 個 Executor 進(jìn)程,該 Executor 進(jìn)程默認(rèn)使用 1G 內(nèi)存和該 worker 節(jié)點上空閑的所有的核可通過在提交任務(wù)時使用 - -executor-cores 和 - -executor-memory 來手動指定每個 Executor 使用的資源–spark 采用粗粒度的資源調(diào)度,當(dāng)所有 task 都執(zhí)行完畢后,才進(jìn)行資源回收

5> 當(dāng) Executor 成功啟動后,會去向 TaskScheduler 反向注冊,此時 TaskScheduler 就得到所有成功啟動的 Executor 的信息

6> SparkContext 對象解析代碼構(gòu)建DAG(有向無環(huán)圖)交給 DAGScheduler,每一個 job 會構(gòu)建一個DAG圖,DAGScheduler 根據(jù) DAG 中 RDD 的寬窄依賴將其切分成一個個 stage,每個 stage 中包含一組 task,每個 task 因為都是窄依賴,不會產(chǎn)生 shuffle,所以都是 pipeline(管道) 計算模式

7> DAGScheduler 將一個 stage 封裝到一個 taskSet 中,傳給 TaskScheduler,TaskScheduler拿到后遍歷 taskSet ,得到一個個 task,解讀其要計算的數(shù)據(jù),然后調(diào)用 HDFS 的 API 得到數(shù)據(jù)所在的位置

8> 本著計算向數(shù)據(jù)靠攏的原則,TaskScheduler 將 task 分發(fā)到其所要計算的數(shù)據(jù)所在的節(jié)點的 Executor 進(jìn)程中,task 最后會被封裝到線程池里的一個線程中執(zhí)行,task 執(zhí)行的過程中 TaskScheduler 會對其進(jìn)行監(jiān)控

9> 如果 task 執(zhí)行失敗,TaskScheduler 會進(jìn)行重試,再次分發(fā)該 task ,最多重試3次;

如果 task 陷入掙扎并且 spark 開啟了推測執(zhí)行,TaskScheduler 會換一個節(jié)點分發(fā)陷入掙扎的 task,兩個 task 誰先執(zhí)行完就以誰的結(jié)果為準(zhǔn)

陷入掙扎的判定標(biāo)準(zhǔn):當(dāng)75%的 task 已經(jīng)執(zhí)行完畢后,這時 TaskScheduler 每隔10ms會計算一次剩余 task 當(dāng)前執(zhí)行時間的中值 t,然后以 t 的1.5倍 為標(biāo)準(zhǔn),未執(zhí)行完的 task 當(dāng)前執(zhí)行時間如果大于 t*1.5 則該 task 被判定為陷入掙扎的 task

10> 如果3次重試后 task 依然執(zhí)行失敗,該 task 所在的 stage 就會被判定為失敗,TaskScheduler 會向 DAGScheduler 反饋,DAGScheduler 會重試失敗的 stage,最多重試4次,如果4次重試后該 stage 依然失敗,則該 job 被判定為失敗,程序中止

DAGScheduler 重試 stage 時只會重試 stage 中失敗的 task

11> 當(dāng)所有 task 成功執(zhí)行完畢后或 job 失敗,driver 會通知 master, master 會通知 worker kill 掉 Executor,完成資源回收

以上就是怎么分析spark計算框架,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI