您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“hadoop多文件輸出新舊API的方法是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
一般來(lái)說(shuō)Map/Reduce都是輸出一組文件,但是有些情況下需要我們輸出多組文件,比如我上面提到的需求,接下來(lái)我用新舊API分別說(shuō)明如何實(shí)現(xiàn)多文件輸出
舊API:
MultipleTextOutputFormat 這個(gè)類(lèi)很重要,我們其實(shí)只要寫(xiě)個(gè)類(lèi)繼承MultipleTextOutputFormat,并且重寫(xiě)generateFileNameForKeyValue(Object key, Object value, String name)方法就好了。因?yàn)镸ultipleTextOutputFormat中有個(gè)write方法,即將記錄寫(xiě)到hdfs上,在這個(gè)方法中,會(huì)調(diào)用generateFileNameForKeyValue。廢話不多說(shuō),上代碼:
public class MultiFileOutputFormat extends MultipleTextOutputFormat<Object, Object>{ @Override protected String generateFileNameForKeyValue(Object key, Object value, String name) { if(key instanceof OutputFileName){ return ((OutputFileName) key).getPath()+"/"+name; }else{ return super.generateFileNameForKeyValue(key, value, name); } } }
其中OutputFileName是我自己定義的枚舉類(lèi),便于管理而已,這里也可以return一個(gè)路徑,以下是OutputFileName的代碼
public enum OutputFileName { ERRORLOG("errorlog","logtype=errorlog"), APIREQUEST("apiRequest","logtype=apiRequest"), FIRSTINTOTIME("firstIntoTime","logtype=firstIntoTime"), TABFLUSHTIME("tabFlushTime","logtype=tabFlushTime"), PERFORMANCE("performance","logtype=performance"), FILEREQUEST("fileRequest","logtype=fileRequest"); private String name; private String path; private String tempPath; private OutputFileName(String name,String path){ this.name = name; this.path = path; } public String getName(){ return this.name; } public String getPath(){ if(!StringUtil.isEmpty(tempPath)){ String temp = this.tempPath; this.tempPath = null; return temp; }else{ return this.path; } } }
如何使用MultiFileOutputFormat這個(gè)自己寫(xiě)的類(lèi)呢?就這么用
//job所在類(lèi)的main方法中 JobConf conf = new JobConf(config,XXX.class); conf.setOutputFormat(MultiFileOutputFormat.class); //map函數(shù)中 collector.collect(OutputFileName.ERRORLOG, new Text(log));
此示例做了以上 的操作就可以將數(shù)據(jù)寫(xiě)到logtype=errorlog目錄下了,當(dāng)然可以根據(jù)不同的日志去設(shè)置輸出目錄了
新API:
對(duì)于新的API,我沒(méi)發(fā)現(xiàn)MultipleTextOutputFormat這個(gè)類(lèi),很頭疼,我甚至看了源碼,仿照舊API自己寫(xiě)了MultipleTextOutputFormat,這就需要做很多事情,必須寫(xiě)個(gè)集成RecordWriter的類(lèi),重寫(xiě)里面的方法,當(dāng)時(shí)確實(shí)可以做到將數(shù)據(jù)寫(xiě)到不同的路徑下,但是也有bug,數(shù)據(jù)很多的時(shí)候,路徑下的數(shù)據(jù)只有一部分保留,做了一下測(cè)試,確實(shí)把所有的記錄都寫(xiě)了,但卻只是把最后寫(xiě)的一部分保留在設(shè)定好的路徑下了,至今都沒(méi)發(fā)現(xiàn)原因,這里就不給代碼了,只能保留60多萬(wàn)行的記錄
當(dāng)然我還是有辦法的,經(jīng)過(guò)百般折磨,終于在網(wǎng)上找到相關(guān)資料,使用這個(gè)類(lèi)MultipleOutputs,查查API,還真有,只不過(guò)是在org.apache.hadoop.mapreduce.lib.output包下,這個(gè)類(lèi)相當(dāng)于把舊的API東西又重新整理了一遍,我們不用再去寫(xiě)其他的類(lèi)集成MultipleTextOutputFormat。具體使用方法看代碼吧
public static class MapperClass extends Mapper<Object, Text, Text, NullWritable> { private Text outkey = new Text(""); private MultipleOutputs<Text, NullWritable> mos; public void map(Object key, Text value, Context context) throws IOException,InterruptedExceptio{ String log = value.toString(); outkey.set(log); int begin = log.indexOf("@[#("); if(begin != -1){ String logForSplit = log.substring(begin+"@".length()); String [] split = logForSplit.split("#"); if(split != null && split.length >0){ String cType = split[0]; if(!StringUtil.isEmpty(cType)){ if("apiRequest".equals(cType)){ mos.write("apiRequest", outkey, NullWritable.get()); }else if("errlog".equals(cType)){ mos.write("errorlog", outkey, NullWritable.get()); } } } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); super.cleanup(context); } @Override protected void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs<Text, NullWritable>(context); super.setup(context); } }
public class TestJob { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = new Job(conf, "ss"); job.setInputFormatClass(TrackInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(TestJob.class); job.setMapperClass(TestJob.MapperClass.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); if(inputPaths.length > 0){ Path[] paths = new Path[inputPaths.length]; for(int i = 0 ; i < inputPaths.length ; i++){ paths[i] = new Path(inputPaths[i]); } FileInputFormat.setInputPaths(job, paths); }else{ FileInputFormat.setInputPaths(job, new Path(args[0])); } FileOutputFormat.setOutputPath(job, new Path(args[1])); MultipleOutputs.addNamedOutput(job, "errorlog", TextOutputFormat.class, Text.class, NullWritable.class); MultipleOutputs.addNamedOutput(job, "apiRequest", TextOutputFormat.class, Text.class, NullWritable.class); } }
OK,這就可以了,總結(jié)一下需要注意的問(wèn)題,首先在我們的map類(lèi)中一定要定義MultipleOutputs的對(duì)象,并且重寫(xiě)cleanup和setup方法,分別用來(lái)關(guān)閉和創(chuàng)建MultipleOutputs對(duì)象,最重要的是在job所在的類(lèi)中注冊(cè)我們的文件名,比如errorlog,apiRequest等
上述的兩個(gè)例子有點(diǎn)區(qū)別,第一個(gè)是將數(shù)據(jù)寫(xiě)到不同的目錄下,而第二個(gè)是寫(xiě)到同一個(gè)目錄下,但是會(huì)分成不同類(lèi)型的文件,如我截取的記錄
-rw-r--r-- 2 hadoop supergroup 10569073 2014-06-06 11:50 /test/aa/fileRequest-m-00063.lzo
-rw-r--r-- 2 hadoop supergroup 10512656 2014-06-06 11:50 /test/aa/fileRequest-m-00064.lzo
-rw-r--r-- 2 hadoop supergroup 68780 2014-06-06 11:51 /test/aa/firstIntoTime-m-00000.lzo
-rw-r--r-- 2 hadoop supergroup 67901 2014-06-06 11:51 /test/aa/firstIntoTime-m-00001.lzo
至于怎么樣輸出到不同的目錄下,有待研究,這種方式有個(gè)不好的地方, 會(huì)產(chǎn)生很多的
-rw-r--r-- 2 hadoop supergroup 42 2014-06-06 11:50 /test/aa/part-m-00035.lzo 空文件
“hadoop多文件輸出新舊API的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。