您好,登錄后才能下訂單哦!
spark的RDD以及代碼實操是怎樣進行的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
??在開始學習Spark工作原理之前, 先來介紹一下Spark中兩個最為重要的概念-- 彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets, RDD) 和算子(Operation).
RDD背景
??Spark的核心是建立在RDD之上, 使Spark中的各個組件可以無縫進行集成, 從而在一個應用程序中完成大數(shù)據(jù)計算. 這也是為什么說在SparkCore中一切得計算都是基于RDD來完成的. RDD的設計理念源自AMP實驗室發(fā)表的論文–Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing.
??MapReduce計算框架在實際應用中, 許多迭代式算法和交互式數(shù)據(jù)挖掘過程中的計算結(jié)果會寫到磁盤, 然后再重復使用, 這就帶來了大量的磁盤IO和序列化開銷. 為解決中間過程數(shù)據(jù)落地花費大量時間的需求, 出現(xiàn)了一種抽象的數(shù)據(jù)結(jié)構, 讓我們不必再考慮數(shù)據(jù)的分布式特性, 只需保存具體的邏輯轉(zhuǎn)換表達式即可, 這種數(shù)據(jù)結(jié)構就是RDD.
??RDD之間的轉(zhuǎn)換操作使父子RDD之間具有依賴關系, 滿足條件的RDD之間形成管道(Pipeline), 從而避免中間結(jié)果落地, 極大的降低了磁盤IO和序列化消耗的時間.
RDD介紹
??RDD(彈性分布式數(shù)據(jù)集), 雖然叫做數(shù)據(jù)集, 但RDD并不像集合一樣存儲真實的數(shù)據(jù), 而是存儲這些數(shù)據(jù)轉(zhuǎn)換的邏輯, 可以將RDD理解為一個大的數(shù)據(jù)集合以分布式的形式保存在集群服務器的內(nèi)存中. 每個RDD可以分成多個分區(qū), 每個分區(qū)就是一個數(shù)據(jù)集片段, 并且一個RDD的不同分區(qū)可以被保存到集群中不同的節(jié)點上(但是同一個分區(qū)不能被拆分保存), 從而可以在集群中的不同節(jié)點上進行并行計算.
??RDD提供了一種高度受限的共享內(nèi)存模型, 即RDD是只讀的記錄分區(qū)的集合, 不能直接修改, 只能基于穩(wěn)定的物理存儲中的數(shù)據(jù)集來創(chuàng)建RDD, 或者通過在其他RDD上執(zhí)行轉(zhuǎn)換操作(如map、join和groupBy) 創(chuàng)建得到新的RDD.
Operation介紹
??算子(Operation)是Spark中定義的函數(shù), 用于對RDD中的數(shù)據(jù)結(jié)構進行操作和轉(zhuǎn)換等. Spark中的算子可以分為4類:
創(chuàng)建類(creation)算子, 用于將內(nèi)存中的集合或外部文件創(chuàng)建為RDD.
轉(zhuǎn)換(transformation)算子, 用于將一種格式的RDD轉(zhuǎn)換為其他自定義格式.
緩存(cache)算子, 用于將RDD緩存在內(nèi)存(memory)或磁盤(disk)中, 一般后續(xù)計算會用到重復數(shù)據(jù)時才會使用.
行動(action)算子, 用于觸發(fā)執(zhí)行Spark作業(yè), 并將計算結(jié)果保存為集合, 標量或保存到外部文件, 數(shù)據(jù)庫中.
??典型的RDD執(zhí)行過程如下:
讀入外部數(shù)據(jù)源(或者內(nèi)存中的集合) ,然后Create RDD;
RDD經(jīng)過一系列Transformation, 每一次都會產(chǎn)生不同的RDD, 供給下一個Transformation 使用;
最后一個RDD經(jīng)Action進行處理, 得到最后想要的值, 并進行后續(xù)輸出操作.
??需注意: RDD采用惰性調(diào)用, 即在RDD的執(zhí)行過程中, 如圖所示, 真正的計算發(fā)生在RDD的Action操作, 對于Action之前的所有Transformation操作, Spark只是記錄下Transformation操作應用的一些基礎數(shù)據(jù)集以及RDD生成的軌跡, 即相互之間的依賴關系, 而不會觸發(fā)真正的計算.
??RDD提供的轉(zhuǎn)換接口都非常簡單, 都是類似map, filter, groupBy, join等粗粒度的數(shù)據(jù)轉(zhuǎn)換操作, 而不是針對某個數(shù)據(jù)項的細粒度修改. 因此, RDD比較適合對于數(shù)據(jù)集中元素執(zhí)行相同操作的批處理式應用, 而不適合用于需要異步/細粒度狀態(tài)的應用, 比如Web應用系統(tǒng), 增量式的網(wǎng)頁爬蟲等.
??轉(zhuǎn)換和行動兩種類型的算子, 前者指定RDD之間的相互依賴關系, 后者用于執(zhí)行計算并指定輸出的形式. 兩類操作的主要區(qū)別是, 轉(zhuǎn)換操作接受RDD并返回RDD, 而行動操作(如count、collect等) 接受RDD但是返回非RDD(即輸出一個值或結(jié)果).
RDD五大特性
RDD是由一系列的Partition(分區(qū))組成;
每一個函數(shù)作用在每一個分區(qū)上;
RDD之間存在依賴關系;
[可選項]分區(qū)器作用在KV格式的RDD上;
[可選項]RDD會提供最佳計算位置.
??接下來, 結(jié)合Spark實現(xiàn)的WC案例, 來理解這五個特性以及其他注意點(圖中綠色為block塊, 藍色為Partition分區(qū)):
HDFS存儲文件是以block塊的形式, Spark應用在讀取HDFS上的數(shù)據(jù)后, 會將同一個block塊中的數(shù)據(jù)轉(zhuǎn)換邏輯保存在同一個Partition中, 一個文件對應的所有Partition構成一個RDD. 即一個RDD中的Partition個數(shù)等于這個文件存儲在HDFS中的block個數(shù). 但有一個例外, 如果一個block塊的最后存儲了某個數(shù)據(jù)的大部分字節(jié)后達到block規(guī)定的大小, 僅有少量字節(jié)存儲在另外一個block塊中, 這時這多余的小部分數(shù)據(jù)會放在與大部分數(shù)據(jù)相同的Partition中, 即Partition數(shù)小于block塊數(shù).
Spark中沒有讀文件的方法, 但Spark依然能夠讀取文件內(nèi)容依賴的是MapReduce中讀文件的方法. MR讀文件前, 會先將文件劃分為一個個的split(切片), 一個split的大小 = 一個block的大小; 但這個文件的split個數(shù) ≈ 存儲這個文件的block個數(shù)(同上一個例外情況); 一個RDD中Partition的個數(shù) = 這個文件切分的split個數(shù).
每一個函數(shù)作用在每一個分區(qū)上, 即每個函數(shù)會在每一個Partition中各執(zhí)行一次.
RDD之間存在依賴關系, 通過一個算子關聯(lián)的兩個RDD稱為父子RDD, 父子RDD之間存在寬窄依賴(后續(xù)講解), 子RDD知道它的父RDD是誰, 但父RDD不知道它的子RDD有誰. 這種依賴關系的優(yōu)勢在于當數(shù)據(jù)因某種情形丟失時, 可以通過算子和父RDD重寫計算出子RDD, 從而提高了計算的容錯性. (RDD的依賴關系也被稱為RDD的血統(tǒng)–Lineage)
KV格式的RDD指RDD中的數(shù)據(jù)是二元組類型, 對于這類RDD可以使用分區(qū)器按照Key或者Value進行分組, 進而完成聚合計算. 在WC中, pairRDD和restRDD均為KV格式的RDD. 分區(qū)器用于決定數(shù)據(jù)被放到哪一個reduce task中處理.
每一個算子作用在每一個Partition上, Partition會分布式的存儲在集群各個節(jié)點的內(nèi)存中, 對一個Partition的連續(xù)處理可以看作是一個task任務, 每一個task計算任務都在數(shù)據(jù)所在節(jié)點上執(zhí)行, 從而實現(xiàn)數(shù)據(jù)本地化, 減少網(wǎng)絡IO. 簡單來說, RDD會提供一個方法接口, 調(diào)用這個接口就能直接拿到這個RDD所有Partition的位置, 拿到位置之后就可以分發(fā)task了. 至于這個接口是什么不需要我們關心, Spark應用在執(zhí)行時會自動尋找.
實際操作:
案例說明
??大數(shù)據(jù)分析處理萬變不離其宗, 核心思想就是一個WorldCount–單詞統(tǒng)計. 單詞統(tǒng)計, 顧名思義就是將一個文件中出現(xiàn)的所有單詞讀一遍, 并對相同單詞的個數(shù)進行統(tǒng)計. 如何處理這個文件? 如何得到每一個單詞? 如何對相同的單詞進行統(tǒng)計? 這三個問題是需要解決的核心問題, 接下來就一起來看看是如何對一個文件進行WordCount的.
??首先, 來看一下我們測試的數(shù)據(jù), 在這匹數(shù)據(jù)中, 同一行中每個單詞之間使用制表符’\t’ 來分隔, 接下來我們先對這批數(shù)據(jù)的計算思想進行解析, 然后再分別使用MapReduce和Spark技術的API編碼實現(xiàn).
??通過對這兩種技術編碼的比較, 可以幫助大家更好的理解之前所說的Spark在表達能力上相較于Hadoop(MR)的優(yōu)勢 Spark優(yōu)勢鏈接. 除此之外, 更重要的一點是引入SparkCore中彈性分布式數(shù)據(jù)集(RDD) 的概念, 對RDD有一定認識之后, 將有利于學習RDD的具體原理以及如何使用等知識.
??在Spark中, 一切計算都是基于RDD實現(xiàn)的, RDD可以看作是一個集合, 類似于Scala中的List, Map, 它有著與這些普通集合相同的方法(map, flatmap, foreach…), 但是RDD是重新寫的這些方法, 初次之外還有許多其他的方法, 這些方法在Spark中稱為算子, 之后的博客中會對它們進行詳細介紹.
計算分析
無論是MapReduce還是Spark, 在讀取數(shù)據(jù)時都是一行一行讀取的而且讀取的數(shù)據(jù)都是字符串類型, 因此在處理時要把一行數(shù)據(jù)看成一條記錄;
既然一行是一條記錄, 那么我們在處理時只需要關注這一條記錄即可, 其余記錄格式與之相同, 不相同格式的數(shù)據(jù)一般為臟數(shù)據(jù), 需要過濾掉. 相同格式的按照規(guī)律進行切分(split).
數(shù)據(jù)切分完成后, 就可以得到每一個單詞, 然后將每一個單詞當作key, 把它的value置為1, 得到一些列KV格式的數(shù)據(jù), 這些數(shù)據(jù)中有的key相同, 有的key不同, 但value都是1.
對這一系列KV格式的數(shù)據(jù)進行統(tǒng)計, 先按照Key進行分組, 相同Key, 即同一個單詞為一組, 這個Key對應多個Value, 構成一個有一個或多個元素1組成的集合. 然后再將同一個Key中所有的Value進行累加, 累加完成之后將累加值最為新的Value, Key還是原來的Key.
最新的KV格式的數(shù)據(jù)中, Key代表的是出現(xiàn)的每一個單詞, Value則對應該單詞出現(xiàn)的次數(shù).
??圖解:
————————————————
代碼實現(xiàn)
在SparkCore中一切得計算都是基于RDD(彈性分布式數(shù)據(jù)集), R(Resilient) D(Distributed ) D(Dataset). RDD 調(diào)用的方法稱為算子,一般情況下RDD的算子返回的還是RDD. 先對RDD有個大概的了解, 之后再對其進行詳細地介紹.
??準備環(huán)境:
Scala運行環(huán)境
導入jar包, 開發(fā)Spark應用程序時, 只需要導入一個整合包即可.
??用Spark寫WC:
package com.hpe.spark.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WCSpark {
def main(args: Array[String]): Unit = {
//創(chuàng)建配置對象
val conf = new SparkConf()
//設置App的名稱-->方便在監(jiān)控頁面找到
conf.setAppName("WCSpark")
//設置Spark的運行模式-->local本地運行-->用于測試環(huán)境
conf.setMaster("local")
//創(chuàng)建Spark上下文 他是通往集群的唯一通道
val sc = new SparkContext(conf)
// textFile()讀取上述數(shù)據(jù),讀取時是一行行讀取,可以是本地也可是HDFS的數(shù)據(jù),返回RDD類型的數(shù)據(jù)
val lineRDD = sc.textFile("d:/wc.txt")
// 基于lineRDD中的數(shù)據(jù)按照\t進行分詞
val wordRDD = lineRDD.flatMap { _.split("\t") }
// 將wordRDD中的每一條數(shù)據(jù)封裝成一個二元組,每一個單詞計數(shù)為1 pairRDD[(K:word V:1)]
val pairRDD = wordRDD.map { (_,1) }
// 將pairRDD中相同的單詞分為一組,對組內(nèi)的數(shù)據(jù)進行累加
val restRDD = pairRDD.reduceByKey((v1,v2)=>v1+v2)
//可簡寫為:val restRDD = pairRDD.reduceByKey(_+_)
// 根據(jù)單詞出現(xiàn)的次數(shù)來排序,sortBy():根據(jù)指定字段來排序,false:指定為降序;
// foreach對RDD中排好序的數(shù)據(jù)進行遍歷
restRDD
.sortBy(x=>x._2, false)
.foreach(println)
//一直啟動,為查看而寫
while(true){}
//釋放資源
sc.stop()
}
}
??但從代碼的編寫上來看, 不難發(fā)現(xiàn), Spark的表達能力著實比MR強, 上述代碼中間處理部分其實還可以更加簡潔:
val lineRDD = sc.textFile("d:/wc.txt") .flatMap { _.split("\t") } .map { (_,1) } .reduceByKey(_+_) .sortBy(_._2, false) .foreach(println)
??MR中復雜的程序, 在Spark中了了幾行就可以輕松解決, 既可以看出Scala語言的靈活性, 又表現(xiàn)了Spark超強的表達能力, 因此Spark在計算上逐漸取代MR.
??這里最后一句while(true){} ,讓程序一直執(zhí)行, 可以在WebUI的監(jiān)控頁面http://localhost:4040進行查看。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。