
MapReduce template code

Published: 2020-06-27 01:52:39  Source: web  Author: jethai  Category: Big data



Jar dependency (Maven):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>

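From Hadoop 2.x on, this was split into several smaller artifacts and there is no hadoop-core package any more; client code usually depends on hadoop-client instead.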



Code:

package org.conan.myhadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
//org.apache.hadoop.mapred: package of the old API
//org.apache.hadoop.mapreduce: package of the new API (used here)
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * ModuleMapReduce class
 * (a plain block comment, not Javadoc)
 */
public class ModuleMapReduce extends Configured implements Tool {

    /**
     * ModuleMapper class. Because this is a Javadoc comment, an IDE also shows
     * this text when you hover over the class name.
     */
    public static class ModuleMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {

            super.setup(context);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO

        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {

            super.cleanup(context);
        }

    }

    /**
     * 
     * ModuleReducer Class
     * 
     */
    public static class ModuleReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            super.setup(context);
        }

        @Override
        protected void reduce(LongWritable key, Iterable<Text> value,
                Context context) throws IOException, InterruptedException {
            // TODO

        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

    }

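    // Driver
    // Note: @Override on a method that implements an interface method is a
    // compile error only under Java 5; Java 6 and later accept it.
    @Override
    public int run(String[] args) throws Exception {
        Job job = parseInputAndOutput(this, this.getConf(), args);
        if (job == null) {
            return -1; // invalid arguments; usage has already been printed
        }
        // 2. set up the job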

        // step 1:set input
        job.setInputFormatClass(TextInputFormat.class);

        // step 3: set mapper class
        job.setMapperClass(ModuleMapper.class);
        // step 4:set mapout key/value class
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // step 5:set shuffle(sort,combiner,group)
        // set sort
        job.setSortComparatorClass(LongWritable.Comparator.class);
        // set combiner (optional, unset by default); it must be a Reducer subclass
        // whose input and output key/value types both match the map output types
        job.setCombinerClass(ModuleReducer.class);
        // set grouping
        job.setGroupingComparatorClass(LongWritable.Comparator.class);
        // step 6 set reducer class
        job.setReducerClass(ModuleReducer.class);
        // step 7:set job output key/value class
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // step 8:set output format
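        // FileOutputFormat is abstract, so use a concrete subclass such as TextOutputFormat
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);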

        // step 10: submit the job and wait for it to finish
        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
            throws IOException {
        // 輸入?yún)?shù)的合法性
        if (args.length != 2) {
            System.err.printf(
                    "Usage: %s [generic options] <input> <output> \n ", tool
                            .getClass().getSimpleName());
      //%s表示輸出字符串,也就是將后面的字符串替換模式中的%s
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }

        // 1.create job

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(ModuleMapReduce.class);
        // step 2:set input path
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);
        // step 9:set output path
        Path outputPath = new Path(args[1]); // the second argument is the output path
        FileOutputFormat.setOutputPath(job, outputPath);

        return job;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new ModuleMapReduce(), args); // value returned by run(): 0 on success, non-zero otherwise
            System.exit(status); // System.exit terminates the JVM; status 0 means a normal exit
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1); // report the failure to the caller
        }
    }
}
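Step 5 of the template (sort comparator, grouping comparator) is easiest to understand without a cluster. The following is a minimal plain-Java sketch of that sort-then-group behaviour, assuming Java 9+ (for `List.of`/`Map.entry`); `ShuffleSketch` and `shuffleAndReduce` are hypothetical names, and partitioning, spilling and combiners are deliberately left out. In real Hadoop the order of values inside one group is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model (no Hadoop) of the shuffle between map and reduce.
final class ShuffleSketch {

    /** Sorts (key, value) pairs, groups adjacent equal keys, and applies reduce once per group. */
    static <K, V, R> List<R> shuffleAndReduce(
            List<Map.Entry<K, V>> mapOutput,
            Comparator<K> sortComparator,
            Comparator<K> groupingComparator,
            BiFunction<K, List<V>, R> reduce) {
        List<Map.Entry<K, V>> sorted = new ArrayList<>(mapOutput);
        // "set sort": order all map output records by key (List.sort is stable)
        sorted.sort((a, b) -> sortComparator.compare(a.getKey(), b.getKey()));

        List<R> results = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            K groupKey = sorted.get(i).getKey();
            List<V> values = new ArrayList<>();
            // "set grouping": consecutive keys the grouping comparator treats as equal
            // are handed to a single reduce call
            while (i < sorted.size()
                    && groupingComparator.compare(groupKey, sorted.get(i).getKey()) == 0) {
                values.add(sorted.get(i).getValue());
                i++;
            }
            results.add(reduce.apply(groupKey, values));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> mapOutput = List.of(
                Map.entry(2L, "b"), Map.entry(1L, "a"), Map.entry(2L, "c"));
        List<String> out = shuffleAndReduce(mapOutput,
                Comparator.<Long>naturalOrder(), Comparator.<Long>naturalOrder(),
                (k, vs) -> k + "\t" + String.join(",", vs));
        out.forEach(System.out::println); // prints "1\ta" then "2\tb,c"
    }
}
```

Passing a coarser grouping comparator than the sort comparator (for example one that compares `key / 10`) merges several sorted keys into one reduce call, which is the usual reason to set the two separately.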


Inverted index (ReverseIndex) code


Input file:
13588888888 112
13678987879 13509098987
18987655436 110
2543789    112
15699807656 110
011-678987 112
Note: each line is one call record. The number on the left (call it a) called the number on the right (call it b); the two are separated by a space.

Requirement:
Produce output in the following format from the file above:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
Note: the left column is the called number b; the right column lists the numbers a that called b, separated by "|".
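Before running on a cluster, the map, shuffle and reduce steps of this job can be simulated in plain Java to check the expected grouping. This is a minimal sketch assuming Java 9+; `InvertedIndexSketch`, `build` and `Result` are hypothetical names, and a `TreeMap` stands in for Hadoop's sorted shuffle. It splits on runs of whitespace, so the line `2543789    112` is also counted as a caller of 112.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (no Hadoop) of the ReverseIndex job.
final class InvertedIndexSketch {

    /** Called number b -> "a1|a2|...", plus the number of lines that could not be parsed. */
    static final class Result {
        final Map<String, String> index = new TreeMap<>();
        int skippedLines = 0;
    }

    static Result build(List<String> lines) {
        Result result = new Result();
        // stand-in for the shuffle: callers grouped under each callee, keys kept sorted
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            // map step: "a b" -> (b, a), splitting on any run of whitespace
            String[] fields = line.trim().split("\\s+");
            if (fields.length != 2) {
                result.skippedLines++; // same role as the LINESKIP counter
                continue;
            }
            grouped.computeIfAbsent(fields[1], b -> new ArrayList<>()).add(fields[0]);
        }
        // reduce step: join the callers of each b with "|"
        grouped.forEach((b, callers) -> result.index.put(b, String.join("|", callers)));
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
                "13588888888 112",
                "13678987879 13509098987",
                "18987655436 110",
                "2543789    112",
                "15699807656 110",
                "011-678987 112");
        build(input).index.forEach((b, callers) -> System.out.println(b + " " + callers));
        // prints:
        // 110 18987655436|15699807656
        // 112 13588888888|2543789|011-678987
        // 13509098987 13678987879
    }
}
```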

package org.conan.myhadoop.mr;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReverseIndex extends Configured implements Tool {

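    enum Counter {
        LINESKIP, // lines that could not be parsed
    }

    // The generic type arguments are required: with a raw Mapper this map()
    // would not override Mapper.map() and the identity mapper would run instead.
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); // read one input record
            // split on any run of whitespace, so "2543789    112" still yields two fields
            String[] lineSplit = line.trim().split("\\s+");
            if (lineSplit.length != 2) {
                context.getCounter(Counter.LINESKIP).increment(1); // count the malformed line
                return;
            }
            String anum = lineSplit[0]; // caller a
            String bnum = lineSplit[1]; // callee b
            context.write(new Text(bnum), new Text(anum)); // emit (b, a)
        }
    }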

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // join all callers of this number with "|" (no trailing separator)
            StringBuilder out = new StringBuilder();
            for (Text value : values) {
                if (out.length() > 0) {
                    out.append('|');
                }
                out.append(value.toString());
            }
            context.write(key, new Text(out.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = new Job(conf, "ReverseIndex"); // job name
        job.setJarByClass(ReverseIndex.class); // class used to locate the job jar

        FileInputFormat.addInputPath(job, new Path(args[0])); // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

        job.setMapperClass(Map.class); // use the Map class above for the map tasks
        job.setReducerClass(ReverseIndex.Reduce.class); // use the Reduce class above for the reduce tasks

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class); // output key type
        job.setOutputValueClass(Text.class); // output value type

        job.waitForCompletion(true);

        // print a summary of the job
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job succeeded: " + (job.isSuccessful() ? "yes" : "no"));
        System.out.println("Input records: "
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_INPUT_RECORDS").getValue());
        System.out.println("Output records: "
                + job.getCounters()
                        .findCounter("org.apache.hadoop.mapred.Task$Counter",
                                "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("Skipped lines: "
                + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // check the number of arguments;
        // if it is wrong (e.g. no arguments at all), print usage information
        if (args.length != 2) {
            System.err.println("");
            System.err
                    .println("Usage: ReverseIndex < input path > < output path > ");
            System.err
                    .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out");

            System.exit(-1);
        }
        // record the start time
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();
        // run the job
        int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args);

        // print how long the job took
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("Job started: " + formatter.format(start));
        System.out.println("Job finished: " + formatter.format(end));
        System.out.println("Elapsed time: " + time + " minutes");

        System.exit(res);
    }
}


Deduplication code

    // Mapper task: emit each whole input line as the key with no value,
    // so identical lines end up in the same reduce call.
    // (These classes are nested inside a driver class that imports java.io.IOException,
    // org.apache.hadoop.io.* and org.apache.hadoop.mapreduce.*; the driver must set
    // the map output and output value class to NullWritable.)
    static class DDMap extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            context.write(v1, NullWritable.get());
        }
    }

    // Reducer task: every distinct line arrives exactly once as a key,
    // so writing only the key removes the duplicates
    // (NullWritable values also keep TextOutputFormat from appending a tab).
    static class DDReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text k2, Iterable<NullWritable> v2s, Context context)
                throws IOException, InterruptedException {
            context.write(k2, NullWritable.get());
        }
    }
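The deduplication idea (line becomes key, shuffle groups equal keys, reduce writes each key once) can be checked in plain Java. This is a minimal sketch assuming Java 9+; `DedupSketch` and `dedup` are hypothetical names, and a `TreeMap` stands in for the sorted shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical plain-Java model (no Hadoop) of the DDMap/DDReduce job.
final class DedupSketch {

    static List<String> dedup(List<String> lines) {
        // stand-in for the shuffle: identical lines collapse onto one sorted key
        TreeMap<String, Integer> grouped = new TreeMap<>();
        for (String line : lines) {
            grouped.merge(line, 1, Integer::sum); // map step: emit (line, nothing)
        }
        // reduce step: one output record per distinct key; the values are ignored
        return new ArrayList<>(grouped.keySet());
    }

    public static void main(String[] args) {
        System.out.println(dedup(List.of("b", "a", "b", "c"))); // prints [a, b, c]
    }
}
```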


References:

A classic MapReduce template, and inverted index (ReverseIndex) code

http://blog.itpub.net/26400547/viewspace-1214945/

MapReduce data deduplication and inverted index explained, with use-case examples

http://www.tuicool.com/articles/emi6Fb


