溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

MapReduce中怎樣實(shí)現(xiàn)二次排序

發(fā)布時(shí)間:2021-08-12 14:49:06 來(lái)源:億速云 閱讀:170 作者:Leah 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)MapReduce中怎樣實(shí)現(xiàn)二次排序,小編覺(jué)得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說(shuō),跟著小編一起來(lái)看看吧。

一、需求分析

MR的二次排序的需求說(shuō)明: 在mapreduce操作時(shí),shuffle階段會(huì)多次根據(jù)key值排序。但是在shuffle分組后,相同key值的values序列的順序是不確定的(如下圖)。如果想要此時(shí)value值也是排序好的,這種需求就是二次排序。 MapReduce中怎樣實(shí)現(xiàn)二次排序

	原始數(shù)據(jù)		無(wú)二次排序	有二次排序
	a 12    	a 12    	a 12
	b 34		b 34		b 13
	c 90		b 23    	b 23
	b 23		b 13    	b 34
	b 13		c 90    	c 90

根據(jù)案例分析,我們要將下面數(shù)據(jù)key按照abc,value按照大小排序,這也就是一個(gè)典型的MR的二次排序的案例,準(zhǔn)備原始數(shù)據(jù):

a 20
b 20
a 5
c 10
c 8
b 15
a 10
b 18
c 29
b 52

我們想要得到的結(jié)果:

a       5
a       10
a       20
b       15
b       18
b       20
b       52
c       8
c       10
c       29

二、方案一實(shí)現(xiàn)

先看方案一的實(shí)現(xiàn)思路:

input -> map -><a,20> -> shuffle -> <a,list(10, 5, 20)> -> reduce -> <a,5>
			   <b,20>				<b,list(52, 18, 15, 20)>         <a,10>
   			   <a,5>				<c,list(29, 8, 10)>    		     <a,20>
   			   <c,10>											     <b,15>
   			   ...												     <b,18>
   			   													     <b,20>
   			   													      ...

直接在reduce端對(duì)分組后的values進(jìn)行排序 示例代碼:

package com.kfk.hadoop.mr.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * @author : 蔡政潔
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */

public class SortMR extends Configured implements Tool {

    /**
     * map
     * TODO
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text,Text, IntWritable>{

        // 創(chuàng)建map輸出的對(duì)象
        private static final Text mapOutKey = new Text();
        private static final IntWritable mapOutValue = new IntWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 將每一行數(shù)據(jù)按空格拆開(kāi)
            String[] values = value.toString().split(" ");

            // 數(shù)據(jù)預(yù)處理,將數(shù)組超過(guò)2的過(guò)濾掉
            if (values.length != 2){
                return;
            }
            mapOutKey.set(values[0]);
            mapOutValue.set(Integer.valueOf(values[1]));
            context.write(mapOutKey,mapOutValue);

        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }

    /**
     * reduce
     * TODO
     */
    public static class TemplateReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

        // 創(chuàng)建reduceout端的對(duì)象
        private static final IntWritable outputValue = new IntWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            List<Integer> valueList = new ArrayList<Integer>();

            // 取出value
            for (IntWritable value:values){
                valueList.add(value.get());
            }

			// 打印出reduce輸入的key和valueList
            System.out.println("Reduce in == KeyIn: "+key+"   ValueIn: "+valueList);
            // 進(jìn)行排序
            Collections.sort(valueList);

