溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

MapReduce的典型編程場景1

發(fā)布時(shí)間:2020-08-03 21:45:42 來源:網(wǎng)絡(luò) 閱讀:291 作者:原生zzy 欄目:大數(shù)據(jù)

接下來通過一個(gè)實(shí)際的案例,介紹在MR編程中的,partition、sort、combiner。

??

流量統(tǒng)計(jì)項(xiàng)目案例

數(shù)據(jù)樣本
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4
2052.flash3-http.qq.com 綜合門戶 15 12 1938 2910 200
字段介紹
MapReduce的典型編程場景1
需求
1、 統(tǒng)計(jì)每一個(gè)用戶(手機(jī)號(hào))所耗費(fèi)的總上行流量、總下行流量,總流量
2、 得出上題結(jié)果的基礎(chǔ)之上再加一個(gè)需求:將統(tǒng)計(jì)結(jié)果按照總流量倒序排序
3、 將流量匯總統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中


??
需求一:統(tǒng)計(jì)每一個(gè)用戶(手機(jī)號(hào))所耗費(fèi)的總上行流量、總下行流量,總流量
??通過需求分析 ,我們可以知道這里可以使用combiner這個(gè)優(yōu)化組件,它的作用是在 maptask之后給 maptask 的結(jié)果進(jìn)行局部匯總,以減輕 reducetask 的計(jì)算負(fù)載,減少網(wǎng)絡(luò)傳輸。
??使用方式:Combiner 和 Reducer 一樣,編寫一個(gè)類,然后繼承 Reducer,reduce 方法中寫具體的 Combiner邏輯,然后在 job 中設(shè)置 Combiner 組件:job.setCombinerClass(FlowSumCombine.class)
代碼實(shí)現(xiàn)

public class FlowSum {
    //job
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJobName("FlowSum");
            //設(shè)置任務(wù)類
            job.setJarByClass(FlowSum.class);
            //設(shè)置Mapper  Reducer  Combine
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setCombinerClass(FlowSumCombine.class);

            //設(shè)置map 和reduce 的輸入輸出類型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // 指定該 mapreduce 程序數(shù)據(jù)的輸入和輸出路徑
            Path input=new Path("/data/input");
            Path output =new Path("/data/output");
            //一定要保證output不存在
            if(output.getFileSystem(conf).exists(output)){
                output.getFileSystem(conf).delete(output,true);  //遞歸刪除
            }
            FileInputFormat.addInputPath(job,input);
            FileOutputFormat.setOutputPath(job,output);

            // 最后提交任務(wù)
            boolean success = job.waitForCompletion(true);
            System.exit(success?0:-1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //Mapper
    private class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        //統(tǒng)計(jì)每一個(gè)用戶(手機(jī)號(hào))所耗費(fèi)的總上行流量、總下行流量,總流量
        //1363157984040
        // 13602846565
        // 5C-0E-8B-8B-B6-00:CMCC
        // 120.197.40.4
        //2052.flash3-http.qq.com
        // 綜合門戶
        // 15
        // 12
        // 1938  上行流量
        // 2910  下行流量
        // 200
        Text mk=new Text();
        Text mv=new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\s+");
            String phone = fields[0];
            String upFlow = fields[8];
            String downFlow = fields[9];
            mk.set(phone);
            mv.set(upFlow+"_"+downFlow);
            context.write(mk,mv);
        }
    }

    //Combiner
    private class FlowSumCombine extends Reducer<Text, Text,Text, Text> {
        Text rv=new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int upFlowSum=0;
            int downFlowSum=0;
            int  upFlow = 0;
            int downFlow =0;
            for(Text value:values){
                String fields[]=value.toString().split("_");
                upFlow=Integer.parseInt(fields[0]);
                downFlow=Integer.parseInt(fields[1]);
                upFlowSum+=upFlow;
                downFlowSum+=downFlow;
            }
            rv.set(upFlowSum+"_"+downFlowSum);
            context.write(key,rv);
        }
    }

    //Reducer
    private class MyReducer extends Reducer<Text, Text,Text, Text> {
        Text rv=new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int upFlowSum=0;
            int downFlowSum=0;
            int  upFlow = 0;
            int downFlow =0;
            for(Text value:values){
                String fields[]=value.toString().split("_");
                upFlow=Integer.parseInt(fields[0]);
                downFlow=Integer.parseInt(fields[1]);
                upFlowSum+=upFlow;
                downFlowSum+=downFlow;
            }
            rv.set(upFlowSum+"\\t"+downFlowSum);
            context.write(key,rv);
        }
    }
}

