本篇內(nèi)容介紹了“Combiner怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
1. Where the Combiner runs:
- When kv pairs spill from the in-memory buffer to disk, the Combiner can run (if one is set, it always runs here).
- After all of a MapTask's data has been spilled to disk, the Combiner can also run while the spill files are merged (conditionally: only when the number of spills is >= 3).

2. What a Combiner does: local merging. It pre-aggregates each MapTask's output kv pairs locally, which reduces both the number of kv pairs transferred from map to reduce and the amount of data the reducer ultimately processes.

3. Usage restriction: a Combiner may only be used when it does not change the business logic. For example, it is not suitable when computing an average.

4. The Combiner's parent class is Reducer. The difference: a Combiner runs on the node of each individual MapTask, while the Reducer receives the output of all Mappers globally.
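The average-value caveat in point 3 can be demonstrated without Hadoop at all. The sketch below is a hypothetical plain-Java simulation (class and method names are my own, not Hadoop APIs): two "map tasks" hold raw values, and it compares the correct global average against the broken "average of per-task averages" that a naive averaging Combiner would produce.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerAveragePitfall {
    // Correct: the reducer sees every raw value from all map tasks.
    public static double globalAverage(List<int[]> perTaskValues) {
        long sum = 0, count = 0;
        for (int[] task : perTaskValues) {
            for (int v : task) {
                sum += v;
                count++;
            }
        }
        return (double) sum / count;
    }

    // Broken: each "combiner" pre-averages its own task's values, then the
    // reducer averages those averages, losing the per-task value counts.
    public static double averageOfAverages(List<int[]> perTaskValues) {
        double sum = 0;
        for (int[] task : perTaskValues) {
            sum += Arrays.stream(task).average().orElse(0);
        }
        return sum / perTaskValues.size();
    }

    public static void main(String[] args) {
        // Task A produced values {1, 2}; task B produced {6}.
        List<int[]> tasks = Arrays.asList(new int[]{1, 2}, new int[]{6});
        System.out.println(globalAverage(tasks));     // 3.0
        System.out.println(averageOfAverages(tasks)); // 3.75
    }
}
```

Because (1+2+6)/3 = 3.0 but (1.5+6.0)/2 = 3.75, the combiner changes the result, which is exactly why averaging is listed as unsafe. Summing, by contrast, is associative, so the word-count combiner below is safe.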
1. Custom Combiner class

```java
/**
 * Combiner purpose:
 * When a MapTask spills, pre-aggregate that MapTask's output locally,
 * reducing the total amount of data shipped to the ReduceTask.
 * Note: although a custom Combiner extends Reducer, it belongs to the MapTask phase.
 */
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // The count must be a local variable reset for every key; a field
        // shared across calls would accumulate counts from previous keys.
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        outv.set(count);
        context.write(key, outv);
    }
}
```
2. WordCountMapper

```java
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text outk = new Text();
    private final IntWritable outv = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of input
        String lineData = value.toString();
        // The input is known in advance to be space-delimited; split to get each word
        String[] splitData = lineData.split(" ");
        // Write out each word from the split
        for (String str : splitData) {
            // Note: the split yields String values, which must be wrapped in Text
            outk.set(str);
            context.write(outk, outv);
        }
    }
}
```
3. WordCountReduce

```java
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulator for this key's total occurrence count
        int count = 0;
        // Iterate the values: each entry records one occurrence count for this word
        for (IntWritable value : values) {
            // value is an IntWritable and cannot be added directly,
            // so unwrap it to an int with get()
            count += value.get();
        }
        // Convert the int total back to an IntWritable and write it out
        outv.set(count);
        context.write(key, outv);
    }
}
```
4. WordCountDriver

```java
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // 1. Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate the jar: the driver class used when running the program
        job.setJarByClass(WordCountDriver.class);

        // 3. Associate the mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);

        // 4. Set the mapper's output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the job's final output key and value types: if there is a
        // reducer, use the reducer's output kv types; otherwise use the mapper's.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the custom Combiner class
        job.setCombinerClass(WordCountCombiner.class);
        // job.setCombinerClass(WordCountReduce.class); // also works here

        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\io\\hadooptest\\combineinput"));
        // The output path must not already exist; the MR program creates it
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\hadooptest\\Combineroutput2"));

        // 7. Submit the job
        job.waitForCompletion(true);
    }
}
```
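The data flow the driver wires together (map, then a per-task combine, then a global reduce) can be sketched in plain Java. This is a hypothetical simulation, not Hadoop code: each call to `mapAndCombine` stands in for one MapTask whose output has already been locally merged, so only one (word, count) pair per distinct word leaves each "task".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSimulation {
    // "Map + combine": count words within one task's input lines, so the
    // task emits at most one (word, count) pair per distinct word.
    public static Map<String, Integer> mapAndCombine(List<String> lines) {
        Map<String, Integer> local = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                local.merge(word, 1, Integer::sum);
            }
        }
        return local;
    }

    // "Reduce": merge the pre-combined outputs of every task into totals.
    public static Map<String, Integer> reduce(List<Map<String, Integer>> taskOutputs) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> task : taskOutputs) {
            task.forEach((w, c) -> total.merge(w, c, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> task1 = mapAndCombine(Arrays.asList("a b a", "b a"));
        Map<String, Integer> task2 = mapAndCombine(Arrays.asList("a c"));
        Map<String, Integer> result = reduce(Arrays.asList(task1, task2));
        System.out.println(result); // a=4, b=2, c=1
    }
}
```

Without the combine step, task1 would ship five kv pairs to the reducer instead of two, which is exactly the shuffle-traffic saving the Combiner provides.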
①: OutputFormat is an interface that defines two abstract methods:
- `RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)`: returns the RecordWriter object, which is responsible for writing the output data.
- `void checkOutputSpecs(FileSystem ignored, JobConf job)`: checks the output path. When the output path set in the driver already exists, the implementing class throws an exception here ("Output directory " + outDir + " already exists", thrown at line 131).

②: Pressing Ctrl+H shows the implementations of this interface, as in the figure below:
- TextOutputFormat (Hadoop's default output format): writes line by line and overrides getRecordWriter() internally.
- SequenceFileOutputFormat: the final output file is in binary format.
- MultipleOutputFormat: an abstract subclass with further concrete implementations under it.
![OutputFormat implementation classes](https://oscimg.oschina.net/oscnet/up-777fe19a5bf6864396beac3aa83d8350e9e.png "OutputFormat implementation classes")
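The fail-fast behavior of checkOutputSpecs described above can be mimicked in isolation. This is a hypothetical standalone sketch using `java.io.File` rather than Hadoop's FileSystem API; the class and method names are my own.

```java
import java.io.File;
import java.io.IOException;

public class OutputSpecCheck {
    // Mimics FileOutputFormat's output check: refuse to run when the
    // output directory already exists, so an earlier job's results
    // are never silently overwritten.
    public static void checkOutputSpecs(File outDir) throws IOException {
        if (outDir.exists()) {
            throw new IOException("Output directory " + outDir + " already exists");
        }
    }

    public static void main(String[] args) throws IOException {
        // A path that does not exist passes the check
        checkOutputSpecs(new File("definitely-missing-dir-12345"));
        System.out.println("check passed");
    }
}
```

This is why the driver's output path must not exist before the job is submitted: the check runs before any task starts.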
1. LogMapper

```java
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}
```
2. LogReducer

```java
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Iterate so that duplicate lines are written once per occurrence
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
```
3. MyOutPutFormat

```java
public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    /**
     * Override getRecordWriter() to return the custom writer class.
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new LogRecordWriter(job.getConfiguration());
    }
}
```
4. LogRecordWriter

```java
/**
 * A custom record writer must extend RecordWriter.
 *
 * Requirement:
 *   Write log lines containing "luck" to D:/bigtools/luck.log
 *   Write all other log lines to D:/bigtools/other.log
 */
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
    // Output file paths
    private String luckPath = "D:/bigtools/luck.log";
    private String otherPath = "D:/bigtools/other.log";

    private FSDataOutputStream luckOut;
    private FSDataOutputStream otherOut;
    private FileSystem fs;

    /**
     * Initialize: open one output stream per target file.
     */
    public LogRecordWriter(Configuration conf) {
        try {
            fs = FileSystem.get(conf);
            luckOut = fs.create(new Path(luckPath));
            otherOut = fs.create(new Path(otherPath));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Route each record to the matching output stream.
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        if (log.contains("luck")) {
            luckOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }
    }

    /**
     * Close the streams.
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(luckOut);
        IOUtils.closeStream(otherOut);
    }
}
```
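The routing decision inside write() is the heart of this writer. The sketch below is a hypothetical plain-Java version (my own names, collecting into lists instead of HDFS files) that applies the same `contains("luck")` test to a batch of log lines:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogRouterSketch {
    // Route each log line by the same contains("luck") test the custom
    // RecordWriter uses, keyed by the file it would have been written to.
    public static Map<String, List<String>> route(List<String> logs) {
        Map<String, List<String>> out = new HashMap<>();
        out.put("luck.log", new ArrayList<>());
        out.put("other.log", new ArrayList<>());
        for (String log : logs) {
            String target = log.contains("luck") ? "luck.log" : "other.log";
            out.get(target).add(log);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> logs = Arrays.asList("luck visited", "example.com", "good luck");
        Map<String, List<String>> routed = route(logs);
        System.out.println(routed.get("luck.log").size());  // 2
        System.out.println(routed.get("other.log").size()); // 1
    }
}
```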
5. LogDriver

```java
public class LogDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Set the custom output format
        job.setOutputFormatClass(MyOutPutFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\io\\hadooptest\\loginput"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\io\\hadooptest\\logoutput"));

        job.waitForCompletion(true);
    }
}
```
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。