溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

MapReduce的輸出格式是怎樣的

發(fā)布時(shí)間:2021-12-23 16:40:41 來(lái)源:億速云 閱讀:165 作者:iii 欄目:云計(jì)算

本篇內(nèi)容主要講解“MapReduce的輸出格式是怎樣的”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“MapReduce的輸出格式是怎樣的”吧!

MapReduce的輸出格式


        Hadoop 都有相應(yīng)的輸出格式。默認(rèn)情況下只有一個(gè) Reduce,輸出只有一個(gè)文件,默認(rèn)文件名為 part-r-00000,輸出文件的個(gè)數(shù)與 Reduce 的個(gè)數(shù)一致。 如果有兩個(gè)Reduce,輸出結(jié)果就有兩個(gè)文件,第一個(gè)為part-r-00000,第二個(gè)為part-r-00001,依次類(lèi)推。

OutputFormat 接口

        OutputFormat主要用于描述輸出數(shù)據(jù)的格式,它能夠?qū)⒂脩?hù)提供的key/value對(duì)寫(xiě)入特定格式的文件中。 通過(guò)OutputFormat 接口,實(shí)現(xiàn)具體的輸出格式,過(guò)程有些復(fù)雜也沒(méi)有這個(gè)必要。Hadoop 自帶了很多 OutputFormat 的實(shí)現(xiàn),它們與InputFormat實(shí)現(xiàn)相對(duì)應(yīng),足夠滿足我們業(yè)務(wù)的需要。 OutputFormat 類(lèi)的層次結(jié)構(gòu)如下圖所示。

MapReduce的輸出格式是怎樣的

        OutputFormat 是 MapReduce 輸出的基類(lèi),所有實(shí)現(xiàn) MapReduce 輸出都實(shí)現(xiàn)了 OutputFormat 接口。 我們可以把這些實(shí)現(xiàn)接口類(lèi)分為以下幾種類(lèi)型,分別一一介紹。

文本輸出

        默認(rèn)的輸出格式是 TextOutputFormat,它把每條記錄寫(xiě)為文本行。它的鍵和值可以是任意類(lèi)型,因?yàn)?TextOutputFormat 調(diào)用 toString() 方法把它們轉(zhuǎn)換為字符串。 每個(gè)鍵/值對(duì)由制表符進(jìn)行分割,當(dāng)然也可以設(shè)定 mapreduce.output.textoutputformat.separator 屬性(舊版本 API 中的 mapred.textoutputformat.separator)改變默認(rèn)的分隔符。 與 TextOutputFormat 對(duì)應(yīng)的輸入格式是 KeyValueTextInputFormat,它通過(guò)可配置的分隔符將鍵/值對(duì)文本分割。

        可以使用 NullWritable 來(lái)省略輸出的鍵或值(或兩者都省略,相當(dāng)于 NullOutputFormat 輸出格式,后者什么也不輸出)。 這也會(huì)導(dǎo)致無(wú)分隔符輸出,以使輸出適合用 TextInputFormat 讀取。

二進(jìn)制輸出

        1、關(guān)于SequenceFileOutputFormat

        顧名思義,SequenceFileOutputFormat 將它的輸出寫(xiě)為一個(gè)順序文件。如果輸出需要作為后續(xù) MapReduce 任務(wù)的輸入,這便是一種好的輸出格式, 因?yàn)樗母袷骄o湊,很容易被壓縮。

        2、關(guān)于SequenceFileAsBinaryOutputFormat

        SequenceFileAsBinaryOutputFormat 把鍵/值對(duì)作為二進(jìn)制格式寫(xiě)到一個(gè) SequenceFile 容器中。

        3、關(guān)于MapFileOutputFormat

        MapFileOutputFormat 把 MapFile 作為輸出。MapFile 中的鍵必須順序添加,所以必須確保 reducer 輸出的鍵已經(jīng)排好序。

多個(gè)輸出

        上面我們提到,默認(rèn)情況下只有一個(gè) Reduce,輸出只有一個(gè)文件。有時(shí)可能需要對(duì)輸出的文件名進(jìn)行控制或讓每個(gè) reducer 輸出多個(gè)文件。 我們有兩種方式實(shí)現(xiàn)reducer輸出多個(gè)文件。

        1、Partitioner

        我們考慮這樣一個(gè)需求:按學(xué)生的年齡段,將數(shù)據(jù)輸出到不同的文件路徑下。這里我們分為三個(gè)年齡段:小于等于20歲、大于20歲小于等于50歲和大于50歲。

        我們采用的方法是每個(gè)年齡段對(duì)應(yīng)一個(gè) reducer。為此,我們需要通過(guò)以下兩步實(shí)現(xiàn)。

        第一步:把作業(yè)的 reducer 數(shù)設(shè)為年齡段數(shù)即為3。

job.setPartitionerClass(PCPartitioner.class);//設(shè)置Partitioner類(lèi)
job.setNumReduceTasks(3);// reduce個(gè)數(shù)設(shè)置為3

        第二步:寫(xiě)一個(gè) Partitioner,把同一個(gè)年齡段的數(shù)據(jù)放到同一個(gè)分區(qū)。

public static class PCPartitioner extends Partitioner< Text, Text> {
		@Override
		public int getPartition(Text key, Text value, int numReduceTasks) {
			// TODO Auto-generated method stub
			String[] nameAgeScore = value.toString().split("\t");
			String age = nameAgeScore[1];//學(xué)生年齡
			int ageInt = Integer.parseInt(age);//按年齡段分區(qū)

			// 默認(rèn)指定分區(qū) 0
			if (numReduceTasks == 0)
				return 0;

			//年齡小于等于20,指定分區(qū)0
			if (ageInt <= 20) {				return 0;
			}
			// 年齡大于20,小于等于50,指定分區(qū)1
			if (ageInt > 20 && ageInt <= 50) {				return 1 % numReduceTasks;
			}
			// 剩余年齡,指定分區(qū)2
			else
				return 2 % numReduceTasks;
		}
}

        這種方法實(shí)現(xiàn)多文件輸出,也只能滿足此種需求。很多情況下是無(wú)法實(shí)現(xiàn)的,因?yàn)檫@樣做存在兩個(gè)缺點(diǎn)。

        第一,需要在作業(yè)運(yùn)行之前需要知道分區(qū)數(shù)和年齡段的個(gè)數(shù),如果分區(qū)數(shù)很大或者未知,就無(wú)法操作。

        第二,一般來(lái)說(shuō),讓?xiě)?yīng)用程序來(lái)嚴(yán)格限定分區(qū)數(shù)并不好,因?yàn)榭赡軐?dǎo)致分區(qū)數(shù)少或分區(qū)不均。

        2、MultipleOutputs 類(lèi)

        MultipleOutputs 類(lèi)可以將數(shù)據(jù)寫(xiě)到多個(gè)文件,這些文件的名稱(chēng)源于輸出的鍵和值或者任意字符串。這允許每個(gè) reducer(或者只有 map 作業(yè)的 mapper)創(chuàng)建多個(gè)文件。 采用name-m-nnnnn 形式的文件名用于 map 輸出,name-r-nnnnn 形式的文件名用于 reduce 輸出,其中 name 是由程序設(shè)定的任意名字, nnnnn 是一個(gè)指明塊號(hào)的整數(shù)(從 0 開(kāi)始)。塊號(hào)保證從不同塊(mapper 或 reducer)寫(xiě)的輸出在相同名字情況下不會(huì)沖突。

        假如這里有一份郵箱數(shù)據(jù)文件,我們期望統(tǒng)計(jì)郵箱出現(xiàn)次數(shù)并按照郵箱的類(lèi)別,將這些郵箱分別輸出到不同文件路徑下。數(shù)據(jù)集示例如下所示。

