您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“MapReduce的輸出格式是怎樣的”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“MapReduce的輸出格式是怎樣的”吧!
Hadoop 都有相應(yīng)的輸出格式。默認(rèn)情況下只有一個(gè) Reduce,輸出只有一個(gè)文件,默認(rèn)文件名為 part-r-00000,輸出文件的個(gè)數(shù)與 Reduce 的個(gè)數(shù)一致。 如果有兩個(gè)Reduce,輸出結(jié)果就有兩個(gè)文件,第一個(gè)為part-r-00000,第二個(gè)為part-r-00001,依次類(lèi)推。
OutputFormat主要用于描述輸出數(shù)據(jù)的格式,它能夠?qū)⒂脩?hù)提供的key/value對(duì)寫(xiě)入特定格式的文件中。 通過(guò)OutputFormat 接口,實(shí)現(xiàn)具體的輸出格式,過(guò)程有些復(fù)雜也沒(méi)有這個(gè)必要。Hadoop 自帶了很多 OutputFormat 的實(shí)現(xiàn),它們與InputFormat實(shí)現(xiàn)相對(duì)應(yīng),足夠滿足我們業(yè)務(wù)的需要。 OutputFormat 類(lèi)的層次結(jié)構(gòu)如下圖所示。
OutputFormat 是 MapReduce 輸出的基類(lèi),所有實(shí)現(xiàn) MapReduce 輸出都實(shí)現(xiàn)了 OutputFormat 接口。 我們可以把這些實(shí)現(xiàn)接口類(lèi)分為以下幾種類(lèi)型,分別一一介紹。
默認(rèn)的輸出格式是 TextOutputFormat,它把每條記錄寫(xiě)為文本行。它的鍵和值可以是任意類(lèi)型,因?yàn)?TextOutputFormat 調(diào)用 toString() 方法把它們轉(zhuǎn)換為字符串。 每個(gè)鍵/值對(duì)由制表符進(jìn)行分割,當(dāng)然也可以設(shè)定 mapreduce.output.textoutputformat.separator 屬性(舊版本 API 中的 mapred.textoutputformat.separator)改變默認(rèn)的分隔符。 與 TextOutputFormat 對(duì)應(yīng)的輸入格式是 KeyValueTextInputFormat,它通過(guò)可配置的分隔符將鍵/值對(duì)文本分割。
可以使用 NullWritable 來(lái)省略輸出的鍵或值(或兩者都省略,相當(dāng)于 NullOutputFormat 輸出格式,后者什么也不輸出)。 這也會(huì)導(dǎo)致無(wú)分隔符輸出,以使輸出適合用 TextInputFormat 讀取。
1、關(guān)于SequenceFileOutputFormat
顧名思義,SequenceFileOutputFormat 將它的輸出寫(xiě)為一個(gè)順序文件。如果輸出需要作為后續(xù) MapReduce 任務(wù)的輸入,這便是一種好的輸出格式, 因?yàn)樗母袷骄o湊,很容易被壓縮。
2、關(guān)于SequenceFileAsBinaryOutputFormat
SequenceFileAsBinaryOutputFormat 把鍵/值對(duì)作為二進(jìn)制格式寫(xiě)到一個(gè) SequenceFile 容器中。
3、關(guān)于MapFileOutputFormat
MapFileOutputFormat 把 MapFile 作為輸出。MapFile 中的鍵必須順序添加,所以必須確保 reducer 輸出的鍵已經(jīng)排好序。
上面我們提到,默認(rèn)情況下只有一個(gè) Reduce,輸出只有一個(gè)文件。有時(shí)可能需要對(duì)輸出的文件名進(jìn)行控制或讓每個(gè) reducer 輸出多個(gè)文件。 我們有兩種方式實(shí)現(xiàn)reducer輸出多個(gè)文件。
1、Partitioner
我們考慮這樣一個(gè)需求:按學(xué)生的年齡段,將數(shù)據(jù)輸出到不同的文件路徑下。這里我們分為三個(gè)年齡段:小于等于20歲、大于20歲小于等于50歲和大于50歲。
我們采用的方法是每個(gè)年齡段對(duì)應(yīng)一個(gè) reducer。為此,我們需要通過(guò)以下兩步實(shí)現(xiàn)。
第一步:把作業(yè)的 reducer 數(shù)設(shè)為年齡段數(shù)即為3。
job.setPartitionerClass(PCPartitioner.class);//設(shè)置Partitioner類(lèi) job.setNumReduceTasks(3);// reduce個(gè)數(shù)設(shè)置為3
第二步:寫(xiě)一個(gè) Partitioner,把同一個(gè)年齡段的數(shù)據(jù)放到同一個(gè)分區(qū)。
public static class PCPartitioner extends Partitioner< Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { // TODO Auto-generated method stub String[] nameAgeScore = value.toString().split("\t"); String age = nameAgeScore[1];//學(xué)生年齡 int ageInt = Integer.parseInt(age);//按年齡段分區(qū) // 默認(rèn)指定分區(qū) 0 if (numReduceTasks == 0) return 0; //年齡小于等于20,指定分區(qū)0 if (ageInt <= 20) { return 0; } // 年齡大于20,小于等于50,指定分區(qū)1 if (ageInt > 20 && ageInt <= 50) { return 1 % numReduceTasks; } // 剩余年齡,指定分區(qū)2 else return 2 % numReduceTasks; } }
這種方法實(shí)現(xiàn)多文件輸出,也只能滿足此種需求。很多情況下是無(wú)法實(shí)現(xiàn)的,因?yàn)檫@樣做存在兩個(gè)缺點(diǎn)。
第一,需要在作業(yè)運(yùn)行之前需要知道分區(qū)數(shù)和年齡段的個(gè)數(shù),如果分區(qū)數(shù)很大或者未知,就無(wú)法操作。
第二,一般來(lái)說(shuō),讓?xiě)?yīng)用程序來(lái)嚴(yán)格限定分區(qū)數(shù)并不好,因?yàn)榭赡軐?dǎo)致分區(qū)數(shù)少或分區(qū)不均。
2、MultipleOutputs 類(lèi)
MultipleOutputs 類(lèi)可以將數(shù)據(jù)寫(xiě)到多個(gè)文件,這些文件的名稱(chēng)源于輸出的鍵和值或者任意字符串。這允許每個(gè) reducer(或者只有 map 作業(yè)的 mapper)創(chuàng)建多個(gè)文件。 采用name-m-nnnnn 形式的文件名用于 map 輸出,name-r-nnnnn 形式的文件名用于 reduce 輸出,其中 name 是由程序設(shè)定的任意名字, nnnnn 是一個(gè)指明塊號(hào)的整數(shù)(從 0 開(kāi)始)。塊號(hào)保證從不同塊(mapper 或 reducer)寫(xiě)的輸出在相同名字情況下不會(huì)沖突。
假如這里有一份郵箱數(shù)據(jù)文件,我們期望統(tǒng)計(jì)郵箱出現(xiàn)次數(shù)并按照郵箱的類(lèi)別,將這些郵箱分別輸出到不同文件路徑下。數(shù)據(jù)集示例如下所示。
wolys@21cn.com zss1984@126.com 294522652@qq.com simulateboy@163.com zhoushigang_123@163.com sirenxing424@126.com lixinyu23@qq.com chenlei1201@gmail.com 370433835@qq.com cxx0409@126.com viv093@sina.com q62148830@163.com 65993266@qq.com summeredison@sohu.com zhangbao-autumn@163.com diduo_007@yahoo.com.cn fxh852@163.com
下面我們編寫(xiě) MapReduce 程序,實(shí)現(xiàn)上述業(yè)務(wù)需求。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Email extends Configured implements Tool { public static class MailMapper extends Mapper< LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, one); } } public static class MailReducer extends Reducer< Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); private MultipleOutputs< Text, IntWritable> multipleOutputs; @Override protected void setup(Context context) throws IOException ,InterruptedException{ multipleOutputs = new MultipleOutputs< Text, IntWritable>(context); } protected void reduce(Text Key, Iterable< IntWritable> Values,Context context) throws IOException, InterruptedException { int begin = Key.toString().indexOf("@"); int end = Key.toString().indexOf("."); if(begin>=end){ return; } //獲取郵箱類(lèi)別,比如 qq String name = Key.toString().substring(begin+1, end); int sum = 0; for (IntWritable value : Values) { sum += value.get(); } result.set(sum); multipleOutputs.write(Key, result, name); } @Override protected void cleanup(Context context) throws IOException ,InterruptedException{ multipleOutputs.close(); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration();// 讀取配置文件 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf);//創(chuàng)建輸出路徑 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance();// 新建一個(gè)任務(wù) job.setJarByClass(Email.class);// 主類(lèi) FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑 job.setMapperClass(MailMapper.class);// Mapper job.setReducerClass(MailReducer.class);// Reducer job.setOutputKeyClass(Text.class);// key輸出類(lèi)型 job.setOutputValueClass(IntWritable.class);// value輸出類(lèi)型 job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { String[] args0 = { "hdfs://single.hadoop.dajiangtai.com:9000/junior/mail.txt", "hdfs://single.hadoop.dajiangtai.com:9000/junior/mail-out/" }; int ec = ToolRunner.run(new Configuration(), new Email(), args0); System.exit(ec); } }
在 reducer 中,在 setup() 方法中構(gòu)造一個(gè) MultipleOutputs 的實(shí)例并將它賦給一個(gè)實(shí)例變量。在 reduce() 方法中使用 MultipleOutputs 實(shí)例來(lái)寫(xiě)輸出, 而不是 context 。write() 方法作用于鍵、值、和名字。
程序運(yùn)行之后,輸出文件的命名如下所示。
/mail-out/163-r-00000 /mail-out/126-r-00000 /mail-out/21cn-r-00000 /mail-out/gmail-r-00000 /mail-out/qq-r-00000 /mail-out/sina-r-00000 /mail-out/sohu-r-00000 /mail-out/yahoo-r-00000 /mail-out/part-r-00000
在 MultipleOutputs 的 write() 方法中指定的基本路徑相當(dāng)于輸出路徑進(jìn)行解釋?zhuān)驗(yàn)樗梢园募窂椒指舴?/), 創(chuàng)建任意深度的子目錄是有可能的。
DBOutputFormat 適用于將作業(yè)輸出數(shù)據(jù)(中等規(guī)模的數(shù)據(jù))轉(zhuǎn)存到Mysql、Oracle等數(shù)據(jù)庫(kù)。
到此,相信大家對(duì)“MapReduce的輸出格式是怎樣的”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。