溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

spark(四):shuffle

發(fā)布時間:2020-06-01 10:49:05 來源:網(wǎng)絡(luò) 閱讀:556 作者:afeiye 欄目:大數(shù)據(jù)

shuflle write

spark(四):shuffle

  1. 上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數(shù)為 2,可以同時運行兩個 task。
  2. 在一個 core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。

shuflle read

  1. 在什么時候 fetch 數(shù)據(jù)?當(dāng) parent stage 的所有 ShuffleMapTasks 結(jié)束后再 fetch。
  2. 邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。使用可以 aggregate 的數(shù)據(jù)結(jié)構(gòu),比如 HashMap,每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來)一個 <Key, Value> record,直接將其放進(jìn) HashMap 里面。如果該 HashMap 已經(jīng)存在相應(yīng)的 Key,那么直接進(jìn)行 aggregate 也就是 func(hashMap.get(Key), Value)
  3. fetch 來的數(shù)據(jù)存放到哪里?剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區(qū),經(jīng)過處理后的數(shù)據(jù)放在內(nèi)存 + 磁盤上。
  4. 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?reducer 在 shuffle 的時候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數(shù)據(jù)位置的。每個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息匯報給MapOutputTrackerMaster。

Shuffle read 中的 HashMap

ashMap 是 Spark shuffle read 過程中頻繁使用的、用于 aggregate 的數(shù)據(jù)結(jié)構(gòu)。Spark 設(shè)計了兩種:一種是全內(nèi)存的 AppendOnlyMap,另一種是內(nèi)存+磁盤的 ExternalAppendOnlyMap。
spark(四):shuffle

  1. 類似 HashMap,但沒有remove(key)方法。其實現(xiàn)原理很簡單,開一個大 Object 數(shù)組,藍(lán)色部分存儲 Key,白色部分存儲 Value。
  2. 如果 Array 的利用率達(dá)到 70%,那么就擴(kuò)張一倍,并對所有 key 進(jìn)行 rehash 后,重新排列每個 key 的位置。
    spark(四):shuffle
  3. ExternalAppendOnlyMap 持有一個 AppendOnlyMap,shuffle 來的一個個 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 一模一樣。
  4. 如果 AppendOnlyMap 快被裝滿時檢查一下內(nèi)存剩余空間是否可以夠擴(kuò)展,夠就直接在內(nèi)存中擴(kuò)展,不夠就 sort 一下 AppendOnlyMap,將其內(nèi)部所有 records 都 spill 到磁盤上。
  5. 每次 spill 完在磁盤上生成一個 spilledMap 文件,然后重新 new 出來一個 AppendOnlyMap。
  6. 最后一個 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 來的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經(jīng)被處理完,因為每次 insert 的時候,新來的 record 只與 AppendOnlyMap 中的 records 進(jìn)行 aggregate,并不是與所有的 records 進(jìn)行 aggregate(一些 records 已經(jīng)被 spill 到磁盤上了)。因此當(dāng)需要 aggregate 的最終結(jié)果時,需要對 AppendOnlyMap 和所有的 spilledMaps 進(jìn)行全局 merge-aggregate。
  7. 全局 merge-aggregate 的流程:先將 AppendOnlyMap 中的 records 進(jìn)行 sort,形成 sortedMap。
  8. 然后分別從 sortedMap 和各個 spilledMap 讀出一部分?jǐn)?shù)據(jù)(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key)
  9. mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并將其一個個放入 mergeBuffers 中,放入的時候與已經(jīng)存在于 mergeBuffers 中的 StreamBuffer 進(jìn)行 merge-combine

在Sort Based Shuffle的Shuffle Write階段,map端的任務(wù)會按照Partition id以及key對記錄進(jìn)行排序。同時將全部結(jié)果寫到一個數(shù)據(jù)文件中,同時生成一個索引文件,reduce端的Task可以通過該索引文件獲取相關(guān)的數(shù)據(jù)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI