溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

學習日志---hadoop的join處理

發布時間:2020-06-26 13:33:42 來源:網絡 閱讀:395 作者:wukong0716 欄目:大數據

Join方法

需求:處理input1和input2文件,兩個文件中的id都一樣,也就是key值一樣,value值不同,把兩者合并。input1存的是id和名字,input2存的是id和各種信息。

處理方法一:

package org.robby.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyReduceJoin
{
    public static class MapClass extends 
        Mapper<LongWritable, Text, Text, Text>
    {
        //map過程需要用到的中間變量
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
        {
            //用逗號分開后傳出
            keyValue = value.toString().split(",", 2);
            this.key.set(keyValue[0]);
            this.value.set(keyValue[1]);
            context.write(this.key, this.value);
        }
        
    }
    
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        private Text value = new Text();
        
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            StringBuilder valueStr = new StringBuilder();
            
            //reduce過程之所以可以用迭代出相同的id,因為shuffle過程進行了分區(qū),排序,在進入reduce之前,有進行排序和分組,
            //相同的key的值默認分在一組
            for(Text val : values)
            {
                valueStr.append(val);
                valueStr.append(",");
            }
            
            this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString());
            context.write(key, this.value);
        }
        
    }
    
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(MyReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        
        //reduce輸出的格式
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);  

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

方法一缺點:value值無序,可能第一個文件的value在前,也可能第二個文件的value在前;

處理方法二:

引入了一個自定義類型:

package org.robby.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CombineValues implements WritableComparable<CombineValues>{
    //這里的自定義類型,實現(xiàn)WritableComparable接口
    //里面的數(shù)據(jù)使用的是hadoop自帶的類型Text
    private Text joinKey;
    private Text flag;
    private Text secondPart;
    
    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }
    public void setFlag(Text flag) {
        this.flag = flag;
    }
    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }
    public Text getFlag() {
        return flag;
    }
    public Text getSecondPart() {
        return secondPart;
    }
    public Text getJoinKey() {
        return joinKey;
    }
    public CombineValues() {
        //構(gòu)造時初始化數(shù)據(jù),用set添加
        this.joinKey =  new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }
    
    //序列與反序列化,其中體現(xiàn)為傳入文件流,使用hadoop提供的文件流去傳送數(shù)據(jù)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
    @Override
    public void write(DataOutput out) throws IOException {
        //因使用的是hadoop自帶的Text,因此序列化時,可以用本身的Text,傳入流out即可
        this.joinKey.write(out);
        this.flag.write(out);
        this.secondPart.write(out);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.joinKey.readFields(in);
        this.flag.readFields(in);
        this.secondPart.readFields(in);
    }
    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }
    
    @Override
    public String toString() {
        // TODO Auto-generated method stub
        return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
    }

}

處理過程:可以在mapper階段通過context得到處理的文件是哪一個,因此可以分別處理。

package org.robby.join;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class MyReduceJoin1
{
    public static class Map extends 
        Mapper<LongWritable, Text, Text, CombineValues>
    {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
        {
            //FileSplit是文件塊,通過context,文件處理可以的到處理的文件屬于哪一個文件
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            //通過pathName獲得處理文件的名字,然后用flag進行標示
            if(pathName.endsWith("input1.txt"))
                flag.set("0");
            else
                flag.set("1");
            
            combineValues.setFlag(flag);
            keyValue = value.toString().split(",", 2);
            combineValues.setJoinKey(new Text(keyValue[0]));
            combineValues.setSecondPart(new Text(keyValue[1]));

            this.key.set(keyValue[0]);
            //將封裝的數(shù)據(jù)傳出,key是id,用于分區(qū)排序分組,value是自定義的類,在main函數(shù)里需要說明
            context.write(this.key, combineValues);
        }
        
    }
    
    public static class Reduce extends Reducer<Text, CombineValues, Text, Text>
    {
        private Text value = new Text();
        private Text left = new Text();
        private Text right = new Text();
        
        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException
        {
            //因key一樣,因此默認分在一組
            for(CombineValues val : values)
            {
                System.out.println("val:" + val.toString());
                Text secondPar = new Text(val.getSecondPart().toString());
                //根據(jù)flag,來判斷是左邊還是右邊
                if(val.getFlag().toString().equals("0")){
                    System.out.println("left :" + secondPar);
                    left.set(secondPar);
                }
                else{
                    System.out.println("right :" + secondPar);
                    right.set(secondPar);
                }
            }
            
            //整合value,輸出
            Text output = new Text(left.toString() + "," + right.toString());
            
            context.write(key, output);
        }
        
    }
    
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(MyReduceJoin1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        
        //這里要指明map的輸出,因為默認是Text.class
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);
        
        //指明reduce的輸出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        //job任務(wù)的文件輸入和輸出形式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        //job任務(wù)的輸出與輸入文件路徑
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        //通個outputPath,查看hdfs是否已有這個文件,有則刪除
        outputPath.getFileSystem(conf).delete(outputPath, true);  

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

缺點:如果兩個文件的條數不同(同一個id在input2中有多條記錄),并且還需要把id相同的合并,方法二就無法處理,因為reduce中每個id只保留最后一條right值。

處理方法三:

package org.robby.join;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class MyReduceJoin2
{
    public static class Map extends 
        Mapper<LongWritable, Text, Text, CombineValues>
    {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;
        
        @Override
        //map的處理和以前一樣,分文件加flag標識,用自定義的類型封裝輸出
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
        {
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if(pathName.endsWith("input1.txt"))
                flag.set("0");
            else
                flag.set("1");
            
            combineValues.setFlag(flag);
            keyValue = value.toString().split(",", 2);
            combineValues.setJoinKey(new Text(keyValue[0]));
            combineValues.setSecondPart(new Text(keyValue[1]));

            this.key.set(keyValue[0]);
            context.write(this.key, combineValues);
        }
        
    }
    
    public static class Reduce extends Reducer<Text, CombineValues, Text, Text>
    {
        private Text value = new Text();
        private Text left = new Text();
        private ArrayList<Text> right = new ArrayList<Text>();
        
        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException
        {
            right.clear();
            for(CombineValues val : values)
            {
                //這里id相同的合并,有多個了
                System.out.println("val:" + val.toString());
                Text secondPar = new Text(val.getSecondPart().toString());
                if(val.getFlag().toString().equals("0")){
                    left.set(secondPar);
                }
                else{
                    //文件一是名字,文件二是各種信息,因此存在一個list集合中
                    right.add(secondPar);
                }
            }
            
            for(Text t : right){
                Text output = new Text(left.toString() + "," + t.toString());
                context.write(key, output);
            }
            
        }
        
    }
    
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(MyReduceJoin2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);  

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

其他處理方法:

使用distributedCache在mapper環節進行映射;

主要是重寫mapper里面的setup方法,通過context去讀取job傳入的文件,然后存在mapper對象中,從而使得mapper在每次執行map方法時都可以調用這些預先存入的數據;

使用setup預先處理input1,則mapper的map方法處理input2即可。

package org.robby.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapJoinWithCache {
    public static class Map extends
            Mapper<LongWritable, Text, Text, Text> {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;
        //這個keyMap就是存文件數(shù)據(jù)供map共享的
        private HashMap<String, String> keyMap = null;

        @Override
        //這個map每行都會調(diào)用一次,傳入數(shù)據(jù)
        //每次都會訪問keyMap集合
        //因為setup方法處理了input1文件,因此這里只需要處理input2就行
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            keyValue = value.toString().split(",", 2);

            String name = keyMap.get(keyValue[0]);
            
            this.key.set(keyValue[0]);
            
            String output = name + "," + keyValue[1];
            this.value.set(output);
            context.write(this.key, this.value);
        }

        @Override
        //這個setup方法是在mapper類初始化運行的方法
        protected void setup(Context context) throws IOException,
                InterruptedException {
            //context傳入文件路徑
            URI[] localPaths = context.getCacheFiles();
            
            keyMap = new HashMap<String, String>();
            for(URI url : localPaths){
                 //通過uri打開hdfs文件系統(tǒng)
                 FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop1:9000"), context.getConfiguration());
                 FSDataInputStream in = null;
                 //打開hdfs的對應(yīng)文件,需要path類創(chuàng)建并傳入,獲取流對象
                 in = fs.open(new Path(url.getPath()));
                 BufferedReader br=new BufferedReader(new InputStreamReader(in));
                 String s1 = null;
                 while ((s1 = br.readLine()) != null)
                 {
                     keyValue = s1.split(",", 2);
                     
                     keyMap.put(keyValue[0], keyValue[1]);
                     System.out.println(s1);
                 }
                 br.close();
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        //處理都在mpper中進行,reduce迭代分組后的數(shù)據(jù)就行
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            
            for(Text val : values)
                context.write(key, val);
            
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapJoinWithCache.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);

        //其他都一樣,這里在job中加入了要傳入的文件路徑,用作cache
        //可以傳入多個文件,文件全路徑
        job.addCacheFile(new Path(args[2]).toUri());

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


其他linux指令:

[root@hadoop1 dataFile]# wc test*
 6 14 35 test2.txt
 7 16 41 test.txt
13 30 76 total

可以通過wc查看文件的條數(輸出的三列依次為行數、單詞數、字節數)

附件:http://down.51cto.com/data/2366171
向AI問一下細節(jié)

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI