溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Combiner怎么使用

發(fā)布時(shí)間:2021-12-23 16:06:29 來(lái)源:億速云 閱讀:189 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“Combiner怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

1、Combiner的使用~合并

1、combiner的工作位置:
   kv從緩沖區(qū)中溢寫到磁盤時(shí)可以使用combiner(只要設(shè)置,無(wú)條件使用)
   每個(gè)MapTask的所有數(shù)據(jù)都從緩沖區(qū)寫到磁盤后,在進(jìn)行歸并時(shí)可以使用combiner(滿足條件使用,溢寫次數(shù)>=3)

2、Combiner:  合并
   目的就是在每個(gè)MapTask中將輸出的kv提前進(jìn)行局部合并。
   能夠降低map到reduce傳輸?shù)膋v對(duì)數(shù)量及 reduce最終處理的數(shù)據(jù)量. 

3、Combiner使用限制:
   在不改變業(yè)務(wù)邏輯的情況下才能使用combiner.
   --例如:求平均值時(shí),就不宜使用
   
4、Combiner組件父類就是Reducer
	Combiner是在每一個(gè)MapTask所在的節(jié)點(diǎn)運(yùn)行;
	Reducer是接收全局所有Mappei的輸出結(jié)果;
1、自定義Compiner類
/**
 * combiner作用:
 * 在mapTask進(jìn)行溢寫時(shí),對(duì)每一個(gè)mapTask輸出的數(shù)據(jù)提前進(jìn)行局部匯總,減少寫進(jìn)reduceTask的整體數(shù)據(jù)量
 * 注意:自定義Combiner類,屬于MapTask階段(雖然它繼承Reducer)
 */
public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
    int count = 0;
    @Override
    protected void reduce(Text key, 
	Iterable<IntWritable> values, Context context) throws Exception{
        for (IntWritable value : values) {
            count+=value.get();
        }
        context.write(key,new IntWritable(count));
    }
}
2、WordCountMapper
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    private Text outk = new Text();
    private IntWritable outv = new IntWritable(1);

    @Override
    protected void map(LongWritable key,Text value,Context context) throws Exception {
//        獲取輸入到的一行數(shù)據(jù)
        String lineData = value.toString();
//        提前分析知道,按照空格進(jìn)行切割,得到每個(gè)單詞
        String[] splitData = lineData.split(" ");
//        遍歷數(shù)據(jù),將切割得到的數(shù)據(jù)寫出
        for (String str : splitData) {
//            注意,這里得到的數(shù)據(jù)類型是String,需要轉(zhuǎn)為Text
            outk.set(str);
            context.write(outk,outv);
        }
    }
}
3、WordCountReduce
public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable outv = new IntWritable();
    
    @Override
    protected void reduce(Text key, 
	Iterable<IntWritable> values, Context context) throws Exception {
//        定義一個(gè)變量,用來(lái)接收遍歷中次數(shù)匯總
        int count = 0;
//        直接讀取values,獲取到迭代器對(duì)象中記錄的每個(gè)單詞出現(xiàn)次數(shù)
        for (IntWritable value : values) {
//        因?yàn)榈玫降膙alue對(duì)象是IntWritable對(duì)象,不可以直接進(jìn)行加操作,所以要轉(zhuǎn)換為int
            count += value.get();   //get()方法轉(zhuǎn)為int
        }
//        寫出計(jì)算之后的數(shù)據(jù),對(duì)count類型進(jìn)行轉(zhuǎn)換
        outv.set(count);
        context.write(key,outv);
    }
}
4、WordCountDriver
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
//        1、獲取job對(duì)象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
//        2、關(guān)聯(lián)jar,配置執(zhí)行程序時(shí),使用的具體驅(qū)動(dòng)類
        job.setJarByClass(WordCountDriver.class);
//        3、關(guān)聯(lián)mapper 和 reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
//        4、設(shè)置mapper的輸出的key和value類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//        5、設(shè)置程序最終輸出的key和value類型,如果有reducer
//        就寫reducer輸出的kv類型,如果沒(méi)有reducer,就寫mapper輸出的kv類型.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
		
//      設(shè)置自定義Combiner類
        job.setCombinerClass(WordCountCombiner.class);
// 	job.setCombinerClass(WordCountReduce.class);也能這樣用

        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job,4194304);

//        6、設(shè)置文件的輸入和輸出路徑
        FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\combineinput"));
         //要求該路徑不能存在,交給mr程序創(chuàng)建
	FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\Combineroutput2")); 
//        7、提交job
        job.waitForCompletion(true);
    }
}

2、OutPutFormat數(shù)據(jù)輸出

2.1、OutputFormat介紹

①:Outputformat是一個(gè)接口,其內(nèi)部定義兩個(gè)抽象方法
--RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job,String name,Progressable progress):
該方法用來(lái)獲取RecordWriter對(duì)象,主負(fù)責(zé)數(shù)據(jù)的寫出操作.

--void checkOutputSpecs(FileSystem ignored, JobConf job):
該方法用來(lái)檢測(cè)輸出路徑,當(dāng)driver中的輸出路徑存在時(shí),會(huì)由該方法的實(shí)現(xiàn)類拋出異常
//131行拋出異常("Output directory " + outDir + " already exists")

②:通過(guò)ctrl+h 查看當(dāng)前接口的實(shí)現(xiàn)類如下圖
--TextOutputFormat(hadoop默認(rèn)使用的寫出方式),按行寫出,內(nèi)部重寫了getRecordWriter()方法
--SequenceFileOutputFormat(最終寫出的文件是二進(jìn)制格式)
--MultipleOutputFormat(子抽象類,其下還有具體實(shí)現(xiàn)方法)

![OutputFormat實(shí)現(xiàn)類](https://oscimg.oschina.net/oscnet/up- 777fe19a5bf6864396beac3aa83d8350e9e.png "OutputFormat實(shí)現(xiàn)類")

2.2、自定義輸出類

//1、LogMapper
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws Exception {
        context.write(value,NullWritable.get());
    }
}
//2、LogReducer
public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key,
	Iterable<NullWritable> values, Context context) throws Exception{
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}
//3、MyOutPutFormat
public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    /**
     * 重寫getRecordWriter()方法,在內(nèi)部自定義一個(gè)寫出類
     * @param job
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public RecordWriter<Text, NullWritable> 
		getRecordWriter(TaskAttemptContext job) throws Exception {
        LogRecordWriter rw = new LogRecordWriter(job.getConfiguration());
        return rw;
    }
}
//4、LogRecordWriter
/**
 * 自定義LogRecordWriter對(duì)象需要繼承RecordWriter類
 *
 * 需求:
 *     將包含"luck"的日志數(shù)據(jù)寫到   D:/bigtools/luck.log
 *     將不包含"luck"的日志數(shù)據(jù)寫到 D:/bigtools/other.log
 */
public class LogRecordWriter extends RecordWriter {

//    文件輸出路徑
    private String luckPath = "D:/bigtools/luck.log";
    private String otherPath = "D:/bigtools/other.log";
    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;
    private FileSystem fs;
    /**
     * 初始化
     * @param conf
     */
    public LogRecordWriter(Configuration conf){
        try {
            fs = FileSystem.get(conf);
            luckOut = fs.create(new Path(luckPath));
            otherOut = fs.create(new Path(otherPath));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 重寫write方法
     * @param key
     * @param value
     * @throws IOException
     * @throws InterruptedException
     */
    public void write(Object key, Object value) throws Exception {
        String log = key.toString();
        if(log.contains("luck")){
            luckOut.writeBytes(log + "\n");
        }else{
            otherOut.writeBytes(log + "\n");
        }
    }
    /**
     * 關(guān)閉流
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(luckOut);
        IOUtils.closeStream(otherOut);
    }
}
//5、LogDriver
public class LogDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);

        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
//        設(shè)置自定義輸出
        job.setOutputFormatClass(MyOutPutFormat.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\loginput"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\logoutput"));
        job.waitForCompletion(true);
    }
}

“Combiner怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI