溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

MapReduce中怎么實(shí)現(xiàn)自定義排序功能

發(fā)布時(shí)間:2021-07-29 16:36:21 來源:億速云 閱讀:297 作者:Leah 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)MapReduce中怎么實(shí)現(xiàn)自定義排序功能,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

本文測(cè)試文本:

tom 20 8000
nancy 22 8000
ketty 22 9000
stone 19 10000
green 19 11000
white 39 29000
socrates 30 40000

???MapReduce中,根據(jù)key進(jìn)行分區(qū)、排序、分組
MapReduce會(huì)按照基本類型對(duì)應(yīng)的key進(jìn)行排序,如int類型的IntWritable,long類型的LongWritable,Text類型,默認(rèn)升序排序
???為什么要自定義排序規(guī)則?現(xiàn)有需求,需要自定義key類型,并自定義key的排序規(guī)則,如按照人的salary降序排序,若相同,則再按age升序排序
以Text類型為例:
MapReduce中怎么實(shí)現(xiàn)自定義排序功能
MapReduce中怎么實(shí)現(xiàn)自定義排序功能
MapReduce中怎么實(shí)現(xiàn)自定義排序功能
MapReduce中怎么實(shí)現(xiàn)自定義排序功能
Text類實(shí)現(xiàn)了WritableComparable接口,并且有write()、readFields()compare()方法
readFields()方法:用來反序列化操作
write()方法:用來序列化操作
所以要想自定義類型用來排序需要有以上的方法
自定義類代碼

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Person implements WritableComparable<Person> {
    private String name;
    private int age;
    private int salary;
    public Person() {
    }
    public Person(String name, int age, int salary) {
        //super();
        this.name = name;
        this.age = age;
        this.salary = salary;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public int getSalary() {
        return salary;
    }
    public void setSalary(int salary) {
        this.salary = salary;
    }
    @Override
    public String toString() {
        return this.salary + "  " + this.age + "    " + this.name;
    }
    //先比較salary,高的排序在前;若相同,age小的在前
    public int compareTo(Person o) {
        int compareResult1= this.salary - o.salary;
        if(compareResult1 != 0) {
            return -compareResult1;
        } else {
            return this.age - o.age;
        }
    }
    //序列化,將NewKey轉(zhuǎn)化成使用流傳送的二進(jìn)制
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeInt(salary);
    }
    //使用in讀字段的順序,要與write方法中寫的順序保持一致
    public void readFields(DataInput dataInput) throws IOException {
        //read string
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt();
        this.salary = dataInput.readInt();
    }

}

MapReuduce程序:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
public class  SecondarySort {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME","hadoop2.7");
        Configuration configuration = new Configuration();
        //設(shè)置本地運(yùn)行的mapreduce程序 jar包
        configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target\\com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(configuration, SecondarySort.class.getSimpleName());
        FileSystem fileSystem = FileSystem.get(URI.create(args[1]), configuration);
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(NullWritable.class);
        //設(shè)置reduce的個(gè)數(shù)
        job.setNumReduceTasks(1);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Person.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
    public static class MyMap extends
            Mapper<LongWritable, Text, Person, NullWritable> {
        //LongWritable:輸入?yún)?shù)鍵類型,Text:輸入?yún)?shù)值類型
        //Persion:輸出參數(shù)鍵類型,NullWritable:輸出參數(shù)值類型
        @Override
        //map的輸出值是鍵值對(duì)<K,V>,NullWritable說關(guān)心V的值
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            //LongWritable key:輸入?yún)?shù)鍵值對(duì)的鍵,Text value:輸入?yún)?shù)鍵值對(duì)的值
            //獲得一行數(shù)據(jù),輸入?yún)?shù)的鍵(距首行的位置),Hadoop讀取數(shù)據(jù)的時(shí)候逐行讀取文本
            //fields:代表著文本一行的的數(shù)據(jù)
            String[] fields = value.toString().split(" ");
            // 本列中文本一行數(shù)據(jù):nancy 22 8000
            String name = fields[0];
            //字符串轉(zhuǎn)換成int
            int age = Integer.parseInt(fields[1]);
            int salary = Integer.parseInt(fields[2]);
            //在自定義類中進(jìn)行比較
            Person person = new Person(name, age, salary);
            context.write(person, NullWritable.get());
        }
    }
    public static class MyReduce extends
            Reducer<Person, NullWritable, Person, NullWritable> {
        @Override
        protected void reduce(Person key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

運(yùn)行結(jié)果:

40000  30    socrates
29000  39    white
11000  19    green
10000  19    stone
9000  22    ketty
8000  20    tom
8000  22    nancy

以上就是MapReduce中怎么實(shí)現(xiàn)自定義排序功能,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI