溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何自定義hadoop MapReduce InputFormat切分輸入文件

發(fā)布時間:2021-12-08 10:13:42 來源:億速云 閱讀:148 作者:小新 欄目:云計算

小編給大家分享一下如何自定義hadoop MapReduce InputFormat切分輸入文件,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

我們實(shí)現(xiàn)了按 cookieId 和 time 進(jìn)行二次排序,現(xiàn)在又有新問題:假如我需要按 cookieId 和 cookieId&time 的組合進(jìn)行分析呢?此時最好的辦法是自定義 InputFormat,讓 mapreduce 一次讀取一個 cookieId 下的所有記錄,然后再按 time 進(jìn)行切分 session,邏輯偽碼如下:

for OneSplit in MyInputFormat.getSplit() // OneSplit 是某個 cookieId 下的所有記錄

    for session in OneSplit // session 是按 time 把 OneSplit 進(jìn)行了二次分割

        for line in session // line 是 session 中的每條記錄,對應(yīng)原始日志的某條記錄

1、原理:

InputFormat是MapReduce中一個很常用的概念,它在程序的運(yùn)行中到底起到了什么作用呢?

InputFormat其實(shí)是一個接口,包含了兩個方法:

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 

  RecordReader<K, V> createRecordReader(InputSplit split, 

                                  TaskAttemptContext context)  throws IOException;

}

這兩個方法有分別完成著以下工作:

      方法 getSplits 將輸入數(shù)據(jù)切分成splits,splits的個數(shù)即為map tasks的個數(shù),splits的大小默認(rèn)為塊大小,即64M

     方法  getRecordReader 將每個 split   解析成records, 再依次將record解析成<K,V>對

也就是說 InputFormat完成以下工作:

 InputFile -->  splits  -->  <K,V>


 

系統(tǒng)常用的  InputFormat 又有哪些呢?

                      如何自定義hadoop MapReduce InputFormat切分輸入文件

其中Text InputFormat便是最常用的,它的 <K,V>就代表 <行偏移,該行內(nèi)容>


 

然而系統(tǒng)所提供的這幾種固定的將  InputFile轉(zhuǎn)換為 <K,V>的方式有時候并不能滿足我們的需求:

此時需要我們自定義   InputFormat ,從而使Hadoop框架按照我們預(yù)設(shè)的方式來將

InputFile解析為<K,V>

在領(lǐng)會自定義   InputFormat 之前,需要弄懂一下幾個抽象類、接口及其之間的關(guān)系:


 

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),

RecordReader  (interface), Line  RecordReader(class)的關(guān)系

       FileInputFormat implements   InputFormat

       TextInputFormat extends   FileInputFormat

       TextInputFormat.get  RecordReader calls   Line  RecordReader

       Line  RecordReader   implements   RecordReader


 

對于InputFormat接口,上面已經(jīng)有詳細(xì)的描述

再看看 FileInputFormat,它實(shí)現(xiàn)了 InputFormat接口中的 getSplits方法,而將 getRecordReader與isSplitable留給具體類(如 TextInputFormat )實(shí)現(xiàn), isSplitable方法通常不用修改,所以只需要在自定義的 InputFormat中實(shí)現(xiàn)

getRecordReader方法即可,而該方法的核心是調(diào)用  Line  RecordReader(即由LineRecorderReader類來實(shí)現(xiàn) "  將每個s  plit解析成records, 再依次將record解析成<K,V>對"  ),該方法實(shí)現(xiàn)了接口RecordReader


  public interface RecordReader<K, V> {

  boolean   next(K key, V value) throws IOException;
  K   createKey();
  V   createValue();
  long   getPos() throws IOException;
  public void   close() throws IOException;
  float   getProgress() throws IOException;
}


 

     因此自定義InputFormat的核心是自定義一個實(shí)現(xiàn)接口RecordReader類似于LineRecordReader的類,該類的核心也正是重寫接口RecordReader中的幾大方法,

     定義一個InputFormat的核心是定義一個類似于LineRecordReader的,自己的RecordReader


 

2、代碼:

package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

	@SuppressWarnings("deprecation")
	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		return new TrackRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec = new CompressionCodecFactory(
				context.getConfiguration()).getCodec(file);
		return codec == null;
	}

}


package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as offset in file and value as line.
 * 
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
	private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);

	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private NewLineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;
	// ----------------------
	// 行分隔符,即一條記錄的分隔符
	private byte[] separator = "END\n".getBytes();

	// --------------------

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		if (codec != null) {
			in = new NewLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				this.start -= separator.length;//
				// --start;
				fileIn.seek(start);
			}
			in = new NewLineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
			start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			if (newSize < maxLineLength) {
				break;
			}

			LOG.info("Skipped line of size ">


package MyInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

	public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

		public void map(LongWritable key, Text value, Context context) throws IOException,
				InterruptedException {
			System.out.println("key:\t " + key);
			System.out.println("value:\t " + value);
			System.out.println("-------------------------");
		}
	}

	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		 Path outPath = new Path("/hive/11");
		 FileSystem.get(conf).delete(outPath, true);
		Job job = new Job(conf, "TestMyInputFormat");
		job.setInputFormatClass(TrackInputFormat.class);
		job.setJarByClass(TestMyInputFormat.class);
		job.setMapperClass(TestMyInputFormat.MapperClass.class);
		job.setNumReduceTasks(0);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


3、測試數(shù)據(jù):

  cookieId    time     url                 cookieOverFlag

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END


4、結(jié)果:

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

看完了這篇文章,相信你對“如何自定義hadoop MapReduce InputFormat切分輸入文件”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI