您好,登錄后才能下訂單哦!
MapReduce 是一個(gè)分布式運(yùn)算程序的編程框架,核心功能是將用戶(hù)編寫(xiě)的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè) Hadoop 集群上。
MapReduce大體上分三個(gè)部分:
- MRAppMaster:MapReduce Application Master,分配任務(wù),協(xié)調(diào)任務(wù)的運(yùn)行
- MapTask:階段并發(fā)任,負(fù)責(zé) mapper 階段的任務(wù)處理 YARNChild
- ReduceTask:階段匯總?cè)蝿?wù),負(fù)責(zé) reducer 階段的任務(wù)處理 YARNChild
public class MyWordCount {
public static void main(String[] args) {
// 指定 hdfs 相關(guān)的參數(shù)
Configuration conf=new Configuration(true);
conf.set("fs.defaultFS","hdfs://hadoop01:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
// 新建一個(gè) job 任務(wù)
Job job=Job.getInstance(conf);
// 設(shè)置 jar 包所在路徑
job.setJarByClass(MyWordCount.class);
// 指定 mapper 類(lèi)和 reducer 類(lèi)
job.setMapperClass(Mapper.class);
job.setReducerClass(MyReduce.class);
// 指定 maptask 的輸出類(lèi)型,注意,如果maptask的輸出類(lèi)型與reducetask輸出類(lèi)型一樣,mapTask可以不用設(shè)置
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定 reducetask 的輸出類(lèi)型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定該 mapreduce 程序數(shù)據(jù)的輸入和輸出路徑
Path input=new Path("/data/input");
Path output =new Path("/data/output");
//一定要保證output不存在
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true); //遞歸刪除
}
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,output);
// 最后提交任務(wù)
boolean success = job.waitForCompletion(true);
System.exit(success?0:-1);
} catch (Exception e) {
e.printStackTrace();
}
}
private class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
Text mk =new Text();
IntWritable mv=new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 計(jì)算任務(wù)代碼:切割單詞,輸出每個(gè)單詞計(jì) 1 的 key-value 對(duì)
String[] words = value.toString().split("\\s+");
for(String word:words){
mk.set(word);
context.write(mk,mv);
}
}
}
private class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
IntWritable mv=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum=0;
// 匯總計(jì)算代碼:對(duì)每個(gè) key 相同的一組 key-value 做匯總統(tǒng)計(jì)
for(IntWritable value:values){
sum+=value.get();
}
mv.set(sum);
context.write(key,mv);
}
}
}
一個(gè) job 的 map 階段并行度由客戶(hù)端在提交 job 時(shí)決定,客戶(hù)端對(duì) map 階段并行度的規(guī)劃的基本邏輯為:將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè) split),然后每一個(gè) split 分配一個(gè) mapTask 并行實(shí)例處理。這段邏輯及形成的切片規(guī)劃描述文件,是由FileInputFormat實(shí)現(xiàn)類(lèi)的getSplits()方法完成的,小編后續(xù)會(huì)對(duì)MPjob提交過(guò)程的源碼進(jìn)行詳細(xì)的分析。
決定map task的個(gè)數(shù)主要由這幾個(gè)方面:
-文件的大小
- 文件的個(gè)數(shù)
- block的大小
- 邏輯切片的大小
總的來(lái)說(shuō)就是,當(dāng)對(duì)文件進(jìn)行邏輯劃分的時(shí)候,默認(rèn)的劃分規(guī)則就是一個(gè)split和一個(gè)block的大小一樣,如果文件沒(méi)有到一個(gè)block大小,也會(huì)被切分出來(lái)一個(gè)split,這里有調(diào)優(yōu)點(diǎn),就是如果處理的文件都是小文件的話,那么機(jī)會(huì)并行很多的maptask,導(dǎo)致大量的時(shí)間都浪費(fèi)在了啟動(dòng)jvm上,此時(shí)可以通過(guò)合并小文件或者重用jvm的方式提高效率。
邏輯切片機(jī)制:
long splitSize = computeSplitSize(blockSize, minSize, maxSize)
blocksize:默認(rèn)是 128M,可通過(guò) dfs.blocksize 修改
minSize:默認(rèn)是 1,可通過(guò) mapreduce.input.fileinputformat.split.minsize 修改
maxsize:默認(rèn)是 Long.MaxValue,可通過(guò)mapreduce.input.fileinputformat.split.maxsize 修改
因此,如果是想調(diào)小split的大小,那么就將 maxsize調(diào)整到比block小。
如果是想調(diào)大split的大小,那么就將minSize調(diào)整到比block大。
reducetask 的并行度同樣影響整個(gè) job 的執(zhí)行并發(fā)度和執(zhí)行效率,但與 maptask 的并發(fā)數(shù)由切片數(shù)決定不同,Reducetask 數(shù)量的決定是可以直接手動(dòng)設(shè)置:job.setNumReduceTasks(4);,默認(rèn)是1個(gè),如果設(shè)置為0個(gè)表示沒(méi)有reduce階段,當(dāng)然也可以設(shè)置多個(gè),根據(jù)需求,如果有些需要全局計(jì)數(shù)的操作,那么只能設(shè)置1個(gè)reduce,有些可以設(shè)置多個(gè)reducetask的,千萬(wàn)不要設(shè)置太多,最好設(shè)置的和分區(qū)的個(gè)數(shù)能一一對(duì)應(yīng),不然的會(huì)就會(huì)有一些reduceTask空跑,導(dǎo)致了不能處理業(yè)務(wù)而且還占用系統(tǒng)資源。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。