您好,登錄后才能下訂單哦!
大數(shù)據(jù)學(xué)習(xí)路線之mapreduce概述,mapreduce:分布式并行離線計(jì)算框架,是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架;Mapreduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)hadoop集群上;
與HDFS解決問題的原理類似,HDFS是將大的文件切分成若干小文件,然后將它們分別存儲(chǔ)到集群中各個(gè)主機(jī)中。
同樣原理,mapreduce是將一個(gè)復(fù)雜的運(yùn)算切分成若個(gè)子運(yùn)算,然后將它們分別交給集群中各個(gè)主機(jī),由各個(gè)主機(jī)并行運(yùn)算。
1.1 mapreduce產(chǎn)生的背景
海量數(shù)據(jù)在單機(jī)上處理因?yàn)橛布Y源限制,無法勝任。
而一旦將單機(jī)版程序擴(kuò)展到集群來分布式運(yùn)行,將極大增加程序的復(fù)雜度和開發(fā)難度。
引入mapreduce框架后,開發(fā)人員可以將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上,而將分布式計(jì)算中的復(fù)雜×××由框架來處理。
1.2 mapreduce編程模型
一種分布式計(jì)算模型。
MapReduce將這個(gè)并行計(jì)算過程抽象到兩個(gè)函數(shù)。
Map(映射):對(duì)一些獨(dú)立元素組成的列表的每一個(gè)元素進(jìn)行指定的操作,可以高度并行。
Reduce(化簡(jiǎn) 歸約):對(duì)一個(gè)列表的元素進(jìn)行合并。
一個(gè)簡(jiǎn)單的MapReduce程序只需要指定map()、reduce()、input和output,剩下的事由框架完成。
Job :用戶的每一個(gè)計(jì)算請(qǐng)求稱為一個(gè)作業(yè)。
Task:每一個(gè)作業(yè),都需要拆分開了,交由多個(gè)主機(jī)來完成,拆分出來的執(zhí)行單位就是任務(wù)。
Task又分為如下三種類型的任務(wù):
Map:負(fù)責(zé)map階段的整個(gè)數(shù)據(jù)處理流程
Reduce:負(fù)責(zé)reduce階段的整個(gè)數(shù)據(jù)處理流程
MRAppMaster:負(fù)責(zé)整個(gè)程序的過程調(diào)度及狀態(tài)協(xié)調(diào)
具體流程說明:
一個(gè)mr程序啟動(dòng)的時(shí)候,最先啟動(dòng)的是MRAppMaster,MRAppMaster啟動(dòng)后根據(jù)本次job的描述信息,計(jì)算出需要的maptask實(shí)例數(shù)量,然后向集群申請(qǐng)機(jī)器啟動(dòng)相應(yīng)數(shù)量的maptask進(jìn)程
maptask進(jìn)程啟動(dòng)之后,根據(jù)給定的數(shù)據(jù)切片范圍進(jìn)行數(shù)據(jù)處理,主體流程為:
–?利用客戶指定的inputformat來獲取RecordReader讀取數(shù)據(jù),形成輸入KV對(duì)。
–?將輸入KV(k是文件的行號(hào),v是文件一行的數(shù)據(jù))對(duì)傳遞給客戶定義的map()方法,做邏輯運(yùn)算,并將map()方法輸出的KV對(duì)收集到緩存。
–?將緩存中的KV對(duì)按照K分區(qū)排序后不斷溢寫到磁盤文件
MRAppMaster監(jiān)控到所有maptask進(jìn)程任務(wù)完成之后,會(huì)根據(jù)客戶指定的參數(shù)啟動(dòng)相應(yīng)數(shù)量的reducetask進(jìn)程,并告知reducetask進(jìn)程要處理的數(shù)據(jù)范圍(數(shù)據(jù)分區(qū))
Reducetask進(jìn)程啟動(dòng)之后,根據(jù)MRAppMaster告知的待處理數(shù)據(jù)所在位置,從若干臺(tái)maptask運(yùn)行所在機(jī)器上獲取到若干個(gè)maptask輸出結(jié)果文件,并在本地進(jìn)行重新歸并排序,然后按照相同key的KV為一個(gè)組,調(diào)用客戶定義的reduce()方法進(jìn)行邏輯運(yùn)算,并收集運(yùn)算輸出的結(jié)果KV,然后調(diào)用客戶指定的outputformat將結(jié)果數(shù)據(jù)輸出到外部存儲(chǔ)
基于MapReduce 計(jì)算模型編寫分布式并行程序非常簡(jiǎn)單,程序員的主要編碼工作就是實(shí)現(xiàn)Map 和Reduce函數(shù)。
其它的并行編程中的種種復(fù)雜問題,如分布式存儲(chǔ),工作調(diào)度,負(fù)載平衡,容錯(cuò)處理,網(wǎng)絡(luò)通信等,均由YARN框架負(fù)責(zé)處理。
MapReduce中,map和reduce函數(shù)遵循如下常規(guī)格式:
?map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
Mapper的接口:
?protected?void?map(KEY key, VALUE value, Context?context)?
????throws?IOException, InterruptedException?{ ?
}
Reduce的接口:
?protected?void?reduce(KEY key, Iterable<VALUE> values,
?Context?context)?throws?IOException, InterruptedException?{?
}
Mapreduce程序代碼基本結(jié)構(gòu)
用戶編寫的程序分成三個(gè)部分:Mapper,Reducer,Driver(提交運(yùn)行mr程序的客戶端)
Mapper的輸入數(shù)據(jù)是KV對(duì)的形式(KV的類型可自定義)
Mapper的輸出數(shù)據(jù)是KV對(duì)的形式(KV的類型可自定義)
Mapper中的業(yè)務(wù)邏輯寫在map()方法中
map()方法(maptask進(jìn)程)對(duì)每一個(gè)<K,V>調(diào)用一次
Reducer的輸入數(shù)據(jù)類型對(duì)應(yīng)Mapper的輸出數(shù)據(jù)類型,也是KV
Reducer的業(yè)務(wù)邏輯寫在reduce()方法中
Reducetask進(jìn)程對(duì)每一組相同k的<k,v>組調(diào)用一次reduce()方法
用戶自定義的Mapper和Reducer都要繼承各自的父類
整個(gè)程序需要一個(gè)Drvier來進(jìn)行提交,提交的是一個(gè)描述了各種必要信息的job對(duì)象
需求:有一批文件(規(guī)模為TB級(jí)或者PB級(jí)),如何統(tǒng)計(jì)這些文件中所有單詞出現(xiàn)次數(shù)
?如有三個(gè)文件,文件名是qfcourse.txt、qfstu.txt 和 qf_teacher
?qf_course.txt內(nèi)容:
?php java linux
bigdata VR
C C++ java web
linux shell
?qf_stu.txt內(nèi)容:
?tom jim lucy
lily sally
andy
tom jim sally
?qf_teacher內(nèi)容:
?jerry Lucy tom
jim
方案
–?分別統(tǒng)計(jì)每個(gè)文件中單詞出現(xiàn)次數(shù)?- map()
–?累加不同文件中同一個(gè)單詞出現(xiàn)次數(shù)?- reduce()
實(shí)現(xiàn)代碼
–?創(chuàng)建一個(gè)簡(jiǎn)單的maven項(xiàng)目
–?添加hadoop client依賴的jar,pom.xml主要內(nèi)容如下:
?<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
?
–?編寫代碼
–?自定義一個(gè)mapper類
?import?java.io.IOException;
??import?org.apache.hadoop.io.IntWritable;
??import?org.apache.hadoop.io.LongWritable;
??import?org.apache.hadoop.io.Text;
??import?org.apache.hadoop.mapreduce.Mapper;
??/**
???*?Maper里面的泛型的四個(gè)類型從左到右依次是:
???*?
???*?LongWritable KEYIN:?默認(rèn)情況下,是mr框架所讀到的一行文本的起始偏移量,Long,??類似于行號(hào)但是在hadoop中有自己的更精簡(jiǎn)的序列化接口,所以不直接用Long,而用LongWritable?
???*?Text VALUEIN:默認(rèn)情況下,是mr框架所讀到的一行文本的內(nèi)容,String,同上,用Text
???*
???*?Text KEYOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的key,在此處是單詞,String,同上,用Text
???*?IntWritable VALUEOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的value,在此處是單詞次數(shù),Integer,同上,用IntWritable
???*/
??public?class?WordcountMapper?extends?Mapper<LongWritable, Text, Text, IntWritable>{
?? /**
?? ?*?map階段的業(yè)務(wù)邏輯就寫在自定義的map()方法中
?? ?*?maptask會(huì)對(duì)每一行輸入數(shù)據(jù)調(diào)用一次我們自定義的map()方法
?? ?*/
?? @Override
?? protected?void?map(LongWritable key, Text?value, Context?context)?throws?IOException, InterruptedException?{
??
?? //將maptask傳給我們的一行的文本內(nèi)容先轉(zhuǎn)換成String
?? String?line = value.toString();
?? //根據(jù)空格將這一行切分成單詞
?? String[] words = line.split(" ");
??
?? /**
?? ?*將單詞輸出為<單詞,1>?
?? ?*如<lily,1>?<lucy,1>??<c,1>?<c++,1>?<tom,1>?
?? ?*/
?? for(String?word:words){
?? //將單詞作為key,將次數(shù)1作為value,以便于后續(xù)的數(shù)據(jù)分發(fā),可以根據(jù)單詞分發(fā),以便于相同單詞會(huì)到相同的reduce task
?? context.write(new?Text(word),?new?IntWritable(1));
?? }
?? }
??}
?
–?自定義一個(gè)reduce類
??import?java.io.IOException;
??import?org.apache.hadoop.io.IntWritable;
??import?org.apache.hadoop.io.Text;
??import?org.apache.hadoop.mapreduce.Reducer;
??/**
???*?Reducer里面的泛型的四個(gè)類型從左到右依次是:
???*? Text KEYIN:?對(duì)應(yīng)mapper輸出的KEYOUT
???*? IntWritable VALUEIN:?對(duì)應(yīng)mapper輸出的VALUEOUT
???*?
???*? KEYOUT,?是單詞
???*? VALUEOUT 是自定義reduce邏輯處理結(jié)果的輸出數(shù)據(jù)類型,是總次數(shù)
???*/
??public?class?WordcountReducer?extends?Reducer<Text, IntWritable, Text, IntWritable>{
?? /**
?? ?*?<tom,1>
?? ?*?<tom,1>
?? ?*?<linux,1>
?? ?*?<banana,1>
?? ?*?<banana,1>
?? ?*?<banana,1>
?? ?*?入?yún)?/span>key,是一組相同單詞kv對(duì)的key
?? ?*?values是若干相同key的value集合
?? ?*?如?<tom,[1,1]>???<linux,[1]>???<banana,[1,1,1]>
?? ?*/
?? @Override
?? protected?void?reduce(Text?key, Iterable<IntWritable> values, Context?context)?throws?IOException, InterruptedException?{
?? int?count=0; ?//累加單詞的出現(xiàn)的次數(shù)
??
?? for(IntWritable value:values){
?? count += value.get();
?? }
?? context.write(key,?new?IntWritable(count));
?? }
??}
?
–?編寫一個(gè)Driver類
???import?org.apache.hadoop.conf.Configuration;
??import?org.apache.hadoop.fs.Path;
??import?org.apache.hadoop.io.IntWritable;
??import?org.apache.hadoop.io.Text;
??import?org.apache.hadoop.mapreduce.Job;
??import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
??import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
??/**
???*?相當(dāng)于一個(gè)yarn集群的客戶端
???*?需要在此封裝我們的mr程序的相關(guān)運(yùn)行參數(shù),指定jar包
???*?最后提交給yarn
???*/
??public?class?WordcountDriver {
?? /**
?? ?*?該類是運(yùn)行在hadoop客戶端的,main一運(yùn)行,yarn客戶端就啟動(dòng)起來了,與yarn服務(wù)器端通信
?? ?*?yarn服務(wù)器端負(fù)責(zé)啟動(dòng)mapreduce程序并使用WordcountMapper和WordcountReducer類
?? ?*/
?? public?static?void?main(String[] args)?throws?Exception?{
?? //此代碼需要兩個(gè)輸入?yún)?shù) ?第一個(gè)參數(shù)支持要處理的源文件;第二個(gè)參數(shù)是處理結(jié)果的輸出路徑
?? if?(args ==?null?|| args.length?== 0) {
?? args =?new?String[2];
?????????????//路徑都是 hdfs系統(tǒng)的文件路徑
?? args[0] = "hdfs://192.168.18.64:9000/wordcount/input/";
?? args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
?? }
?? /**
?? ?*?什么也不設(shè)置時(shí),如果在安裝了hadoop的機(jī)器上運(yùn)行時(shí),自動(dòng)讀取
?? ?*?/home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml
?? ?*?文件放入Configuration中
?? ?*/
?? Configuration?conf =?new?Configuration();
?? Job job = Job.getInstance(conf);
??
?? //指定本程序的jar包所在的本地路徑
?? job.setJarByClass(WordcountDriver.class);
??
?? //指定本業(yè)務(wù)job要使用的mapper業(yè)務(wù)類
?? job.setMapperClass(WordcountMapper.class);
?? //指定mapper輸出數(shù)據(jù)的kv類型
?? job.setMapOutputKeyClass(Text.class);
?? job.setMapOutputValueClass(IntWritable.class);
????????
?????????//指定本業(yè)務(wù)job要使用的Reducer業(yè)務(wù)類
?????????job.setReducerClass(WordcountReducer.class);
?? //指定最終輸出的數(shù)據(jù)的kv類型
?? job.setOutputKeyClass(Text.class);
?? job.setOutputValueClass(IntWritable.class);
??
?? //指定job的輸入原始文件所在目錄
?? FileInputFormat.setInputPaths(job,?new?Path(args[0]));
?? //指定job的輸出結(jié)果所在目錄
?? FileOutputFormat.setOutputPath(job,?new?Path(args[1]));
??
?? //將job中配置的相關(guān)參數(shù),以及job所用的java類所在的jar包,提交給yarn去運(yùn)行
?? /*job.submit();*/
?? boolean?res = job.waitForCompletion(true);
?? System.exit(res?0:1);
?? }
??}
wordcount處理過程
將文件拆分成splits,由于測(cè)試用的文件較小,所以每個(gè)文件為一個(gè)split,并將文件按行分割形成<key,value>對(duì),下圖所示。這一步由MapReduce框架自動(dòng)完成,其中偏移量(即key值)包括了回車所占的字符數(shù)(Windows/Linux環(huán)境不同)。
將分割好的<key,value>對(duì)交給用戶定義的map方法進(jìn)行處理,生成新的<key,value>對(duì),下圖所示。
得到map方法輸出的<key,value>對(duì)后,Mapper會(huì)將它們按照key值進(jìn)行排序,并執(zhí)行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結(jié)果。下圖所示。
Reducer先對(duì)從Mapper接收的數(shù)據(jù)進(jìn)行排序,再交由用戶自定義的reduce方法進(jìn)行處理,得到新的<key,value>對(duì),并作為WordCount的輸出結(jié)果,下圖所示。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。