Typical MapReduce Programming Scenarios (2)

發(fā)布時(shí)間:2020-07-25 19:41:41 來源:網(wǎng)絡(luò) 閱讀:312 作者:原生zzy 欄目:大數(shù)據(jù)

1. Chaining Multiple MapReduce Jobs

    Introduction: a moderately complex processing flow often needs several MapReduce programs chained together. Such multi-job chaining can be implemented with the JobControl class of the MapReduce framework.
Requirement:
The two MapReduce jobs below, Flow's SumMR and SortMR, have a dependency: the output of SumMR is the input of SortMR, so SortMR may only start after SumMR has finished.
Both programs are explained at: https://blog.51cto.com/14048416/2342024
How do we implement the dependency between the two jobs?
Code (only the job-chaining driver is shown here):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobDecy {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            //job1: FlowSum
            Job job1 = Job.getInstance(conf);
            job1.setJobName("FlowSum");
            //set the job class
            job1.setJarByClass(FlowSum.class);
            //set the Mapper, Reducer and Combiner
            job1.setMapperClass(FlowSum.MyMapper.class);
            job1.setReducerClass(FlowSum.MyReducer.class);
            job1.setCombinerClass(FlowSum.FlowSumCombine.class);

            //set the map and reduce output types
            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(Text.class);
            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(Text.class);

            //input and output paths of this MapReduce program
            Path input1 = new Path("/data/input");
            Path output1 = new Path("/data/output");
            //the output path must not already exist
            if (output1.getFileSystem(conf).exists(output1)) {
                output1.getFileSystem(conf).delete(output1, true);  //recursive delete
            }
            FileInputFormat.addInputPath(job1, input1);
            FileOutputFormat.setOutputPath(job1, output1);

            //job2: FlowSumSort
            Job job2 = Job.getInstance(conf);
            job2.setJarByClass(FlowSumSort.class);
            job2.setJobName("FlowSumSort");

            //Mapper/Reducer of the FlowSumSort program
            //(inner-class names assumed to follow the same pattern as FlowSum)
            job2.setMapperClass(FlowSumSort.MyMapper.class);
            job2.setReducerClass(FlowSumSort.MyReducer.class);

            job2.setOutputKeyClass(FlowBean.class);
            job2.setOutputValueClass(NullWritable.class);

            //input and output paths: job2 reads job1's output
            Path input2 = new Path("/data/output");
            Path output2 = new Path("/data/output1");
            //the output path must not already exist
            if (output2.getFileSystem(conf).exists(output2)) {
                output2.getFileSystem(conf).delete(output2, true);  //recursive delete
            }
            FileInputFormat.addInputPath(job2, input2);
            FileOutputFormat.setOutputPath(job2, output2);

            //create a ControlledJob for each job
            ControlledJob job1_cj = new ControlledJob(job1.getConfiguration());
            ControlledJob job2_cj = new ControlledJob(job2.getConfiguration());
            //bind each ControlledJob to its Job
            job1_cj.setJob(job1);
            job2_cj.setJob(job2);

            //declare the dependency: job2 depends on job1
            job2_cj.addDependingJob(job1_cj);

            //create the JobControl
            JobControl jc = new JobControl("sum and sort");
            jc.addJob(job1_cj);
            jc.addJob(job2_cj);

            //JobControl implements Runnable, so submit it on a thread
            Thread jobThread = new Thread(jc);
            //start the jobs
            jobThread.start();

            //keep the main thread alive, checking every 0.5 seconds whether all jobs are done
            while (!jc.allFinished()) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //once all jobs have completed, stop the control thread and release resources
            jc.stop();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
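
For a purely linear chain like this one, JobControl is not strictly required. As a minimal alternative sketch (assuming job1 and job2 are configured exactly as in the driver above; exception handling elided), the driver can simply submit the jobs back to back and let each call block until completion:

            //sequential alternative: run job1, and only submit job2 if job1 succeeded
            if (!job1.waitForCompletion(true)) {
                System.exit(1);  //abort the chain if the first job fails
            }
            System.exit(job2.waitForCompletion(true) ? 0 : 1);

JobControl earns its extra plumbing when the dependency graph branches, because jobs whose dependencies are already satisfied can then run in parallel.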

2. TopN Implementation (Secondary Sort)

Requirement: for each class, find the three students with the highest total score.
Fields: class, name, math, chinese, english (tab-separated)
Analysis:
  - Using "class + total score" as the key, all student records read in the map phase can be sorted by class and by descending total score before being sent to reduce.
  - On the reduce side, a GroupingComparator groups the key-value pairs of the same class together; the first three records of each group are then the top three.
Code
The custom Student class:

public class Student implements WritableComparable<Student> {
    private String t_class;
    private String t_name;
    private int t_sumSource;

    public Student(){

    }
    public void set(String t_class,String t_name,int chinese,int math,int english){
        this.t_class=t_class;
        this.t_name=t_name;
        this.t_sumSource=chinese+math+english;
    }
    public String getT_class() {
        return t_class;
    }

    public void setT_class(String t_class) {
        this.t_class = t_class;
    }

    public String getT_name() {
        return t_name;
    }

    public void setT_name(String t_name) {
        this.t_name = t_name;
    }

    public int getT_sumSource() {
        return t_sumSource;
    }

    public void setT_sumSource(int t_sumSource) {
        this.t_sumSource = t_sumSource;
    }

    //comparison rule
    @Override
    public int compareTo(Student stu) {
        //compare by class first
        int result1=this.t_class.compareTo(stu.t_class);
        //if the classes are equal, compare by total score in descending order
        if(result1==0){
            return stu.t_sumSource-this.t_sumSource;
        }
        return result1;
    }
    //serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.t_class);
        out.writeUTF(this.t_name);
        out.writeInt(this.t_sumSource);

    }
    //deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.t_class=in.readUTF();
        this.t_name=in.readUTF();
        this.t_sumSource=in.readInt();
    }
}

The custom grouping comparator:

//custom grouping rule
    private static class MyGroupComparator extends WritableComparator {
        //this constructor is required, and it must call the parent constructor
        //(passing true so that WritableComparator creates key instances)
        public MyGroupComparator() {
            super(Student.class, true);
        }

        /**
         * Decides how the keys delivered to reduce are grouped:
         * here the grouping is by class only.
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            Student stu1 = (Student) a;
            Student stu2 = (Student) b;
            return stu1.getT_class().compareTo(stu2.getT_class());
        }
    }

The MR program:
//Mapper

    private static class MyMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
        Student bean = new Student();
        NullWritable mv = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\s+");
            //fields: class    name    math    chinese    english
            String t_class = fields[0];
            String t_name = fields[1];
            int math = Integer.parseInt(fields[2]);
            int chinese = Integer.parseInt(fields[3]);
            int english = Integer.parseInt(fields[4]);
            bean.set(t_class, t_name, chinese, math, english);
            context.write(bean,mv);
        }
    }

    //Reducer
    private static class MyReducer extends Reducer<Student, NullWritable, Student, NullWritable> {
        @Override
        protected void reduce(Student key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            int count =0;
            for(NullWritable value:values){
                if(count>2){
                    break;
                }
                context.write(key,value);
                count++;
            }
        }
    }

The job driver:

public class ClazzScoreGroupComparator {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(ClazzScoreGroupComparator.class);
            job.setJobName("ClazzScoreGroupComparator");

            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            //register the custom grouping comparator
            job.setGroupingComparatorClass(MyGroupComparator.class);

            job.setOutputKeyClass(Student.class);
            job.setOutputValueClass(NullWritable.class);

            //input and output paths of this MapReduce program
            Path input = new Path("/data/student.txt");
            Path output = new Path("/data/output2");
            //the output path must not already exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true);  //recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);

            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. MapReduce Global Counters

    Introduction: counters record the progress and state of a job; you can think of them as a form of logging. You can place a counter at any point in a program to record how data or progress changes. MapReduce also ships with many built-in counters, such as the job's input/output byte counts, the map-side input/output bytes and record counts, and the reduce-side input/output bytes and record counts; understanding these defaults makes it much easier to interpret a job's results.
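
As a minimal sketch of reading one of these built-in counters programmatically (assuming job is an org.apache.hadoop.mapreduce.Job handle whose execution has already completed), the TaskCounter enum exposes the standard map/reduce counters:

            //read a built-in counter once the job has finished
            //(job is assumed to be a completed Job instance)
            long mapInputRecords = job.getCounters()
                    .findCounter(TaskCounter.MAP_INPUT_RECORDS)
                    .getValue();
            System.out.println("map input records: " + mapInputRecords);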

Requirement: use global counters to count the total number of words and the total number of lines in all files under a directory.
Code

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CounterWordCount {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(CounterWordCount.class);
            job.setJobName("CounterWordCount");

            job.setMapperClass(MyMapper.class);
            //map-only job: set the number of reduce tasks to 0
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            //input and output paths of this MapReduce program
            Path input = new Path("/data/");
            Path output = new Path("/data/output3");
            //the output path must not already exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true);  //recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);

            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //enum that holds the counters
    enum CouterWordsCounts {COUNT_WORDS, COUNT_LINES}

    //Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text mk = new Text();
        LongWritable mv = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //count lines: text input is read line by line, so each map() call is one line
            context.getCounter(CouterWordsCounts.COUNT_LINES).increment(1);
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                context.getCounter(CouterWordsCounts.COUNT_WORDS).increment(1);
            }
        }
        //cleanup() runs once, after all map() calls of this task have finished
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mk.set("lines:");
            mv.set(context.getCounter(CouterWordsCounts.COUNT_LINES).getValue());
            context.write(mk, mv);
            mk.set("words:");
            mv.set(context.getCounter(CouterWordsCounts.COUNT_WORDS).getValue());
            context.write(mk, mv);
        }
    }
}

4. MapReduce Join

    Introduction: joining two datasets on some key is very common in real business scenarios. If both datasets are small, the join can be done directly in memory; for large volumes, an in-memory join will clearly run out of memory (OOM). MapReduce is well suited to joining large datasets, and it offers two variants: the map join and the reduce join.

map join

    Introduction: a map join suits the case where one of the two datasets is small. The small dataset is loaded into memory in its entirety and indexed by the join key. The large dataset then serves as the regular MapTask input, and each record passed to map() is matched directly against the in-memory index; the join results are written out by key.


Data
movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
Fields: movieid, moviename, movietype
ratings.dat: 1::1193::5::978300760
Fields: userid, movieid, rate, timestamp

Code

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingMapJoinMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingMapJoinMR.class);
            job.setMapperClass(MyMapper.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            //map-only job
            job.setNumReduceTasks(0);

            String minInput = args[0];   //the small table (movies.dat)
            String maxInput = args[1];   //the big table (ratings.dat)
            String output = args[2];
            FileInputFormat.setInputPaths(job, new Path(maxInput));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            //distribute the small table so every map task can load it into memory
            URI uri = new Path(minInput).toUri();
            job.addCacheFile(uri);
            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text mk = new Text();
        Text mv = new Text();
        // holds every key-value pair parsed from the small table
        private static Map<String, String> movieMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //read the cached table and load its records into movieMap;
            //the cache file is localized into the task's working directory,
            //so it can be opened by its bare file name
            URI[] cacheFiles = context.getCacheFiles();
            String fileName = new Path(cacheFiles[0].getPath()).getName();
            BufferedReader br = new BufferedReader(new FileReader(fileName));
            //each line read from the file is one movie record
            String line = "";
            while ((line = br.readLine()) != null) {
                //movieid::moviename::movietype
                String[] fields = line.split("::");
                movieMap.put(fields[0], fields[1] + "\t" + fields[2]);
            }
            IOUtils.closeStream(br);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("::");
            //userid::movieid::rate::timestamp
            String userid = fields[0];
            String movieid = fields[1];
            String rate = fields[2];
            String timestamp = fields[3];
            //join on movieid against the in-memory movie table
            if (movieMap.containsKey(movieid)) {
                String movieFields = movieMap.get(movieid);
                mk.set(movieid);
                mv.set(movieFields + "\t" + userid + "\t" + rate + "\t" + timestamp);
                context.write(mk, mv);
            }
        }
    }
}
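
Two usage notes on this driver: it expects three arguments (the small-table path, the big-table path, and the output path, in that order), and only the big table is handed to FileInputFormat, since the small table travels through the distributed cache. Note also that context.getCacheFiles() returns the URIs exactly as registered with job.addCacheFile(); the framework localizes each cached file into the task's working directory, which is why setup() above opens the file by its bare name rather than by the full HDFS URI.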

reduce join

    Introduction:
    - In the map phase, the two datasets data1 and data2 are each read and parsed into key-value pairs whose key is the join field and whose value is the query fields, with each record tagged by its source (data1 or data2).
    - In the reduce phase, each reduce task receives the records of both data1 and data2 that share a key and performs a cartesian-product join on them. The most direct cost is memory: buffering the grouped records is very memory-intensive and can cause an OOM.


Data
movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
Fields: movieid, moviename, movietype
ratings.dat: 1::1193::5::978300760
Fields: userid, movieid, rate, timestamp


Code

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingReduceJoinMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingReduceJoinMR.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            String input = args[0];
            String output = args[1];

            FileInputFormat.setInputPaths(job, new Path(input));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);
            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String name;
        Text mk = new Text();
        Text mv = new Text();

        //get the name of the file this split belongs to
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //InputSplit is abstract; for file input its concrete class is FileSplit
            FileSplit is = (FileSplit) context.getInputSplit();
            name = is.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //movies.dat  movieid::moviename::movietype
            //ratings.dat userid::movieid::rate::timestamp
            //the key is the join field (movieid); the value is tagged with its source table
            //(this assumes "_" does not occur in the data itself)
            String outputKey = null;
            String outputValue = null;
            String[] fields = value.toString().split("::");
            if (name.endsWith("movies.dat")) {
                outputKey = fields[0];
                outputValue = fields[1] + "\t" + fields[2] + "_" + "movies";
            } else if (name.endsWith("ratings.dat")) {
                outputKey = fields[1];
                outputValue = fields[0] + "\t" + fields[2] + "\t" + fields[3] + "_" + "ratings";
            } else {
                return;  //skip records from any other file
            }
            mk.set(outputKey);
            mv.set(outputValue);
            context.write(mk, mv);
        }
    }

    //Reducer
    private static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text rv = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> movies = new ArrayList<>();
            List<String> ratings = new ArrayList<>();
            //split each value into data and source tag, and buffer it by table
            for (Text value : values) {
                String[] fields = value.toString().split("_");
                if (fields[1].equals("movies")) {
                    movies.add(fields[0]);
                } else if (fields[1].equals("ratings")) {
                    ratings.add(fields[0]);
                }
            }
            //join the two tables as a cartesian product within the key group
            if (ratings.size() > 0 && movies.size() > 0) {
                for (String movie : movies) {
                    for (String rate : ratings) {
                        rv.set(movie + "\t" + rate);
                        context.write(key, rv);
                    }
                }
            }
        }
    }
}
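
The buffering in this reducer is exactly what makes reduce joins memory-hungry: all movie and rating records sharing a key are held in lists before the product is written out. A common mitigation, not shown here, is to ensure the smaller side of each group arrives first, for example by folding the source tag into a composite key and grouping with a comparator as in section 2; then only the movie records need to be buffered while the ratings are streamed through.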