wolys@21cn.com
zss1984@126.com
294522652@qq.com
simulateboy@163.com
zhoushigang_123@163.com
sirenxing424@126.com
lixinyu23@qq.com
chenlei1201@gmail.com
370433835@qq.com
cxx0409@126.com
viv093@sina.com
q62148830@163.com
65993266@qq.com
summeredison@sohu.com
zhangbao-autumn@163.com
diduo_007@yahoo.com.cn
fxh852@163.com

        下面我們編寫(xiě) MapReduce 程序,實(shí)現(xiàn)上述業(yè)務(wù)需求。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Email extends Configured implements Tool {

	public static class MailMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			context.write(value, one);
		}
	}

	public static class MailReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();
		private MultipleOutputs< Text, IntWritable> multipleOutputs;

		@Override
		protected void setup(Context context) throws IOException ,InterruptedException{
			multipleOutputs = new MultipleOutputs< Text, IntWritable>(context);
		}
		protected void reduce(Text Key, Iterable< IntWritable> Values,Context context) throws IOException, InterruptedException {
			int begin = Key.toString().indexOf("@");
            int end = Key.toString().indexOf(".");
            if(begin>=end){
            	return;
            }
            //獲取郵箱類(lèi)別,比如 qq
            String name = Key.toString().substring(begin+1, end);
			int sum = 0;
			for (IntWritable value : Values) {
				sum += value.get();
			}
			result.set(sum);
			multipleOutputs.write(Key, result, name);
		}
		@Override
		protected void cleanup(Context context) throws IOException ,InterruptedException{
			multipleOutputs.close();
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();// 讀取配置文件
		
		Path mypath = new Path(args[1]);
		FileSystem hdfs = mypath.getFileSystem(conf);//創(chuàng)建輸出路徑
		if (hdfs.isDirectory(mypath)) {
			hdfs.delete(mypath, true);
		}
		Job job = Job.getInstance();// 新建一個(gè)任務(wù)
		job.setJarByClass(Email.class);// 主類(lèi)
		
		FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑
		FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑

		job.setMapperClass(MailMapper.class);// Mapper
		job.setReducerClass(MailReducer.class);// Reducer
		
		job.setOutputKeyClass(Text.class);// key輸出類(lèi)型
		job.setOutputValueClass(IntWritable.class);// value輸出類(lèi)型
		
		job.waitForCompletion(true);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		String[] args0 = {
				"hdfs://single.hadoop.dajiangtai.com:9000/junior/mail.txt",
				"hdfs://single.hadoop.dajiangtai.com:9000/junior/mail-out/" };
		int ec = ToolRunner.run(new Configuration(), new Email(), args0);
		System.exit(ec);
	}
}

        在 reducer 中,在 setup() 方法中構(gòu)造一個(gè) MultipleOutputs 的實(shí)例并將它賦給一個(gè)實(shí)例變量。在 reduce() 方法中使用 MultipleOutputs 實(shí)例來(lái)寫(xiě)輸出, 而不是 context 。write() 方法作用于鍵、值、和名字。

        程序運(yùn)行之后,輸出文件的命名如下所示。

/mail-out/163-r-00000
/mail-out/126-r-00000
/mail-out/21cn-r-00000
/mail-out/gmail-r-00000
/mail-out/qq-r-00000
/mail-out/sina-r-00000
/mail-out/sohu-r-00000
/mail-out/yahoo-r-00000
/mail-out/part-r-00000

        在 MultipleOutputs 的 write() 方法中指定的基本路徑相當(dāng)于輸出路徑進(jìn)行解釋?zhuān)驗(yàn)樗梢园募窂椒指舴?/), 創(chuàng)建任意深度的子目錄是有可能的。

數(shù)據(jù)庫(kù)輸出

        DBOutputFormat 適用于將作業(yè)輸出數(shù)據(jù)(中等規(guī)模的數(shù)據(jù))轉(zhuǎn)存到Mysql、Oracle等數(shù)據(jù)庫(kù)。

到此,相信大家對(duì)“MapReduce的輸出格式是怎樣的”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI