您好,登錄后才能下訂單哦!
jar包
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> </dependency>
2.x以后就拆成一些零散的包了,沒有core包了
代碼:
package org.conan.myhadoop.mr; import java.io.IOException; import org.apache.hadoop.conf.Configuration; //org.apache.hadoop.mapred 老系統(tǒng)的包 //org.apache.hadoop.mapreduce 新系統(tǒng)的包 import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* * ModuleMapReduce Class * 單純的注釋 */ public class ModuleMapReduce extends Configured implements Tool { /** * * ModuleMapper Class 不僅有注釋的功效而且你鼠標(biāo)放在你注釋的方法上面他會(huì)把你注釋的內(nèi)容顯示出來, * */ public static class ModuleMapper extends Mapper<LongWritable, Text, LongWritable, Text> { @Override public void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO } @Override public void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } /** * * ModuleReducer Class * */ public static class ModuleReducer extends Reducer<LongWritable, Text, LongWritable, Text> { @Override public void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.setup(context); } @Override protected void reduce(LongWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException { // TODO } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } // Driver 驅(qū)動(dòng) // @Override 
//實(shí)現(xiàn)接口時(shí)關(guān)鍵字1.5和1.7的JDK都會(huì)報(bào)錯(cuò),只有1.6不報(bào)錯(cuò) public int run(String[] args) throws Exception { Job job = parseInputAndOutput(this, this.getConf(), args); // 2.set job // step 1:set input job.setInputFormatClass(TextInputFormat.class); // step 3:set mappper class job.setMapperClass(ModuleMapper.class); // step 4:set mapout key/value class job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // step 5:set shuffle(sort,combiner,group) // set sort job.setSortComparatorClass(LongWritable.Comparator.class); // set combiner(optional,default is unset)必須是Reducer的子類 job.setCombinerClass(ModuleReducer.class); // set grouping job.setGroupingComparatorClass(LongWritable.Comparator.class); // step 6 set reducer class job.setReducerClass(ModuleReducer.class); // step 7:set job output key/value class job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // step 8:set output format job.setOutputFormatClass(FileOutputFormat.class); // step 10: submit job Boolean isCompletion = job.waitForCompletion(true);// 提交job return isCompletion ? 0 : 1; } public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException { // 輸入?yún)?shù)的合法性 if (args.length != 2) { System.err.printf( "Usage: %s [generic options] <input> <output> \n ", tool .getClass().getSimpleName()); //%s表示輸出字符串,也就是將后面的字符串替換模式中的%s ToolRunner.printGenericCommandUsage(System.err); return null; } // 1.create job Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(ModuleMapReduce.class); // step 2:set input path Path inputPath = new Path(args[0]); FileInputFormat.addInputPath(job, inputPath); // step 9:set output path Path outputPath = new Path(args[0]); FileOutputFormat.setOutputPath(job, outputPath); return job; } public static void main(String[] args) { try { int status = ToolRunner.run(new ModuleMapReduce(), args);// 返回值即為isCompletion ? 
0 : 1 System.exit(status);// System.exit(0)中斷虛擬機(jī)的運(yùn)行,退出應(yīng)用程序,0表示沒有異常正常退出。 } catch (Exception e) { e.printStackTrace(); } } }
倒排索引代碼
輸入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
說明:每一行為一條電話通話記錄,左邊的號碼(記為a)打給右邊的號碼(記為b號碼),中間用空格隔開
要求:
將以上文件以如下格式輸出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
說明:左邊為被呼叫的號碼b,右邊為呼叫b的號碼a,以"|"分隔
package org.conan.myhadoop.mr; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ReverseIndex extends Configured implements Tool { enum Counter { LINESKIP, // 出錯(cuò)的行 } public static class Map extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // 讀取源數(shù)據(jù) try { // 數(shù)據(jù)處理 String[] lineSplit = line.split(" "); String anum = lineSplit[0]; String bnum = lineSplit[1]; context.write(new Text(bnum), new Text(anum)); // 輸出 } catch (java.lang.ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment(1); // 出錯(cuò)hang計(jì)數(shù)器+1 return; } } } public static class Reduce extends Reducer { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String valueString; String out = ""; for (Text value : values) { valueString = value.toString(); out += valueString + "|"; System.out.println("Ruduce:key=" + key + " value=" + value); } context.write(key, new Text(out)); } } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); Job job = new Job(conf, "ReverseIndex"); // 任務(wù)名 job.setJarByClass(ReverseIndex.class); // 指定Class FileInputFormat.addInputPath(job, new Path(args[0])); // 輸入路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 輸出路徑 job.setMapperClass(Map.class); // 調(diào)用上面Map類作為Map任務(wù)代碼 
job.setReducerClass(ReverseIndex.Reduce.class); // 調(diào)用上面Reduce類作為Reduce任務(wù)代碼 job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); // 指定輸出的KEY的格式 job.setOutputValueClass(Text.class); // 指定輸出的VALUE的格式 job.waitForCompletion(true); // 輸出任務(wù)完成情況 System.out.println("任務(wù)名稱:" + job.getJobName()); System.out.println("任務(wù)成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("輸入行數(shù):" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("輸出行數(shù):" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳過的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { // 判斷參數(shù)個(gè)數(shù)是否正確 // 如果無參數(shù)運(yùn)行則顯示以作程序說明 if (args.length != 2) { System.err.println(""); System.err .println("Usage: ReverseIndex < input path > < output path > "); System.err .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out"); System.exit(-1); } // 記錄開始時(shí)間 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 運(yùn)行任務(wù) int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args); // 輸出任務(wù)耗時(shí) Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任務(wù)開始:" + formatter.format(start)); System.out.println("任務(wù)結(jié)束:" + formatter.format(end)); System.out.println("任務(wù)耗時(shí):" + String.valueOf(time) + " 分鐘"); System.exit(res); } }
去重代碼
//Mapper任務(wù) static class DDMap extends Mapper<LongWritable,Text,Text,Text>{ private static Text line = new Text(); protected void map(LongWritable k1,Text v1,Context context){ line = v1; Text text = new Text(""); try { context.write(line,text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; } //Reducer任務(wù) static class DDReduce extends Reducer<Text,Text,Text,Text>{ protected void reduce(Text k2,Iterable<Text> v2s,Context context){ Text text = new Text(""); try { context.write(k2, text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; }
參考文章:
一個經典的MapReduce模板代碼,倒排索引(ReverseIndex)
http://blog.itpub.net/26400547/viewspace-1214945/
http://www.tuicool.com/articles/emi6Fb
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。