??使用 Combiner 注意事項(xiàng)
???- Combiner 的輸出 kv 類型應(yīng)該跟 Reducer 的輸入 kv 類型對(duì)應(yīng)起來
???- Combiner 的輸入 kv 類型應(yīng)該跟 Mapper 的輸出 kv 類型對(duì)應(yīng)起來
???- Combiner 的使用要非常謹(jǐn)慎,因?yàn)?Combiner 在MapReduce 過程中可能調(diào)用也可能不調(diào)用,可能調(diào)一次也可能調(diào)多次
???- Combiner 使用的原則是:有或沒有都不能影響業(yè)務(wù)邏輯,都不能影響最終結(jié)果

需求二:得出上題結(jié)果的基礎(chǔ)之上再加一個(gè)需求:將統(tǒng)計(jì)結(jié)果按照總流量倒序排序
   分析:如果是在得出上行流量和下行流量之后,實(shí)現(xiàn)倒敘排序呢,之前在java中,如果想讓對(duì)象按照自定義的規(guī)則排序,那么就需要自定義對(duì)象并且實(shí)現(xiàn)它的比較器。MR也可以,在MR運(yùn)行過程中,如果有Reducer階段的話,那么是一定會(huì)排序的,根據(jù)對(duì)象的比較器,進(jìn)行排序,將排序結(jié)果相同的key分到一個(gè)reduceTask中。
  實(shí)現(xiàn):這里我們可以使用hadoop自定義WritableComparable來自定義對(duì)象,并且實(shí)現(xiàn)它的比較器。
代碼實(shí)現(xiàn)(注意:這里的是對(duì)需求一實(shí)現(xiàn)之后的結(jié)果進(jìn)行處理的):
自定義對(duì)象,實(shí)現(xiàn)比較器

public class FlowBean implements WritableComparable<FlowBean> {
    private String phone;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 序列化框架在反序列化操作創(chuàng)建對(duì)象實(shí)例時(shí)會(huì)調(diào)用無參構(gòu)造
    public FlowBean(){

    }

    public void set(String phone, long upFlow, long downFlow){
        this.phone=phone;
        this.upFlow=upFlow;
        this.downFlow=downFlow;
        this.sumFlow=upFlow+downFlow;
    }
    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // 序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    /*
        反序列化方法,這里注意,上面的write中是什么順寫,這里就要什么順序讀取
        要保證字段數(shù)據(jù)類型一一對(duì)應(yīng)
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone=in.readUTF();
        this.upFlow=in.readLong();
        this.downFlow=in.readLong();
        this.sumFlow=in.readLong();
    }
    //這里就是要實(shí)現(xiàn)的比較方法
    // 0 表示相等, 1表示大于  負(fù)數(shù)表示小于
    @Override
    public int compareTo(FlowBean o) {
        //倒敘輸出的話就是,參數(shù)的屬性-類中的屬性
        return (int)(o.sumFlow-this.sumFlow);
    }
}

MR程序

