您好,登錄后才能下訂單哦!
//mapreduce程序 import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { /** * TokenizerMapper 繼續(xù)自 Mapper<LongWritable, Text, Text, IntWritable> * * [一個(gè)文件就一個(gè)map,兩個(gè)文件就會(huì)有兩個(gè)map] * map[這里讀入輸入文件內(nèi)容 以" \t\n\r\f" 進(jìn)行分割,然后設(shè)置 word ==> one 的key/value對(duì)] * * @param Object Input key Type: * @param Text Input value Type: * @param Text Output key Type: * @param IntWritable Output value Type: * * Writable的主要特點(diǎn)是它使得Hadoop框架知道對(duì)一個(gè)Writable類(lèi)型的對(duì)象怎樣進(jìn)行serialize以及deserialize. * WritableComparable在Writable的基礎(chǔ)上增加了compareT接口,使得Hadoop框架知道怎樣對(duì)WritableComparable類(lèi)型的對(duì)象進(jìn)行排序。 * * @ author liuqingjie * */ public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } /** * IntSumReducer 繼承自 Reducer<Text,IntWritable,Text,IntWritable> * * [不管幾個(gè)Map,都只有一個(gè)Reduce,這是一個(gè)匯總] * reduce[循環(huán)所有的map值,把word ==> one 的key/value對(duì)進(jìn)行匯總] * * 這里的key為Mapper設(shè)置的word[每一個(gè)key/value都會(huì)有一次reduce] * * 當(dāng)循環(huán)結(jié)束后,最后的確context就是最后的結(jié)果. * * @author liuqingjie * */ public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); if (args.length != 2) { System.err.println("請(qǐng)配置路徑 "); System.exit(2); } Job job = new Job(conf, "wordcount"); job.setJarByClass(WordCount.class);//主類(lèi) job.setMapperClass(TokenizerMapper.class);//mapper job.setReducerClass(IntSumReducer.class);//reducer job.setMapOutputKeyClass(Text.class);//設(shè)置map輸出數(shù)據(jù)的關(guān)鍵類(lèi) job.setMapOutputValueClass(IntWritable.class);//設(shè)置map輸出值類(lèi) job.setOutputKeyClass(Text.class);//設(shè)置作業(yè)輸出數(shù)據(jù)的關(guān)鍵類(lèi) job.setOutputValueClass(IntWritable.class);//設(shè)置作業(yè)輸出值類(lèi) FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//文件輸入 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//文件輸出 System.exit(job.waitForCompletion(true) ? 0 : 1);//等待完成退出. } }
編寫(xiě)過(guò)程分析:
(1)數(shù)據(jù)類(lèi)型
整型:IntWritable, 這是Hadoop對(duì)int的封裝
字符串型:Text,這是Hadoop對(duì)String的封裝
上下文對(duì)象:Context,它用來(lái)與MapReduce系統(tǒng)進(jìn)行通信,如把map的結(jié)果傳給reduce
處理
(2)執(zhí)行過(guò)程
分為兩個(gè)階段:map階段和reduce階段, 以key/value為輸入輸出,其中key、value的類(lèi)型可以由程序員自定義。
map編寫(xiě):
自定義一個(gè)類(lèi),繼承于基類(lèi)Mapper,該基類(lèi)是一個(gè)泛型,有4個(gè)形參類(lèi)型:用來(lái)指定map函數(shù)的輸入鍵、輸入值,輸出鍵、輸 出值,格式如下:public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOU>。
根據(jù)實(shí)際需要,重寫(xiě)map函數(shù),函數(shù)類(lèi)型由Mapper指定。每一對(duì)<key,value>調(diào)用一次map函數(shù)。
wordcount程序中,map方法中的value值存儲(chǔ)的是文本文件中的一行,key值為該行的首字符相對(duì)于文本文件首字符的偏移量,在本程序中,key值未使用。StringTokenizer類(lèi)是將每一行拆分為一個(gè)個(gè)的單詞。
reduce編寫(xiě):
自定義一個(gè)類(lèi),繼承于基類(lèi)Reducer,該基類(lèi)是一個(gè)泛型,有4個(gè)形參類(lèi)型:用來(lái)指定reduce函數(shù)的輸入鍵、輸入值,輸出鍵、輸出值,格式public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,其中reduce的輸入類(lèi)型必須與map的輸出類(lèi)型一致。
根據(jù)實(shí)際需要,重寫(xiě)reduce方法,方法的類(lèi)型由Reducer指定。每一個(gè)key調(diào)用一次reduce方法。
主函數(shù)編寫(xiě):
在主函數(shù)中進(jìn)行作業(yè)的配置,主要配置有:
Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class);//主類(lèi) job.setMapperClass(TokenizerMapper.class);//mapper job.setReducerClass(IntSumReducer.class);//reducer job.setMapOutputKeyClass(Text.class);//設(shè)置map輸出數(shù)據(jù)的關(guān)鍵類(lèi) job.setMapOutputValueClass(IntWritable.class);//設(shè)置map輸出值類(lèi) job.setOutputKeyClass(Text.class);//設(shè)置作業(yè)輸出數(shù)據(jù)的關(guān)鍵類(lèi) job.setOutputValueClass(IntWritable.class);//設(shè)置作業(yè)輸出值類(lèi) FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//文件輸入 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//文件輸出 System.exit(job.waitForCompletion(true) ? 0 : 1);//等待完成退出.
(3)數(shù)據(jù)處理過(guò)程
1)將文件拆分為splits,并由MapReduce框架自動(dòng)完成分割,將每一個(gè)split分割為<key,value>對(duì)
2)每一對(duì)<key,value>調(diào)用一次map函數(shù),處理后生產(chǎn)新的<key,value>對(duì),由Context傳遞給reduce處理
3)Mapper對(duì)<key,value>對(duì)進(jìn)行按key值進(jìn)行排序,將key值相同的value進(jìn)行合并。最后得到Mapper的最終輸出結(jié)果
4)reduce處理,處理后將新的<key,value>對(duì)輸出。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。