您好,登錄后才能下訂單哦!
要學會和使用一門技術的時候,首先要弄清楚該技術出現(xiàn)的背景和要解決的問題。要說spark首先要了解海量數(shù)據(jù)的處理和Hadoop技術。
一個系統(tǒng)在運行的過程中都會產(chǎn)生許多的日志數(shù)據(jù),這些日志數(shù)據(jù)包含但不局限我們平常開發(fā)中使用log4j或者logback生成的記錄系統(tǒng)運行情況的日志。例如對于網(wǎng)絡服務提供商,他們的設備可能會記錄著用戶上下線時間,訪問的網(wǎng)頁地址,響應時長等數(shù)據(jù),這些數(shù)據(jù)文件里面記錄的某些信息經(jīng)過抽取分析后可以得出許多的指標信息,從而為改善網(wǎng)絡結構和提高服務等提供數(shù)據(jù)依據(jù)。但這些數(shù)據(jù)會很大,使用一般的技術和方案將難以達到分析的目的,于是一種全新的處理海量數(shù)據(jù)的計算模型和框架的出現(xiàn)變得迫在眉睫。
處理海量數(shù)據(jù)要解決的第一個問題便是存儲。我們需要將收集來的日志文件存放在某個地方便于后面的數(shù)據(jù)分析??墒且慌_機器的容量始終十分的有限,隨著數(shù)據(jù)的增漲我們也不可能無限的擴展一臺服務器的存儲能力,所以我們需要將收集的數(shù)據(jù)存放在許多的機器上,并通過某種方案進行統(tǒng)一管理。
處理海量數(shù)據(jù)要解決的第二個問題便是計算。一臺服務器的計算能力是有限的,它會直接受限于CPU和內(nèi)存。隨著數(shù)據(jù)量的增大,我們也不能無限的擴展他們,所以同數(shù)據(jù)存儲一樣,我們也需要利用多臺機器的計算能力來一起完成運算的工作。每個計算機都是一個獨立的個體,他們之間運行的代碼本身是無關聯(lián)關系的,我們也需要某種方案來協(xié)調各個計算機的執(zhí)行,讓其成為邏輯上一臺超級超級的計算機。
基于GSF(Google的文件系統(tǒng))的思想也開發(fā)了一個Hadoop使用的分布式文件系統(tǒng)HDFS。HDFS是基于計算機本地文件系統(tǒng)的分布式文件系統(tǒng),也就是說HDFS將文件直接存放于計算機的本地文件系統(tǒng)之上(當然我們是無法直接查看文件里面的內(nèi)容的)。
HDFS解決了上面提到的數(shù)據(jù)的存儲問題。一般情況每個計算機上只會有一個管理本地數(shù)據(jù)的DataNode進程(該計算機稱為DataNode節(jié)點),DataNode進程與主控節(jié)點上的NameNode進程通信(該節(jié)點稱為NameNode節(jié)點),以完成數(shù)據(jù)塊狀態(tài)的報告和發(fā)送心跳信號等。NameNode是一個中心服務器,負責管理維護文件系統(tǒng)的名字空間(namespace)以及客戶端對文件的訪問。
注:名字空間(Namespace)即文件系統(tǒng)文件目錄的組織方式,是文件系統(tǒng)的重要組成部分,為用戶提供可視化的、可理解的文件系統(tǒng)視圖,從而解決或降低人類與計算機之間在數(shù)據(jù)存儲上的語義間隔。目前樹狀結構的文件系統(tǒng)組織方式與現(xiàn)實世界的組織結構最為相似,被人們所廣泛接受。因此絕大多數(shù)的文件系統(tǒng)皆以Tree方式來組織文件目錄,包括各種磁盤文件系統(tǒng)(EXTx, XFS, JFS, Reiserfs, ZFS, Btrfs, NTFS, FAT32等)、網(wǎng)絡文件系統(tǒng)(NFS,AFS, CIFS/SMB等)、集群文件系統(tǒng)(Lustre, PNFS, PVFS, GPFS, PanFS等)、分布式文件系統(tǒng)(GoogleFS,HDFS, MFS, KFS, TaobaoFS, FastDFS等)。
接著我們說說用于大數(shù)據(jù)批處理分析的并行計算框Map/Reduce。該框架把數(shù)據(jù)的處理分為兩個獨立的Map和Reduce階段,并分別對應兩個方法map和reduce:
/* @key 由于框架需要序列化key和根據(jù)key來排序,所有該key類型必須實現(xiàn)WritableComparable接口 @value 這就是具體的某行數(shù)據(jù),獲取前面?zhèn)€map傳遞過來的value,由于需要序列化所以需要實現(xiàn)Writable接口 @out 將映射后的鍵值對數(shù)據(jù)的接口,調用該接口的collect(WritableComparable,Writable)方法分別傳入key和value即可 @reporter應用程序可以使用Reporter報告進度,設定應用級別的狀態(tài)消息,更新Counters(計數(shù)器),或者僅是表明自己運行正常 */ map(WritableComparable key, Writable value, OutputCollector out , Reporter reporter)
/* @key 上個階段(map)輸出的key @values 上個階段已經(jīng)排序好的輸出(因為不同 mapper 的輸出中可能會有相同的 key) */ reduce(WritableComparable key, Iterator values, OutputCollector out, Reporter report) |
在Map階段,Hadoop框架首先從HDFS上指定路徑下獲取要處理的文件,然后對該文件進行分片處理(InputSplit),然后每個分片使用一個Map task來處理。Hadoop框架在調用map方法前,會使用InputFormat類型的對象來處理數(shù)據(jù)的分片操作,針對每個分片(InputSplit)會創(chuàng)建一個RecordReader類型的對象來讀取每個分片的內(nèi)容從而調用map方法來處理數(shù)據(jù)。
InputFormat類型將文件從邏輯上切分為片,每個片記錄了數(shù)據(jù)的偏移量和大小等信息,但分片操作會把本是一行的數(shù)據(jù)切分到兩個甚至多個片中,這么一來后面處理的數(shù)據(jù)就是錯誤的。這是RecordReader需要解決的問題,以LineRecordReader為例,如果某個分片是文件的第一個分片,那么從第一個字節(jié)開始讀取,否則從分片的第二行開始讀??;如果某個分片是文件的最后一個分片,那么讀完本分片的數(shù)據(jù)即可,否則獲取下一個分片的第一行數(shù)據(jù)結束。這么一來,對于以行分割的數(shù)據(jù)就可以保證每次讀取的行都是完整的。
以LineRecordReader為例,LineRecordReader讀取分片中的每行數(shù)據(jù),然后以鍵值序列對(key-value)的形式來調用map函數(shù),此時的key為該行數(shù)據(jù)的偏移量,value為該行數(shù)據(jù)。
Hadoop框架將每次執(zhí)行map函數(shù)的返回值先放入一個緩沖區(qū),當緩存區(qū)的使用量達到指定的閾值后,開使用一個線程來將這部分數(shù)據(jù)溢出到一個臨時文件當中。在溢出前會對這些要溢出的數(shù)據(jù)先做幾個操作:
1, 通過partitioner操作根據(jù)key來進行分區(qū),確定某個數(shù)據(jù)歸屬于哪個reducer來處理
2, 對數(shù)據(jù)根據(jù)key來排序
3, 根據(jù)key對數(shù)據(jù)進行合并(用戶根據(jù)需要指定)
以上步驟完成后將數(shù)據(jù)溢出到一個臨時文件。當處理完某個分片后,可能會生成許多個這樣的溢出文件,接著需要對溢出文件進行合并生成一個完整的文件(該完整指的是該分片要處理的那部分數(shù)據(jù))。在合并的時候也需要對數(shù)據(jù)進行排序和合并操作,由于文件可能很大,不能一次載入到內(nèi)存進行排序操作,所以這里用到了外排序。但最終生成的文件里面的數(shù)據(jù)是經(jīng)過分區(qū)分組,排序后的。
到此Map階段結束,接著要進入的是Reduce階段。在真正調用reduce方法之前,有一個shuffle階段需要預處理。在每個map task結束后,Reduce task都會得到通知,并將自己要處理的數(shù)據(jù)的位置信息保存到mapLocations中,然后對數(shù)據(jù)經(jīng)過過濾去重后保存在scheduledCopies中,接著由幾個線程并行的拷貝數(shù)據(jù),并進行排序合并操作。
最后就是通過調用reduce方法來處理合并的數(shù)據(jù),并將結果輸出到HDFS即可。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。