您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Hadoop中JOB怎么實(shí)現(xiàn)提交任務(wù)”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
百度百科:MapReduce是一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運(yùn)算。概念"Map(映射)"和"Reduce(歸約)",和他們的主要思想,都是從函數(shù)式編程語言里借來的,還有從矢量編程語言里借來的特性。它極大地方便了編程人員在不會(huì)分布式并行編程的情況下,將自己的程序運(yùn)行在分布式系統(tǒng)上。 當(dāng)前的軟件實(shí)現(xiàn)是指定一個(gè)Map(映射)函數(shù),用來把一組鍵值對映射成一組新的鍵值對,指定并發(fā)的Reduce(歸約)函數(shù),用來保證所有映射的鍵值對中的每一個(gè)共享相同的鍵組。至于什么是函數(shù)式編程語言和矢量編程語言,自己也搞得不太清楚,見解釋鏈接:
http://www.cnblogs.com/kym/archive/2011/03/07/1976519.html.
自己的理解:MapReduce是一種分布式計(jì)算模型,由Google提出,主要用于搜索領(lǐng)域,解決海量數(shù)據(jù)的計(jì)算問題.當(dāng)你向MapReduce 框架提交一個(gè)計(jì)算作業(yè)時(shí),它會(huì)首先把計(jì)算作業(yè)拆分成若干個(gè)Map 任務(wù),然后分配到不同的節(jié)點(diǎn)上去執(zhí)行,每一個(gè)Map 任務(wù)處理輸入數(shù)據(jù)中的一部分,當(dāng)Map 任務(wù)完成后,它會(huì)生成一些中間文件,這些中間文件將會(huì)作為Reduce 任務(wù)的輸入數(shù)據(jù)。Reduce 任務(wù)的主要目標(biāo)就是把前面若干個(gè)Map 的輸出匯總到一起并輸出.就是說HDFS已經(jīng)為我們提供了高性能、高并發(fā)的服務(wù),但是并行編程可不是所有程序員都玩得轉(zhuǎn)的活兒,如果我們的應(yīng)用本身不能并發(fā),那Hadoop的HDFS也都是沒有意義的。MapReduce的偉大之處就在于讓不熟悉并行編程的程序員(比如像我這的)也能充分發(fā)揮分布式系統(tǒng)的威力。這里說明以下:Hadoop本身這個(gè)框架就是洋人基于洋人公司谷歌的三大論文GFS,BigTable,MapReduce(編程模型),用Java語言實(shí)現(xiàn)的框架.谷歌它就用的C++實(shí)現(xiàn),而MapReduce編程模型(是高度抽象的)大體離不開下面這張圖.Spark并行運(yùn)算框架(和Hadoop的MapReduce)的不同點(diǎn):在于它將中間結(jié)果即map函數(shù)結(jié)果直接放入內(nèi)存中,而不是放入本地磁盤的HDFS中.這些都不是重點(diǎn),重點(diǎn)是下面圖的流程:
上圖是論文里給出的流程圖。一切都是從最上方的user program開始的,user program鏈接了MapReduce庫,實(shí)現(xiàn)了最基本的Map函數(shù)和Reduce函數(shù)。圖中執(zhí)行的順序都用數(shù)字標(biāo)記了。 1.MapReduce庫先把user program的輸入文件劃分為M份(M為用戶定義),每一份通常有16MB到64MB,如圖左方所示分成了split0~4;然后使用fork將用戶進(jìn)程拷貝到集群內(nèi)其它機(jī)器上。 2.user program的副本中有一個(gè)稱為master,其余稱為worker,master是負(fù)責(zé)調(diào)度的,為空閑worker分配作業(yè)(Map作業(yè)3或者Reduce作業(yè)),worker的數(shù)量也是可以由用戶指定的。 3.被分配了Map作業(yè)的worker,開始讀取對應(yīng)分片的輸入數(shù)據(jù),Map作業(yè)數(shù)量是由M決定的,和split一一對應(yīng);Map作業(yè)從輸入數(shù)據(jù)中抽取出鍵值對,每一個(gè)鍵值對都作為參數(shù)傳遞給map函數(shù),map函數(shù)產(chǎn)生的中間鍵值對被緩存在內(nèi)存中。 4.緩存的中間鍵值對會(huì)被定期寫入本地磁盤,而且被分為R個(gè)區(qū),R的大小是由用戶定義的,將來每個(gè)區(qū)會(huì)對應(yīng)一個(gè)Reduce作業(yè);這些中間鍵值對的位置會(huì)被通報(bào)給master,master負(fù)責(zé)將信息轉(zhuǎn)發(fā)給Reduce worker。 5.master通知分配了Reduce作業(yè)的worker它負(fù)責(zé)的分區(qū)在什么位置(肯定不止一個(gè)地方,每個(gè)Map作業(yè)產(chǎn)生的中間鍵值對都可能映射到所有R個(gè)不同分區(qū)),當(dāng)Reduce worker把所有它負(fù)責(zé)的中間鍵值對都讀過來后,先對它們進(jìn)行排序,使得相同鍵的鍵值對聚集在一起。因?yàn)椴煌逆I可能會(huì)映射到同一個(gè)分區(qū)也就是同一個(gè)Reduce作業(yè)(誰讓分區(qū)少呢),所以排序是必須的。 6.reduce worker遍歷排序后的中間鍵值對,對于每個(gè)唯一的鍵,都將鍵與關(guān)聯(lián)的值傳遞給reduce函數(shù),reduce函數(shù)產(chǎn)生的輸出會(huì)添加到這個(gè)分區(qū)的輸出文件中。 7.當(dāng)所有的Map和Reduce作業(yè)都完成了,master喚醒正版的user program,MapReduce函數(shù)調(diào)用返回user program的代碼 所有執(zhí)行完畢后,MapReduce輸出放在了R個(gè)分區(qū)的輸出文件中(分別對應(yīng)一個(gè)Reduce作業(yè))。用戶通常并不需要合并這R個(gè)文件,而是將其作為輸入交給另一個(gè)MapReduce程序處理。整個(gè)過程中,輸入數(shù)據(jù)是來自底層分布式文件系統(tǒng)(GFS)的,中間數(shù)據(jù)是放在本地文件系統(tǒng)的,最終輸出數(shù)據(jù)是寫入底層分布式文件系統(tǒng)(GFS)的。而且我們要注意Map/Reduce作業(yè)和map/reduce函數(shù)的區(qū)別:Map作業(yè)處理一個(gè)輸入數(shù)據(jù)的分片,可能需要調(diào)用多次map函數(shù)來處理每個(gè)輸入鍵值對;Reduce作業(yè)處理一個(gè)分區(qū)的中間鍵值對,期間要對每個(gè)不同的鍵調(diào)用一次reduce函數(shù),Reduce作業(yè)最終也對應(yīng)一個(gè)輸出文件。
至于下面一張圖Hadoop MapReduce(彩色的)的模型實(shí)現(xiàn)則如下圖(當(dāng)然這也不是我畫的,只是大自然的搬運(yùn)工):
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
參考鏈接:怎樣向妻子解釋MapReduce.
在Hadoop里面的MapReduce的是有存在兩個(gè)不同的時(shí)期.剛開始的Hadoop中的MapReduce實(shí)現(xiàn)是做到很多的事情,而該框架的核心Job Tracker(作業(yè)跟蹤者)則是既當(dāng)?shù)之?dāng)媽的意思.看下圖:
原 MapReduce 程序的流程及設(shè)計(jì)思路: 1.首先用戶程序 (JobClient) 提交了一個(gè) job,job 的信息會(huì)發(fā)送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要與集群中的機(jī)器定時(shí)通信 (heartbeat), 需要管理哪些程序應(yīng)該跑在哪些機(jī)器上,需要管理所有 job 失敗、重啟等操作。 2.TaskTracker 是 Map-reduce 集群中每臺機(jī)器都有的一個(gè)部分,他做的事情主要是監(jiān)視自己所在機(jī)器的資源情況。 3.TaskTracker 同時(shí)監(jiān)視當(dāng)前機(jī)器的 tasks 運(yùn)行狀況。TaskTracker 需要把這些信息通過 heartbeat發(fā)送給JobTracker,JobTracker 會(huì)搜集這些信息以給新提交的 job 分配運(yùn)行在哪些機(jī)器上。
既然出現(xiàn)Hadoop2改進(jìn)它,那它就有一些問題咯。主要的問題如下:
1.JobTracker 是 Map-reduce 的集中處理點(diǎn),存在單點(diǎn)故障。 2.JobTracker 完成了太多的任務(wù),造成了過多的資源消耗,當(dāng) map-reduce job 非常多的時(shí)候,會(huì)造成很大的內(nèi)存開銷,潛在來說,也增加了 JobTracker fail 的風(fēng)險(xiǎn),這也是業(yè)界普遍總結(jié)出老 Hadoop 的 Map-Reduce 只能支持 4000 節(jié)點(diǎn)主機(jī)的上限。 3.在 TaskTracker 端,以 map/reduce task 的數(shù)目作為資源的表示過于簡單,沒有考慮到 cpu/內(nèi)存的占用情況,如果兩個(gè)大內(nèi)存消耗的 task 被調(diào)度到了一塊,很容易出現(xiàn) OOM。 4.在 TaskTracker 端,把資源強(qiáng)制劃分為 map task slot 和 reduce task slot, 如果當(dāng)系統(tǒng)中只有 map task 或者只有 reduce task 的時(shí)候,會(huì)造成資源的浪費(fèi),也就是前面提過的集群資源利用的問題。 源代碼層面分析的時(shí)候,會(huì)發(fā)現(xiàn)代碼非常的難讀,常常因?yàn)橐粋€(gè) class 做了太多的事情,代碼量達(dá) 3000 多行造成 class 的任務(wù)不清晰,增加 bug 修復(fù)和版本維護(hù)的難度。 5.從操作的角度來看,現(xiàn)在的 Hadoop MapReduce 框架在有任何重要的或者不重要的變化 ( 例如 bug 修復(fù),性能提升和特性化 ) 時(shí),都會(huì)強(qiáng)制進(jìn)行系統(tǒng)級別的升級更新。更糟的是,它不管用戶的喜好,強(qiáng)制讓分布式集群系統(tǒng)的每一個(gè)用戶端同時(shí)更新。這些更新會(huì)讓用戶為了驗(yàn)證他們之前的應(yīng)用程序是不是適用新的 Hadoop 版本而浪費(fèi)大量時(shí)間。
首先的不要被YARN給迷惑住了,它只是負(fù)責(zé)資源調(diào)度管理,而MapReduce才是負(fù)責(zé)運(yùn)算的家伙,所以YARN != MapReduce2.這是大師說的:
YARN并不是下一代MapReduce(MRv2),下一代MapReduce與第一代MapReduce(MRv1)在編程接口、數(shù)據(jù)處理引擎(MapTask和ReduceTask)是完全一樣的, 可認(rèn)為MRv2重用了MRv1的這些模塊,不同的是資源管理和作業(yè)管理系統(tǒng),MRv1中資源管理和作業(yè)管理均是由JobTracker實(shí)現(xiàn)的,集兩個(gè)功能于一身,而在MRv2中,將這兩部分分開了, 其中,作業(yè)管理由ApplicationMaster實(shí)現(xiàn),而資源管理由新增系統(tǒng)YARN完成,由于YARN具有通用性,因此YARN也可以作為其他計(jì)算框架的資源管理系統(tǒng),不僅限于MapReduce,也是其他計(jì)算框架(Spark).
看上圖我們可以知道Hadoop1中mapreduce可以說是啥事都干,而Hadoop2中的MapReduce的話則是專門處理數(shù)據(jù)分析.而YARN的話則做為資源管理器存在.
有了YARN之后,官網(wǎng)上這么說Apache Hadoop NextGen MapReduce (YARN).它的架構(gòu)圖如下:
在Hadoop2中將JobTracker兩個(gè)主要的功能分離成單獨(dú)的組件,這兩個(gè)功能是資源管理和任務(wù)調(diào)度/監(jiān)控。新的資源管理器全局管理所有應(yīng)用程序計(jì)算資源的分配,每一個(gè)應(yīng)用的 ApplicationMaster 負(fù)責(zé)相應(yīng)的調(diào)度和協(xié)調(diào)。一個(gè)應(yīng)用程序無非是一個(gè)單獨(dú)的傳統(tǒng)的 MapReduce 任務(wù)或者是一個(gè) DAG( 有向無環(huán)圖 ) 任務(wù)。ResourceManager 和每一臺機(jī)器的節(jié)點(diǎn)管理服務(wù)器能夠管理用戶在那臺機(jī)器上的進(jìn)程并能對計(jì)算進(jìn)行組織。 1.事實(shí)上,每一個(gè)應(yīng)用的ApplicationMaster是一個(gè)詳細(xì)的框架庫,它結(jié)合從ResourceManager獲得的資源和 NodeManagr 協(xié)同工作來運(yùn)行和監(jiān)控任務(wù)。 2.在上圖中ResourceManager支持分層級的應(yīng)用隊(duì)列,這些隊(duì)列享有集群一定比例的資源。從某種意義上講它就是一個(gè)純粹的調(diào)度器,它在執(zhí)行過程中不對應(yīng)用進(jìn)行監(jiān)控和狀態(tài)跟蹤。同樣,它也不能重啟因應(yīng)用失敗或者硬件錯(cuò)誤而運(yùn)行失敗的任務(wù)。 ResourceManager 是基于應(yīng)用程序?qū)Y源的需求進(jìn)行調(diào)度的 ; 每一個(gè)應(yīng)用程序需要不同類型的資源因此就需要不同的容器。資源包括:內(nèi)存,CPU,磁盤,網(wǎng)絡(luò)等等??梢钥闯?,這同現(xiàn) Mapreduce 固定類型的資源使用模型有顯著區(qū)別,它給集群的使用帶來負(fù)面的影響。資源管理器提供一個(gè)調(diào)度策略的插件,它負(fù)責(zé)將集群資源分配給多個(gè)隊(duì)列和應(yīng)用程序。調(diào)度插件可以基于現(xiàn)有的能力調(diào)度和公平調(diào)度模型。 3.在上圖中 NodeManager 是每一臺機(jī)器框架的代理,是執(zhí)行應(yīng)用程序的容器,監(jiān)控應(yīng)用程序的資源使用情況 (CPU,內(nèi)存,硬盤,網(wǎng)絡(luò) ) 并且向調(diào)度器匯報(bào)。 4.在上圖中,每一個(gè)應(yīng)用的 ApplicationMaster的職責(zé)有:向調(diào)度器索要適當(dāng)?shù)馁Y源容器,運(yùn)行任務(wù),跟蹤應(yīng)用程序的狀態(tài)和監(jiān)控它們的進(jìn)程,處理任務(wù)的失敗原因。
再次總結(jié),在Hadoop2集群里,一個(gè)客戶端提交任務(wù)的一整套的流程圖:
1.客戶端的mapreduce程序通過hadoop shell提交到hadoop的集群中. 2.程序會(huì)通過RPC通信將打成jar包的程序的有關(guān)信息傳遞給Hadoop集群中RM(ResourceManager),可稱為領(lǐng)取JOBID的過程 3.RM更加提交上來的信息給任務(wù)分配一個(gè)唯一的ID,同時(shí)會(huì)將run.jar的在HDFS上的存儲(chǔ)路徑發(fā)送給客戶端. 4.客戶端得到那個(gè)存儲(chǔ)路徑之后,會(huì)相應(yīng)的拼接出最終的存放路徑目錄,然后將run.jar分多份存儲(chǔ)在HDFS目錄中,默認(rèn)情況下備份數(shù)量為10份.可配置. 5.客戶端提交一些配置信息,例如:最終存儲(chǔ)路徑,JOB ID等. 6.RM會(huì)將這些配置信息放入一個(gè)隊(duì)列當(dāng)中,所謂的調(diào)度器.至于調(diào)度的算法,則不必深究. 7.NM(NodeManager)和RM是通過心跳機(jī)制保持著通信的,NM會(huì)定期的向RM去領(lǐng)取任務(wù). 8.RM會(huì)在任意的一臺或多臺的NM中,啟動(dòng)任務(wù)監(jiān)控的進(jìn)程Application Master.用來監(jiān)控其他NM中YARN CHild的執(zhí)行的情況 9.NM在領(lǐng)取到任務(wù)之后,得到信息,會(huì)去HDFS的下載run.jar.然后在本地的機(jī)器上啟動(dòng)YARN Child進(jìn)程來執(zhí)行map或者reduce函數(shù).map函數(shù)的處理之后的中間結(jié)果數(shù)據(jù)會(huì)放在本地文件系統(tǒng)中的. 10.在結(jié)束程序之后,將結(jié)果數(shù)據(jù)寫會(huì)HDFS中.整個(gè)流程大概就是這樣子的.
隨著 YARN 的出現(xiàn),您不再受到更簡單的 MapReduce 開發(fā)模式約束,而是可以創(chuàng)建更復(fù)雜的分布式應(yīng)用程序。實(shí)際上,您可以將 MapReduce 模型視為 YARN 架構(gòu)可運(yùn)行的一些應(yīng)用程序中的其中一個(gè),只是為自定義開發(fā)公開了基礎(chǔ)框架的更多功能。這種能力非常強(qiáng)大,因?yàn)?YARN 的使用模型幾乎沒有限制,不再需要與一個(gè)集群上可能存在的其他更復(fù)雜的分布式應(yīng)用程序框架相隔離,就像 MRv1 一樣。甚至可以說,隨著 YARN 變得更加健全,它有能力取代其他一些分布式處理框架,從而完全消除了專用于其他框架的資源開銷,同時(shí)還簡化了整個(gè)系統(tǒng)。
為了演示 YARN 相對于 MRv1 的效率提升,可考慮蠻力測試舊版本的 LAN Manager Hash 的并行問題,這是舊版 Windows? 用于密碼散列運(yùn)算的典型方法。在此場景中,MapReduce 方法沒有多大意義,因?yàn)?Mapping/Reducing 階段涉及到太多開銷。相反,更合理的方法是抽象化作業(yè)分配,以便每個(gè)容器擁有密碼搜索空間的一部分,在其之上進(jìn)行枚舉,并通知您是否找到了正確的密碼。這里的重點(diǎn)是,密碼將通過一個(gè)函數(shù) 來動(dòng)態(tài)確定(這確實(shí)有點(diǎn)棘手),而不需要將所有可能性映射到一個(gè)數(shù)據(jù)結(jié)構(gòu)中,這就使得 MapReduce 風(fēng)格顯得不必要且不實(shí)用。
歸結(jié)而言,MRv1 框架下的問題僅是需要一個(gè)關(guān)聯(lián)數(shù)組,而且這些問題有專門朝大數(shù)據(jù)操作方向演變的傾向。但是,問題一定不會(huì)永遠(yuǎn)僅局限于此范式中,因?yàn)槟F(xiàn)在可以更為簡單地將它們抽象化,編寫自定義客戶端、應(yīng)用程序主程序,以及符合任何您想要的設(shè)計(jì)的應(yīng)用程序。
我們直接拿Apache Hadoop官網(wǎng)中的wordcount的例子來說明MapReduce程序的編寫.
Source Code
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { //編寫自己的Mapper,需要繼承org.apache.hadoop.mapreduce.Mapper public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ //輸入的<Key,Value>的類型,輸出的<Key,Value> //作為類中成員變量 private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //key : offset 偏移量,幾乎可以忽略 //value : one line string 一行的數(shù)據(jù) //context : the context of computer 計(jì)算的上下文 public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } //編寫自己的Reducer,需要繼承org.apache.hadoop.mapreduce.Reducer public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } //主函數(shù)開始運(yùn)行JOB public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); //提交JOB成功,退出JVM虛擬機(jī) } }
----------------------------至此,與服務(wù)器RM的通信已建立.---------
-----------------------接下來的話,就是提交job任務(wù)了.
“Hadoop中JOB怎么實(shí)現(xiàn)提交任務(wù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。