			/*
                valueList:表示上面已經(jīng)排序好的列表,即需要遍歷列表中的值作為reduce的輸出
                key不需要改變,即可作為reduce的輸出
             */
            for (Integer value : valueList){
                outputValue.set(value);
                context.write(key,outputValue);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }

    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();

        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input,指定job的輸入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);

        // 3.2) map,指定job的mapper和輸出的類型
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 1.分區(qū)
        // job.setPartitionerClass();

        // 2.排序
        // job.setSortComparatorClass();

        // 3.combiner -可選項(xiàng)
        // job.setCombinerClass(WordCountCombiner.class);

        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec壓縮算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

        // 5.分組
        // job.setGroupingComparatorClass();

        // 6.設(shè)置reduce的數(shù)量
        // job.setNumReduceTasks(2);

        // 3.3) reduce,指定job的reducer和輸出類型
        job.setReducerClass(TemplateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4) output,指定job的輸出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);

        // 4) commit,執(zhí)行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常執(zhí)行返回0,否則返回1
        return (isSuccess) ? 0 : 1;
    }

    public static void main(String[] args) {

        // 添加輸入,輸入?yún)?shù)
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();

        try {
            // 判斷輸出的文件存不存在,如果存在就將它刪除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 調(diào)用run方法
            int status = ToolRunner.run(configuration,new SortMR(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

運(yùn)行結(jié)果:

a       5
a       10
a       20
b       15
b       18
b       20
b       52
c       8
c       10
c       29

很容易發(fā)現(xiàn),這樣把排序工作都放到reduce端完成,當(dāng)values序列長(zhǎng)度非常大的時(shí)候,會(huì)對(duì)CPU和內(nèi)存造成極大的負(fù)載。

注意的地方(容易被“坑”) 在reduce端對(duì)values進(jìn)行迭代的時(shí)候,不要直接存儲(chǔ)value值或者key值,因?yàn)閞educe方法會(huì)反復(fù)執(zhí)行多次,但key和value相關(guān)的對(duì)象只有兩個(gè),reduce會(huì)反復(fù)重用這兩個(gè)對(duì)象。需要用相應(yīng)的數(shù)據(jù)類型.get()取出后再存儲(chǔ)。

三、方案二實(shí)現(xiàn)

方案二的解決思路:

  原始數(shù)據(jù) 			自定義newkey 在shuffle中排序  reduce輸入				     reduce輸出
	a 12    		a#12,12    a#12,12
	b 34			b#34,34	   b#13,13
	c 90 -> map ->	c#90,90    b#23,23       b#,List(13,23,34)-> reduce ->  b,13 b,23 b,34
	b 23			b#23,23    b#34,34	
	b 13			b#13,13    c#90,90

我們可以把key和value聯(lián)合起來(lái)作為新的key,記作newkey。這時(shí),newkey含有兩個(gè)字段,假設(shè)分別是k,v。這里的k和v是原來(lái)的key和value。原來(lái)的value還是不變。這樣,value就同時(shí)在newkey和value的位置。我們?cè)賹?shí)現(xiàn)newkey的比較規(guī)則,先按照key排序,在key相同的基礎(chǔ)上再按照value排序。在分組時(shí),再按照原來(lái)的key進(jìn)行分組,就不會(huì)影響原有的分組邏輯了。最后在輸出的時(shí)候,只把原有的key、value輸出,就可以變通的實(shí)現(xiàn)了二次排序的需求。

需要自定義的地方   1.自定義數(shù)據(jù)類型實(shí)現(xiàn)組合key     實(shí)現(xiàn)方式:繼承WritableComparable   2.自定義partioner,形成newKey后保持分區(qū)規(guī)則任然按照key進(jìn)行。保證不打亂原來(lái)的分區(qū)。     實(shí)現(xiàn)方式:繼承Partitioner   3.自定義分組,保持分組規(guī)則任然按照key進(jìn)行。不打亂原來(lái)的分組     實(shí)現(xiàn)方式:繼承RawComparator

     自定義數(shù)據(jù)類型代碼:

package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author : 蔡政潔
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 6:16 下午
 */
public class PairWritable implements WritableComparable<PairWritable> {

    // 組合key:a#12,12
    private String first;
    private int second;

    public PairWritable() {

    }

    public PairWritable(String first, int second) {
        this.set(first,second);
    }

    /**
     * 方便設(shè)置字段
     */
    public void set(String first, int second){
        this.first = first;
        this.second = second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * 重寫(xiě)比較器
     */
    public int compareTo(PairWritable o) {

        int comp = this.getFirst().compareTo(o.getFirst());
        if (0 == comp){
            // 若第一個(gè)字段相等,則比較第二個(gè)字段
            return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());
        }
        return comp;
    }

    /**
     * 序列化
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /**
     * 反序列化
     */
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    @Override
    public String toString() {
        return "PairWritable{" +
                "first='" + first + '\'' +
                ", second=" + second +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        PairWritable that = (PairWritable) o;

        if (second != that.second) return false;
        return first != null ? first.equals(that.first) : that.first == null;
    }

    @Override
    public int hashCode() {
        int result = first != null ? first.hashCode() : 0;
        result = 31 * result + second;
        return result;
    }
}

自定義分區(qū)代碼:

package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author : 蔡政潔
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 7:09 下午
 */
public class FristPartitioner extends Partitioner<PairWritable, IntWritable> {

    public int getPartition(PairWritable key, IntWritable intWritable, int numPartitions) {
       /*
        * 默認(rèn)的實(shí)現(xiàn) (key.hashCode() & Integer.MAX_VALUE) % numPartitions
        * 讓key中first字段作為分區(qū)依據(jù)
        */
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

自定義分組比較器代碼:

package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.io.RawComparator;

import org.apache.hadoop.io.WritableComparator;


/**
 * @author : 蔡政潔
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 6:59 下午
 */
public class FristGrouping implements RawComparator<PairWritable> {

    /*
     * 字節(jié)比較
     * bytes1,bytes2為要比較的兩個(gè)字節(jié)數(shù)組
     * i,i1表示第一個(gè)字節(jié)數(shù)組要進(jìn)行比較的收尾位置,i2,i3表示第二個(gè)
     * 從第一個(gè)字節(jié)比到組合key中second的前一個(gè)字節(jié),因?yàn)閟econd為int型,所以長(zhǎng)度為4
     */

    public int compare(byte[] bytes1, int i, int i1, byte[] bytes2, int i2, int i3) {
        return WritableComparator.compareBytes(bytes1,0,i1-4,bytes2,0,i3-4);
    }

    /*
     * 對(duì)象比較
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }
}

二次排序?qū)崿F(xiàn)代碼:

package com.kfk.hadoop.mr.secondsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


/**
 * @author : 蔡政潔
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */

public class SecondSortMR extends Configured implements Tool {

    /**
     * map
     * TODO
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text,PairWritable, IntWritable>{

        // 創(chuàng)建map輸出的對(duì)象
        private static final PairWritable mapOutKey = new PairWritable();
        private static final IntWritable mapOutValue = new IntWritable();

        @Override
        public void setup(Context context) {
            // TODO
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 將每一行數(shù)據(jù)按空格拆開(kāi)
            String[] values = value.toString().split(" ");

            // 數(shù)據(jù)預(yù)處理,將數(shù)組超過(guò)2的過(guò)濾掉
            if (values.length != 2){
                return;
            }
            mapOutKey.set(values[0],Integer.parseInt(values[1]));
            mapOutValue.set(Integer.parseInt(values[1]));
            context.write(mapOutKey,mapOutValue);

            System.out.println("Map out == KeyOut: "+mapOutKey+"   ValueOut: "+mapOutValue);

        }

        @Override
        public void cleanup(Context context) {
            // TODO
        }
    }

    /**
     * reduce
     * TODO
     */
    public static class TemplateReducer extends Reducer<PairWritable,IntWritable,Text,IntWritable>{

        // 創(chuàng)建reduce output端的對(duì)象
        private static final IntWritable outputValue = new IntWritable();
        private static final Text outputKey = new Text();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

			/*
                values表示reduce端輸入已經(jīng)排序好的列表,即需要遍歷values每一個(gè)值作為reduce輸出即可
                key表示為自定義的key(newkey),即需要取出newkey的第一部分,也就是原始的key,作為reduce的輸出
             */
            for (IntWritable value:values){
                outputKey.set(key.getFirst());
                context.write(outputKey,value);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }

    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();

        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input,指定job的輸入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);

        // 3.2) map,指定job的mapper和輸出的類型
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 1.分區(qū)
        job.setPartitionerClass(FristPartitioner.class);

        // 2.排序
        // job.setSortComparatorClass();

        // 3.combiner -可選項(xiàng)
        // job.setCombinerClass(WordCountCombiner.class);

        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec壓縮算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

        // 5.分組
        job.setGroupingComparatorClass(FristGrouping.class);

        // 6.設(shè)置reduce的數(shù)量
        // job.setNumReduceTasks(2);

        // 3.3) reduce,指定job的reducer和輸出類型
        job.setReducerClass(TemplateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4) output,指定job的輸出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);

        // 4) commit,執(zhí)行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常執(zhí)行返回0,否則返回1
        return (isSuccess) ? 0 : 1;
    }

    public static void main(String[] args) {

        // 添加輸入,輸入?yún)?shù)
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();

        try {
            // 判斷輸出的文件存不存在,如果存在就將它刪除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 調(diào)用run方法
            int status = ToolRunner.run(configuration,new SecondSortMR(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

運(yùn)行結(jié)果: MapReduce中怎樣實(shí)現(xiàn)二次排序

a       5
a       10
a       20
b       15
b       18
b       20
b       52
c       8
c       10
c       29

以上就是MapReduce中怎樣實(shí)現(xiàn)二次排序,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見(jiàn)到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI