您好,登錄后才能下訂單哦!
Join方法
需求:處理input1和input2文件,兩個文件中的id都一樣,也就是key值一樣,value值不同,把兩者合并。input1存的是id和名字,input2存的是id和各種信息。
處理方法一:
package org.robby.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join, variant one.
 *
 * <p>Every input line has the form {@code id,rest}. The mapper emits
 * {@code (id, rest)} and the reducer concatenates all values that share an id,
 * separated by commas. The order of the concatenated values is whatever order
 * the framework delivers them in; this job does nothing to control it.
 */
public class MyReduceJoin {

    /** Counter group used to report lines that could not be parsed. */
    private static final String COUNTER_GROUP = "MyReduceJoin";

    /** Splits each line at the first comma and emits {@code (id, rest)}. */
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        // Output holders reused across map() calls to avoid per-record allocation.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Emits the part before the first comma as key and the remainder as value.
         *
         * @param key     position of the line in the input (unused)
         * @param value   one input line
         * @param context sink for the emitted pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split(",", 2);
            if (parts.length < 2) {
                // A line without a comma (e.g. a blank line) has no value part.
                // Count it and move on instead of failing the whole task.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            outKey.set(parts[0]);
            outValue.set(parts[1]);
            context.write(outKey, outValue);
        }
    }

    /** Joins all values of one id into a single comma-separated value. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private final Text outValue = new Text();

        /**
         * Writes {@code (key, v1,v2,...)} for the values grouped under {@code key}.
         *
         * @param key     the join id
         * @param values  every value emitted for that id
         * @param context sink for the joined record
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            boolean first = true;
            for (Text val : values) {
                // Separator goes between values, so nothing has to be trimmed afterwards.
                if (!first) {
                    joined.append(",");
                }
                joined.append(val);
                first = false;
            }
            outValue.set(joined.toString());
            context.write(key, outValue);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path
     *             (deleted recursively before the job starts)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyReduceJoin <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Reducer output types (also used for the map output since none is set separately).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output so the job can be re-run.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
方法一缺點:value值無序,可能第一個文件的value在前,也可能第二個文件的value在前。
處理方法二:
引入了一個自定義類型:
package org.robby.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Custom value type carrying one record of a join: the join key, a flag
 * telling which input the record came from, and the rest of the record.
 *
 * <p>All three parts are Hadoop {@link Text} objects, so serialization simply
 * delegates to them in a fixed order (joinKey, flag, secondPart).
 */
public class CombineValues implements WritableComparable<CombineValues> {

    // Start out as empty Text objects; callers fill them in through the setters.
    private Text joinKey = new Text();
    private Text flag = new Text();
    private Text secondPart = new Text();

    /** Creates an instance whose three parts are empty. */
    public CombineValues() {
    }

    public Text getJoinKey() {
        return joinKey;
    }

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }

    public Text getFlag() {
        return flag;
    }

    public void setFlag(Text flag) {
        this.flag = flag;
    }

    public Text getSecondPart() {
        return secondPart;
    }

    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }

    /** Serializes joinKey, flag and secondPart, in that order, to {@code out}. */
    @Override
    public void write(DataOutput out) throws IOException {
        joinKey.write(out);
        flag.write(out);
        secondPart.write(out);
    }

    /** Reads joinKey, flag and secondPart, in that order, from {@code in}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        joinKey.readFields(in);
        flag.readFields(in);
        secondPart.readFields(in);
    }

    /** Orders instances by join key only; flag and secondPart are not compared. */
    @Override
    public int compareTo(CombineValues o) {
        Text otherKey = o.getJoinKey();
        return joinKey.compareTo(otherKey);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("[flag=").append(flag.toString());
        text.append(",joinKey=").append(joinKey.toString());
        text.append(",secondPart=").append(secondPart.toString());
        text.append("]");
        return text.toString();
    }
}
處理過程:可以在mapper階段通過context得到處理的文件是哪一個,因此可以分別處理。
package org.robby.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join, variant two.
 *
 * <p>The mapper tags every record with the file it came from (flag "0" for a
 * path ending in {@code input1.txt}, "1" otherwise), so the reducer can place
 * the input1 part on the left and the other part on the right of the output.
 * If an id has several records from the same side, only the last one seen is kept.
 */
public class MyReduceJoin1 {

    /** Counter group used to report lines that could not be parsed. */
    private static final String COUNTER_GROUP = "MyReduceJoin1";

    /** Wraps each {@code id,rest} line in a {@link CombineValues} tagged with its source. */
    public static class Map extends Mapper<LongWritable, Text, Text, CombineValues> {
        // Reused across map() calls; the flag is filled in once per task in setup().
        private final CombineValues record = new CombineValues();
        private final Text outKey = new Text();

        /**
         * Determines the source flag once per task. The split's path is read
         * here rather than on every record; this relies on a mapper task
         * processing a single split, which is how the framework runs
         * TextInputFormat jobs.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            record.setFlag(new Text(pathName.endsWith("input1.txt") ? "0" : "1"));
        }

        /**
         * Emits {@code (id, record)} where the record carries flag, id and the
         * text after the first comma.
         *
         * @param key     position of the line in the input (unused)
         * @param value   one input line
         * @param context sink for the emitted pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split(",", 2);
            if (parts.length < 2) {
                // No comma means no value part; count the line and skip it
                // instead of failing the task with an index error.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            record.setJoinKey(new Text(parts[0]));
            record.setSecondPart(new Text(parts[1]));
            outKey.set(parts[0]);
            // The key drives partitioning/sorting/grouping; the value is the custom type.
            context.write(outKey, record);
        }
    }

    /** Emits one {@code left,right} line per id. */
    public static class Reduce extends Reducer<Text, CombineValues, Text, Text> {
        private final Text outValue = new Text();

        /**
         * Combines the input1 part (flag "0") and the other part of one id.
         *
         * @param key     the join id
         * @param values  all tagged records of that id
         * @param context sink for the joined record
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException {
            // Local to this call: a side that is missing for this id must come out
            // empty, not carry over whatever the previous id left behind.
            String left = "";
            String right = "";

            for (CombineValues val : values) {
                System.out.println("val:" + val.toString());
                // Copy the content out; do not hold on to val or its fields.
                String secondPart = val.getSecondPart().toString();
                if (val.getFlag().toString().equals("0")) {
                    System.out.println("left :" + secondPart);
                    left = secondPart;
                } else {
                    System.out.println("right :" + secondPart);
                    right = secondPart;
                }
            }

            outValue.set(left + "," + right);
            context.write(key, outValue);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path
     *             (deleted recursively before the job starts)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyReduceJoin1 <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyReduceJoin1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // The map output value type differs from the job output type, so it must be declared.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);

        // Reducer output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output formats.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output locations.
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output so the job can be re-run.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
缺點:如果兩個文件的條數不同(例如同一個id在input2中有多條記錄),并且還需要把id相同的全部合并,方法二就不夠用了:reduce中后一條記錄會覆蓋前一條,每個id只會輸出一行。
處理方法三:
package org.robby.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join, variant three (one-to-many).
 *
 * <p>Records are tagged by source file exactly as in the previous variant
 * (flag "0" for a path ending in {@code input1.txt}, "1" otherwise). The
 * reducer keeps the input1 part as the left side and emits one output line
 * for every record from the other side.
 */
public class MyReduceJoin2 {

    /** Counter group used to report lines that could not be parsed. */
    private static final String COUNTER_GROUP = "MyReduceJoin2";

    /** Wraps each {@code id,rest} line in a {@link CombineValues} tagged with its source. */
    public static class Map extends Mapper<LongWritable, Text, Text, CombineValues> {
        // Reused across map() calls; the flag is filled in once per task in setup().
        private final CombineValues record = new CombineValues();
        private final Text outKey = new Text();

        /**
         * Determines the source flag once per task. The split's path is read
         * here rather than on every record; this relies on a mapper task
         * processing a single split, which is how the framework runs
         * TextInputFormat jobs.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            record.setFlag(new Text(pathName.endsWith("input1.txt") ? "0" : "1"));
        }

        /**
         * Emits {@code (id, record)} where the record carries flag, id and the
         * text after the first comma.
         *
         * @param key     position of the line in the input (unused)
         * @param value   one input line
         * @param context sink for the emitted pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split(",", 2);
            if (parts.length < 2) {
                // No comma means no value part; count the line and skip it
                // instead of failing the task with an index error.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            record.setJoinKey(new Text(parts[0]));
            record.setSecondPart(new Text(parts[1]));
            outKey.set(parts[0]);
            context.write(outKey, record);
        }
    }

    /** Emits {@code left,right} once for every right-side record of an id. */
    public static class Reduce extends Reducer<Text, CombineValues, Text, Text> {
        private final Text outValue = new Text();
        // Buffer for the right-side records of the id currently being reduced.
        private final ArrayList<String> rightParts = new ArrayList<String>();