public class FlowSumSort {
    //job
    public static void main(String[] args) {
        Configuration conf=new Configuration(true);
        conf.set("fs.defaultFS","hdfs://zzy:9000");
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job= Job.getInstance(conf);
            job.setJarByClass(FlowSumSort.class);
            job.setJobName("FlowSumSort");

            job.setMapperClass(Mapper.class);
            job.setReducerClass(Reducer.class);

            job.setOutputKeyClass(FlowBean.class);
            job.setOutputValueClass(NullWritable.class);

            // 指定該 mapreduce 程序數(shù)據(jù)的輸入和輸出路徑
            Path input=new Path("http://data/output");
            Path output =new Path("/data/output1");
            //一定要保證output不存在
            if(output.getFileSystem(conf).exists(output)){
                output.getFileSystem(conf).delete(output,true);  //遞歸刪除
            }
            FileInputFormat.addInputPath(job,input);
            FileOutputFormat.setOutputPath(job,output);

            boolean success=job.waitForCompletion(true);
            System.exit(success?0:1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    private class MyMapper extends Mapper<LongWritable,Text,FlowBean,NullWritable> {
        FlowBean bean=new FlowBean();
        NullWritable mv=NullWritable.get();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
             String[] fields = value.toString().split("\\s+");
             String phone=fields[0];
             long upflow= Long.parseLong(fields[1]);
             long downflow= Long.parseLong(fields[2]);
             bean.set(phone,upflow,downflow);
            context.write(bean,mv);
        }
    }
    //Reducer
    private class MyReducer extends Reducer<FlowBean,NullWritable,FlowBean,NullWritable>{
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            for(NullWritable value:values){
                context.write(key,value);
            }
        }
    }
}

注意:雖然這里reduce階段只是做了一個(gè)輸出,沒有任何業(yè)務(wù)操作,但是不能不使用reduce,因?yàn)镸R的排序就是在MapTask運(yùn)行完成之后,向reduceTask端輸出數(shù)據(jù)的時(shí)候,才會(huì)進(jìn)行排序,如果沒有Reduce階段就不會(huì)有排序。

  
需求三:根據(jù)歸屬地輸出流量統(tǒng)計(jì)數(shù)據(jù)結(jié)果到不同文件,以便于在查詢統(tǒng)計(jì)結(jié)果時(shí)可以定位到省級(jí)范圍進(jìn)行
分析:默認(rèn)的在運(yùn)行MR程序的時(shí)候,只運(yùn)行一個(gè)ReducerTask,默認(rèn)額的一個(gè)ReducerTask會(huì)有一個(gè)輸出文件,那么只要自定義分區(qū)規(guī)則,并且設(shè)置好ReducerTask的個(gè)數(shù),就可以完成以上需求,默認(rèn)的MR的分區(qū)規(guī)則是,key的hashcode%分區(qū)數(shù)。
代碼實(shí)現(xiàn)
自定義分區(qū)

    //這里的兩個(gè)泛型,是Mpapper端輸出的key和value的類型
    private static class MyPartitioner extends Partitioner<Text,FlowBean> {
        //自定義的分區(qū)規(guī)則
        private static HashMap<String, Integer> provincMap = new HashMap<String, Integer>();
        static {
            provincMap.put("138", 0);
            provincMap.put("139", 1);
            provincMap.put("136", 2);
            provincMap.put("137", 3);
            provincMap.put("135", 4);
        }
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            //千萬注意,這里的返回值,一定不能大于numPartitions,否則會(huì)報(bào)錯(cuò)
            Integer code = provincMap.get(text.toString().substring(0, 3));
            if(code!=null){
                return code;
            }
            return 5;
        }
    }

job

            //指定分區(qū)規(guī)則,和分區(qū)個(gè)數(shù)
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(5);

注意

  • 在使用分區(qū)時(shí),一定注意,分區(qū)的返回值一定不能大于設(shè)置的reduceTask個(gè)數(shù)
  • 雖然設(shè)置多個(gè)ReduceTask可以增加并行度,但是也不需要設(shè)置太多,如果某個(gè)ReduceTask中沒有數(shù)據(jù),那么這個(gè)ReduceTask就是空跑,浪費(fèi)資源
  • 盡量的在設(shè)置分區(qū)時(shí)是從0開始的連續(xù)的整數(shù)
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI