溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spark(一):spark概覽及邏輯執(zhí)行圖

發(fā)布時(shí)間:2020-04-03 14:56:16 來(lái)源:網(wǎng)絡(luò) 閱讀:856 作者:afeiye 欄目:大數(shù)據(jù)

spark(一):spark概覽及邏輯執(zhí)行圖
上圖是spark框架概要圖,spark一些重要概念先簡(jiǎn)要介紹一下:

  1. cluster manager:資源管理集群,比如standalone、yarn;
  2. application:用戶編寫(xiě)的應(yīng)用程序;
  3. Driver:application中的main函數(shù),創(chuàng)建的SparkContext負(fù)責(zé)與cluster manager通信,進(jìn)行資源的申請(qǐng)、任務(wù)的分配與監(jiān)控。一般認(rèn)為SparkContext就是Driver;
  4. worker:集群中可以運(yùn)行任務(wù)的節(jié)點(diǎn);
  5. executor:worker上運(yùn)行任務(wù)的進(jìn)程,負(fù)責(zé)執(zhí)行task;
  6. task:被executor執(zhí)行的最小單位,一個(gè)stage由多個(gè)task組成;
  7. stage:一個(gè)job中的多個(gè)階段,一般只要發(fā)生shuffle就會(huì)切分一個(gè)stage;
  8. job:一個(gè)application至少有一個(gè)job,spark只要有一個(gè)action就會(huì)產(chǎn)生一個(gè)job。

spark邏輯執(zhí)行圖的四個(gè)概要步驟

  1. 從數(shù)據(jù)源創(chuàng)造初始RDD;
  2. 對(duì)RDD進(jìn)行一系列transformation操作,生成新的RDD[T],其中類(lèi)型T可以是scala中的基本數(shù)據(jù)類(lèi)型,也可以是<k, v>,如果是<k, v>那么k不能是復(fù)雜數(shù)據(jù)結(jié)構(gòu);
  3. 對(duì)最后的final RDD進(jìn)行action操作,每個(gè)partition產(chǎn)生result;
  4. 將result回送到Driver端,進(jìn)行最后的計(jì)算。

邏輯執(zhí)行圖的生成

  1. 如何產(chǎn)生RDD,應(yīng)該產(chǎn)生哪些RDD
    一般每個(gè)transformation方法都會(huì)返回一個(gè)RDD,有些transformation還會(huì)有一些子transformation,因此可能產(chǎn)生多于一個(gè)的RDD;
  2. RDD的依賴關(guān)系
    RDD依賴哪些父RDD比較簡(jiǎn)單,從代碼中可以直觀看到;
    RDD中有多少個(gè)partition呢?這個(gè)一般是用戶指定,如果未指定的話,會(huì)去父RDD中partition數(shù)最多的那個(gè);
    RDD和父RDD的partitions之間是怎么依賴的呢?
    spark(一):spark概覽及邏輯執(zhí)行圖
    上圖前三種是窄依賴,最后一個(gè)是寬依賴。窄依賴一般也叫完全依賴,就是說(shuō)父RDD中partition的全部數(shù)據(jù)都被子RDD特定的partition依賴;寬依賴一般也叫部分依賴,就是說(shuō)父RDD中某個(gè)partition的一部分?jǐn)?shù)據(jù)被子RDD的partition1所依賴,而另一部分?jǐn)?shù)據(jù)被子RDD的partition2所依賴,這種情況就要發(fā)生shuflle。
    一般認(rèn)為父RDD的所有partition只要不被子RDD的多個(gè)partition依賴就屬于窄依賴,就不會(huì)發(fā)生shuffle,但是存在特殊情況就是第三種情況:父RDD的partition被子RDD的多個(gè)partition依賴,但依然不需要發(fā)生shuffle(一般笛卡爾積是這種情況)。

常用transformation簡(jiǎn)介

  1. union:將兩個(gè)RDD合并,不改變partition里的數(shù)據(jù)
  2. groupByKey:將相同key的records聚合在一起,聚合后的每條對(duì)應(yīng)的value為原來(lái)所有相同的key的value組成的數(shù)組。(默認(rèn)不會(huì)再map端開(kāi)啟conbine)
  3. reduceByKey:相當(dāng)于傳統(tǒng)的MR,對(duì)相同key的value做出一定函數(shù)處理,得出最后一個(gè)value,比如reduceByKey(+)就會(huì)相同的key的value不斷相加。
    spark(一):spark概覽及邏輯執(zhí)行圖
    reduceByKey() 默認(rèn)在 map 端開(kāi)啟 combine(),因此在 shuffle 之前先通過(guò) mapPartitions 操作進(jìn)行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再進(jìn)行 reduce(通過(guò) aggregate + mapPartitions() 操作來(lái)實(shí)現(xiàn))得到 MapPartitionsRDD。
  4. distinct:去重,這個(gè)transformation內(nèi)部會(huì)先把value轉(zhuǎn)出<k, ->形式的rdd,然后進(jìn)行依次reduceByKey,最后再還原。
  5. cogroup(otherRdd, numPartitions):類(lèi)似groupByKey,不過(guò)這個(gè)聚合兩個(gè)或兩個(gè)以上的RDD,產(chǎn)生的結(jié)果也不太一樣,是每個(gè)RDD自己內(nèi)部相同的key對(duì)應(yīng)的value先聚合成一個(gè)數(shù)組,然后兩個(gè)rdd相同key對(duì)應(yīng)的數(shù)組再聚合成一個(gè)二維數(shù)組,類(lèi)似于[(a, c), (f)]這樣。
  6. intersection(otherRdd):抽取兩個(gè)rdd的公共數(shù)據(jù),內(nèi)部會(huì)想distinct那樣先把value轉(zhuǎn)為<k, ->形式,之后調(diào)用cogroup,最后把有相同key的留下并還原。
  7. join(otherRdd):將兩個(gè) RDD[(K, V)] 按照 SQL 中的 join 方式聚合在一起。與 intersection() 類(lèi)似,首先進(jìn)行 cogroup(),得到<K, (Iterable[V1], Iterable[V2])>類(lèi)型的 MappedValuesRDD,然后對(duì) Iterable[V1] 和 Iterable[V2] 做笛卡爾集,并將集合 flat() 化。spark(一):spark概覽及邏輯執(zhí)行圖
  8. sortByKey:將 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。
  9. cartesion:spark(一):spark概覽及邏輯執(zhí)行圖
    笛卡爾積就是上面提到的父RDD的partition被子RDD的多個(gè)partition依賴,但依然不需要發(fā)生shuffle的情況。
  10. coalesce:當(dāng) shuffle = false 的時(shí)候,是不能增加 partition 個(gè)數(shù)的
  11. filterByRange(lower: K, upper: K):以RDD中元素key的范圍做過(guò)濾,包含lower和upper上下邊界

spark常見(jiàn)action操作

  1. reduce(func):使用傳入的函數(shù)參數(shù) func 對(duì)數(shù)據(jù)集中的元素進(jìn)行匯聚操作 (兩兩合并).
  2. collect():在 driver program 上將數(shù)據(jù)集中的元素作為一個(gè)數(shù)組返回. 這在執(zhí)行一個(gè) filter 或是其他返回一個(gè)足夠小的子數(shù)據(jù)集操作后十分有用.
  3. count():返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)
  4. first():返回?cái)?shù)據(jù)集中的第一個(gè)元素 (與 take(1) 類(lèi)似)
  5. take(n):返回?cái)?shù)據(jù)集中的前 n 個(gè)元素
  6. takeOrdered(n, [ordering]):以其自然序或使用自定義的比較器返回 RDD 的前 n 元素
  7. saveAsTextFile(path):數(shù)據(jù)集中的元素寫(xiě)入到指定目錄下的一個(gè)或多個(gè)文本文件中, 該目錄可以存在于本地文件系統(tǒng), HDFS 或其他 Hadoop 支持的文件系統(tǒng).
  8. countByKey():僅適用于 (K, V) 類(lèi)型的 RDD. 返回每個(gè) key 的 value 數(shù)的一個(gè) hashmap (K, int) pair.
  9. foreach(func):對(duì)數(shù)據(jù)集中的每個(gè)元素執(zhí)行函數(shù) func.
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI