您好,登錄后才能下訂單哦!
[TOC]
編寫MapReduce的程序有至少三個(gè)必不可少的部分:mapper,reducer,driver??蛇x的有 partitioner,combiner
而且mapper的輸入輸出、reducer的輸入輸出都是key value型的,所以要求我們在編寫mapper和reducer時(shí),必須實(shí)現(xiàn)明確這4個(gè)鍵值對中的8種數(shù)據(jù)類型,而且必須還是hadoop的可序列化類型。同時(shí)還需要注意的是,map的輸出其實(shí)就是reduce的輸入,所以包括的數(shù)據(jù)類型是一樣的。
編寫基本流程
1)自定義map類,需要繼承 Mapper這個(gè)類
2)繼承Mapper 的時(shí)候,需要指定輸入和輸出的鍵值對中的類型
3)必須重寫繼承自父類的map() 方法
4)上面重寫的map() 方法是每個(gè)map task對每一個(gè)輸入到mapper中的鍵值對都會調(diào)用處理一次。
基本編寫實(shí)例如下:
/*
指定Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 這4個(gè)類型分別為:
LongWritable, Text, Text, IntWritable,相當(dāng)于普通類型:
long,string,string,int
*/
public class TestMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
這里是map方法 處理邏輯
}
}
基本編寫流程
1)自定義reduce類,需要繼承 Reducer這個(gè)類
2)繼承Reducer的時(shí)候,需要指定輸入和輸出的鍵值對中的類型
3)必須重寫繼承自父類的reduce() 方法
4)上面重寫的reduce() 方法是每個(gè)reduer task對每一個(gè)輸入到reducer中的鍵值對都會調(diào)用處理一次。
基本編寫實(shí)例如下:
/*
指定Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 這4個(gè)類型分別為:
Text, IntWritable, Text, IntWritable,相當(dāng)于普通類型:
string,int,string,int
*/
public class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
這里是reduce方法 處理邏輯
}
}
這個(gè)部分是用于配置job對象的各種必須配置信息,配置完成后,將job提交給yarn執(zhí)行
具體配置啥下面直接上例子看好了。主要起到調(diào)度map和reduce任務(wù)執(zhí)行的作用
這個(gè)階段主要是對map階段的輸出進(jìn)行分區(qū),而map的分區(qū)數(shù)直接決定reduce task的數(shù)量(一般來說是一對一),編寫流程如下:
1)自定義分區(qū)類,繼承 Partitioner<key, value>
2)繼承Partitioner的時(shí)候,處理的輸入的鍵值對類型
3)必須重寫繼承自父類的getPartition() 方法
4)上面重寫的getPartition() () 方法是每個(gè)maptask對每一個(gè)輸入的鍵值對都會調(diào)用處理一次。
5)根據(jù)分區(qū)規(guī)則,返回0~n,表示分區(qū)格式為0~n
編寫案例如下:
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
判斷條件1:
return 0;
判斷條件2:
return 1;
.......
return n;
}
}
combiner不是一個(gè)獨(dú)立的階段,它其實(shí)是包含在map階段中的。map本身輸出的鍵值對中,每個(gè)鍵值對的value都是1,就算是一樣的key,也是獨(dú)立一個(gè)鍵值對。如果重復(fù)的鍵值對越多,那么將map輸出傳遞到reduce的過程中,就會占用很多帶寬資源。優(yōu)化的方法就是每個(gè)map輸出時(shí),先在當(dāng)前map task下進(jìn)行局部合并匯總,減少重復(fù)可以的出現(xiàn)。即
<king,1> <>king,1> 這種一樣的key的,就會合并成 <king,2>
這樣就會減少傳輸?shù)臄?shù)據(jù)量
所以其實(shí)由此可以知道,其實(shí)combiner的操作和reduce的操作是一樣的,只不過一個(gè)是局部,一個(gè)是全局。簡單的做法就是,直接將reducer作為combiner類傳入job,如:
job.setCombinerClass(WordCountReducer.class);
我們可以看看這個(gè)方法的源碼:
public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
//看到?jīng)],那個(gè) Reducer.class
this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class);
}
可以清楚看到設(shè)置combine class時(shí),可以看到多態(tài)的類型設(shè)置就是 Reducer 類型的,從這里也可以更加確定 combiner 的操作和 reducer的就是一樣的。
下面開始用wordcount作為例子編寫一個(gè)完整的MapReduce程序
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//setup 和 clean 方法不是必須的
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//最先執(zhí)行
//System.out.println("this is setup");
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
//執(zhí)行完map之后執(zhí)行
//System.out.println("this is cleanup");
}
//這里創(chuàng)建一個(gè)臨時(shí)對象,用于保存中間值
Text k = new Text();
IntWritable v = new IntWritable();
/**
*
*
* @param key
* @param value
* @param context 用于連接map和reduce上下文,通過這個(gè)對象傳遞map的結(jié)果給reduce
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//System.out.println("開始map=====================");
//1.value是讀取到的一行字符串,要將其轉(zhuǎn)換為java中的string進(jìn)行處理,即反序列化
String line = value.toString();
//2.切分?jǐn)?shù)據(jù)
String[] words = line.split(" ");
//3.輸出map結(jié)構(gòu), <單詞,個(gè)數(shù)>的形式,寫入的時(shí)候需將普通類型轉(zhuǎn)為序列化類型
/**
* 兩種寫法:
* 1) context.write(new Text(word), new IntWritable(1));
* 缺點(diǎn):每次都會創(chuàng)建兩個(gè)對象,最后會造成創(chuàng)建了很多臨時(shí)對象
*
* 2)Text k = new Text();
* IntWritable v = new IntWritable();
*
* for {
* k.set(word);
* v.set(1);
* context.write(k, v);
* }
*
* 這種方法好處就是,對象只創(chuàng)建了一次,后續(xù)只是通過修改對象內(nèi)部的值的方式傳遞,無需重復(fù)創(chuàng)建多個(gè)對象
*/
for (String word:words) {
//轉(zhuǎn)換普通類型為可序列化類型
k.set(word);
v.set(1);
//寫入到上下文對象中
context.write(k, v);
}
}
}
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 這里的 Iterable<IntWritable> values 之所以是一個(gè)可迭代的對象,
* 是因?yàn)閺膍ap傳遞過來的數(shù)據(jù)經(jīng)過合并了,如:
* (HDFS,1),(HDFS,1)合并成 (HDFS,[1,1]) 這樣的形式,所以value可以通過迭代方式獲取其中的值
*
*/
IntWritable counts = new IntWritable();
@Override
protected void reduce(Text key,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//1.初始化次數(shù)
int count = 0;
//2.匯總同一個(gè)key中的個(gè)數(shù)
for (IntWritable value: values) {
count += value.get();
}
//3.輸出reduce
counts.set(count);
context.write(key, counts);
}
}
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//這里只是方便在ide下直接運(yùn)行,如果是在命令行下直接輸入輸入和輸出文件路徑即可
args = new String[]{"G:\\test2\\", "G:\\testmap6\\"};
//1.獲取配置對象
Configuration conf = new Configuration();
//2.獲取job對象
Job job = Job.getInstance(conf);
//3.分別給job指定driver,map,reducer的類
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.分別指定map和reduce階段輸出的類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//這里可以設(shè)置分區(qū)類,需要額外編寫分區(qū)實(shí)現(xiàn)類
// job.setPartitionerClass(WordCountPartitioner.class);
// job.setNumReduceTasks(2);
//設(shè)置預(yù)合并類
//job.setCombinerClass(WordCountReducer.class);
//設(shè)置inputFormat類,大量小文件優(yōu)化,不設(shè)置默認(rèn)使用 TextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job,3* 1024 * 1024);
CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024);
//5.數(shù)據(jù)輸入來源以及結(jié)果的輸出位置
// 輸入的時(shí)候會根據(jù)數(shù)據(jù)源的情況自動map切片,形成切片信息(或者叫切片方案)
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//以上就是將一個(gè)job的配置信息配置完成后,下面就提交job,hadoop將跟就job的配置執(zhí)行job
//6.提交job任務(wù),這個(gè)方法相當(dāng)于 job.submit()之后,然后等待執(zhí)行完成
//任務(wù)配置信息是提交至yarn的 MRappmanager
job.waitForCompletion(true);
}
}
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。