        /**
         * Joins the input1 part of one id with each of its other records.
         * An id without any right-side record produces no output.
         *
         * @param key     the join id
         * @param values  all tagged records of that id
         * @param context sink for the joined records
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException {
            // Both sides start empty for every id. Without resetting the left side,
            // an id that has no input1 record would be joined with the previous id's name.
            String left = "";
            rightParts.clear();

            for (CombineValues val : values) {
                System.out.println("val:" + val.toString());
                // Copy the content out; do not hold on to val or its fields.
                String secondPart = val.getSecondPart().toString();
                if (val.getFlag().toString().equals("0")) {
                    left = secondPart;
                } else {
                    // input1 holds the name, the other file holds any number of
                    // detail records, so those are collected in a list.
                    rightParts.add(secondPart);
                }
            }

            for (String right : rightParts) {
                outValue.set(left + "," + right);
                context.write(key, outValue);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path
     *             (deleted recursively before the job starts)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyReduceJoin2 <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyReduceJoin2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output so the job can be re-run.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
其他處理方法:
使用distributedCache在mapper環節進行映射;
主要是重寫mapper里面的setup方法,通過context去讀取job傳入的文件,然后存在mapper對象中,從而使得mapper在每次執行map方法時都可以調用這些預先存入的數據;
使用setup預先處理input1,則mapper的map方法處理input2即可。
package org.robby.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Map-side join using the job's cache files.
 *
 * <p>The small table ({@code id,name} lines) is registered as a cache file and
 * loaded into memory by every mapper in {@code setup}. The mapper then only
 * has to read the large input ({@code id,info} lines) and look each id up.
 */
public class MapJoinWithCache {

    /** Counter group used to report lines that could not be parsed. */
    private static final String COUNTER_GROUP = "MapJoinWithCache";

    /** Looks up each record's id in the cached table and emits {@code (id, name,info)}. */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        /** File system used for cache URIs that carry no scheme of their own. */
        private static final String DEFAULT_FS_URI = "hdfs://hadoop1:9000";

        // Output holders reused across map() calls.
        private final Text outKey = new Text();
        private final Text outValue = new Text();
        // id -> name, filled once in setup() and read by every map() call.
        // Declared as HashMap because the simple name Map refers to this class here.
        private HashMap<String, String> keyMap = null;

        /**
         * Joins one {@code id,info} line with the cached name of that id.
         * An id that is not in the cache is emitted with an empty name.
         *
         * @param key     position of the line in the input (unused)
         * @param value   one input line
         * @param context sink for the emitted pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split(",", 2);
            if (parts.length < 2) {
                // No comma means no info part; count the line and skip it
                // instead of failing the task with an index error.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            String name = keyMap.get(parts[0]);
            if (name == null) {
                // Keep the record, but do not let the literal text "null" leak into the output.
                name = "";
            }
            outKey.set(parts[0]);
            outValue.set(name + "," + parts[1]);
            context.write(outKey, outValue);
        }

        /**
         * Loads every cache file registered on the job into {@code keyMap}.
         * Runs once per mapper task, before the first {@code map} call.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            keyMap = new HashMap<String, String>();

            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null) {
                // No cache file was registered: leave the table empty.
                return;
            }

            Configuration conf = context.getConfiguration();
            for (URI cacheFile : cacheFiles) {
                // Honour the file system named by the URI itself; fall back to the
                // fixed cluster address only when the URI has no scheme.
                URI fsUri = cacheFile.getScheme() != null ? cacheFile : URI.create(DEFAULT_FS_URI);
                // FileSystem.get hands out shared, cached instances, so fs is not closed here.
                FileSystem fs = FileSystem.get(fsUri, conf);

                FSDataInputStream in = fs.open(new Path(cacheFile.getPath()));
                try {
                    // Decode as UTF-8 so ids compare equal to those read through Text.
                    BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line);
                        String[] parts = line.split(",", 2);
                        if (parts.length < 2) {
                            context.getCounter(COUNTER_GROUP, "MALFORMED_CACHE_LINES").increment(1);
                            continue;
                        }
                        keyMap.put(parts[0], parts[1]);
                    }
                } finally {
                    // Closing the underlying stream releases the file even if reading failed.
                    in.close();
                }
            }
        }
    }

    /** Pass-through reducer: the join already happened in the mapper. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path
     *             (deleted recursively before the job starts), {@code args[2]}
     *             full path of the file to distribute as cache
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: MapJoinWithCache <input path> <output path> <cache file>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinWithCache.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output so the job can be re-run.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // Everything else matches the other jobs; this registers the file the
        // mappers load in setup(). More files could be added the same way.
        job.addCacheFile(new Path(args[2]).toUri());

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
其他linux指令:
[root@hadoop1 dataFile]# wc test*
  6  14  35 test2.txt
  7  16  41 test.txt
 13  30  76 total
可以通過wc查看文件的條數(輸出的三列依次是行數、單詞數、字節數)。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。