溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

九、MapReduce--input源碼分析

發(fā)布時(shí)間:2020-07-29 13:45:42 來源:網(wǎng)絡(luò) 閱讀:221 作者:隔壁小白 欄目:大數(shù)據(jù)

當(dāng)job提交至yarn之后,就會(huì)開始調(diào)度運(yùn)行map任務(wù),這里開始講解map輸入的源碼分析。
一個(gè)map任務(wù)的入口就是 MapTask.class 中的run() 方法

1、首先看看MapTask.run() 方法

MapTask.class

//---------------------------------MapTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { 
    this.umbilical = umbilical;
    if (this.isMapTask()) {
        if (this.conf.getNumReduceTasks() == 0) {
            this.mapPhase = this.getProgress().addPhase("map", 1.0F);
        } else {
            this.mapPhase = this.getProgress().addPhase("map", 0.667F);
            this.sortPhase = this.getProgress().addPhase("sort", 0.333F);
        }
    }

    TaskReporter reporter = this.startReporter(umbilical);
    boolean useNewApi = job.getUseNewMapper();

    //進(jìn)行map任務(wù)的初始化
    this.initialize(job, this.getJobID(), reporter, useNewApi);
    if (this.jobCleanup) {
        this.runJobCleanupTask(umbilical, reporter);
    } else if (this.jobSetup) {
        this.runJobSetupTask(umbilical, reporter);
    } else if (this.taskCleanup) {
        this.runTaskCleanupTask(umbilical, reporter);
    } else {
        //啟動(dòng)map任務(wù),判斷是使用新的還是舊的api
        if (useNewApi) {
            this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);
        } else {
            this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);
        }

        this.done(umbilical, reporter);
    }
}

上面重點(diǎn)有兩個(gè)方法,一個(gè)是 this.initialize()以及 this.runNewMapper()。

2、下面看看this.initialize()

//---------------------------------Task.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {
    //創(chuàng)建task以及job上下文對(duì)象
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    //將task任務(wù)的狀態(tài)改為正在運(yùn)行
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }

    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }

        //獲取job中配置的輸出格式類,并通過反射獲取該類的Class對(duì)象
        this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
        //通過outputformat類獲取commiter
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }

    //從FileOutputFormat獲取任務(wù)結(jié)果輸出路徑。
    /*
    可能有的人會(huì)奇怪,為啥mapper這里要獲取outputformat 的輸出路徑。
    首先我們要知道,一個(gè)MapReduce任務(wù)可以只有mapper,而沒有reducer的,
    那么這時(shí)候程序的輸出是有mapper直接輸出的,這時(shí)候自然就需要知道輸出的路徑,這里就派上用場(chǎng)了
    */
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }

    this.committer.setupTask(this.taskContext);
    Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }

}

這個(gè)方法主要做了一些初始化工作,比如創(chuàng)建上下文對(duì)象,獲取輸出outputFormat類,以及路徑等。

3、下面接著看看this.runNewMapper()

//---------------------------------MapTask.java
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(JobConf job, TaskSplitIndex splitIndex, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException {
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    //通過反射獲取job中配置的mapper實(shí)現(xiàn)類
    Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    //通過反射獲取job中配置的輸入格式類,默認(rèn)是TextInputFormat
    InputFormat<INKEY, INVALUE> inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    org.apache.hadoop.mapreduce.InputSplit split = null;
    //獲取切片詳細(xì)信息,傳入輸出路徑以及偏移量作為參數(shù).也就是當(dāng)前mapper處理的某個(gè)切片
    split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);
    //獲取輸入的讀取數(shù)據(jù)文件的 RecordReader 的對(duì)象,默認(rèn)inputformat為TextInputFormat,對(duì)應(yīng)默認(rèn)的RecordReader為LineRecordReader
    org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext);
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    RecordWriter output = null;
    //獲取RecordWriter輸出對(duì)象
    if (job.getNumReduceTasks() == 0) {
        output = new MapTask.NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        output = new MapTask.NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl(job, this.getTaskID(), input, (RecordWriter)output, this.committer, reporter, split);
    org.apache.hadoop.mapreduce.Mapper.Context mapperContext = (new WrappedMapper()).getMapContext(mapContext);

    try {
        //初始化RecordReader中的數(shù)據(jù)
        input.initialize(split, mapperContext);
        //運(yùn)行mapper中的run方法,也就是Mapper類中的run方法,開始運(yùn)行map任務(wù)
        mapper.run(mapperContext);
        this.mapPhase.complete();
        this.setPhase(Phase.SORT);
        this.statusUpdate(umbilical);
        //map運(yùn)行完,關(guān)閉輸入、輸出流
        input.close();
        input = null;
        ((RecordWriter)output).close(mapperContext);
        output = null;
    } finally {
        this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input);
        this.closeQuietly((RecordWriter)output, mapperContext);
    }

}

可以看到,這里就是整個(gè)map任務(wù)的核心流程,做了以下工作:
(1)獲取mapper類對(duì)象,下面要執(zhí)行里面的map方法
(2)獲取InputFormat對(duì)象,默認(rèn)是默認(rèn)inputformat為TextInputFormat
(3)通過InputFormat對(duì)象獲取RecordReader對(duì)象,后面用于讀取數(shù)據(jù)文件
(4)獲取用于輸出map的結(jié)果的RecordWriter對(duì)象
(5)獲取切片信息,比如切片所在文件的路徑,起始偏移量等
(6)初始化切片數(shù)據(jù)
(7)開始運(yùn)行mapper中的run()方法
(8)運(yùn)行完畢,關(guān)閉輸入流,將結(jié)果通過RecordWriter刷寫。
(9)刷寫完畢后,關(guān)閉輸入流以及輸出流
下面看看其中的核心方法

4、this.getSplitDetails() 獲取切片信息

//---------------------------------MapTask.java
private <T> T getSplitDetails(Path file, long offset) throws IOException {
    //獲取文件系統(tǒng)對(duì)象,并打開文件輸出流
    FileSystem fs = file.getFileSystem(this.conf);
    FSDataInputStream inFile = fs.open(file);
    //跳過指定的偏移量,也就是從指定偏移量的位置開始讀取數(shù)據(jù),其實(shí)就是切片開始的偏移量
    inFile.seek(offset);
    String className = StringInterner.weakIntern(Text.readString(inFile));

    Class cls;
    try {
        cls = this.conf.getClassByName(className);
    } catch (ClassNotFoundException var13) {
        IOException wrap = new IOException("Split class " + className + " not found");
        wrap.initCause(var13);
        throw wrap;
    }

    SerializationFactory factory = new SerializationFactory(this.conf);
    //反序列化方式打開輸入流
    Deserializer<T> deserializer = factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize((Object)null);
    long pos = inFile.getPos();
    ((Counter)this.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)).increment(pos - offset);
    inFile.close();
    //返回切片經(jīng)過反序列化之后的可讀取對(duì)象
    return split;
}

可以看到這里主要是返回切片的反序列化之后可以讀取的信息對(duì)象

5、接著看看 input.initialize()

在看這個(gè)方法之前,首先我們看看input這個(gè)對(duì)象是由哪個(gè)類創(chuàng)建的。它是由NewTrackingRecordReader 這個(gè)類創(chuàng)建的。這是個(gè)靜態(tài)內(nèi)部類

//---------------------------------MapTask.java
static class NewTrackingRecordReader<K, V> extends org.apache.hadoop.mapreduce.RecordReader<K, V> {
    private final org.apache.hadoop.mapreduce.RecordReader<K, V> real;
    private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
    private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
    private final TaskReporter reporter;
    private final List<Statistics> fsStats;

    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split, InputFormat<K, V> inputFormat, TaskReporter reporter, TaskAttemptContext taskContext) throws InterruptedException, IOException {
        this.reporter = reporter;
        this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
        this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
        List<Statistics> matchedStats = null;
        if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
            matchedStats = Task.getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit)split).getPath(), taskContext.getConfiguration());
        }

        this.fsStats = matchedStats;
        long bytesInPrev = this.getInputBytes(this.fsStats);
        //調(diào)用job任務(wù)中定義的inputformat類中的createRecordReader方法,獲取RecordReader對(duì)象。返回的是 LineRecordReader
        this.real = inputFormat.createRecordReader(split, taskContext);
        long bytesInCurr = this.getInputBytes(this.fsStats);
        this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
    }
    ...........
}

我們可以看到構(gòu)造方法中,是調(diào)用 inputFormat對(duì)象的createRecordReader() 方法來創(chuàng)建RecordReader對(duì)象的,上面也說了默認(rèn)inputFormat為 TextInputFormat。

//---------------------------TextInputFormat.java
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
    public TextInputFormat() {
    }

    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }

        return new LineRecordReader(recordDelimiterBytes);
    }

可以清楚看到,返回的就是 LineRecordReader 這個(gè)reader類。

接著我們繼續(xù)看 input.initialize()

static class NewTrackingRecordReader<K, V> extends org.apache.hadoop.mapreduce.RecordReader<K, V> {
    public void initialize(org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        long bytesInPrev = this.getInputBytes(this.fsStats);
        //調(diào)用 RecordReader對(duì)象的 initialize方法,初始化輸入。上面說到默認(rèn)的是LineRecordReader
        //this.real已經(jīng)在上面初始化了,就是LineRecordReader
        this.real.initialize(split, context);
        long bytesInCurr = this.getInputBytes(this.fsStats);
        this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
    }
}

可以看到,調(diào)用 RecordReader中的 initialize 方法,也就是調(diào)用LineRecordReader 中的 initialize() 方法,下面看看

//---------------------------------------LineRecordReader.java
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit)genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
    //獲取切片的數(shù)據(jù)開始位置以及終止位置
    this.start = split.getStart();
    this.end = this.start + split.getLength();
    //獲取切片對(duì)應(yīng)的文件的輸入流
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(job);
    this.fileIn = fs.open(file);
    //如果文件有壓縮,則用壓縮類解壓
    CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
    //以壓縮方式讀取切片
    if (null != codec) {
        this.isCompressedInput = true;
        this.decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
            SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream(this.fileIn, this.decompressor, this.start, this.end, READ_MODE.BYBLOCK);
            this.in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
            this.start = cIn.getAdjustedStart();
            this.end = cIn.getAdjustedEnd();
            this.filePosition = cIn;
        } else {
            if (this.start != 0L) {
                throw new IOException("Cannot seek in " + codec.getClass().getSimpleName() + " compressed stream");
            }

            this.in = new SplitLineReader(codec.createInputStream(this.fileIn, this.decompressor), job, this.recordDelimiterBytes);
            this.filePosition = this.fileIn;
        }
    } else {
        //無壓縮方式讀取切片
        this.fileIn.seek(this.start);
        //這里很重要,是真正用于讀取數(shù)據(jù)的類
        this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength());
        this.filePosition = this.fileIn;
    }

    //對(duì)起始偏移量進(jìn)行修正,并賦值給pos這個(gè)偏移量
    if (this.start != 0L) {
        this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));
    }

    this.pos = this.start;
}

這里的工作主要是給 RecordReader對(duì)象讀取文件做初始化工作。主要就是獲取切片的輸入流對(duì)象。
this.in 這里就用于后面讀取數(shù)據(jù)的對(duì)象,這里就是完成了這個(gè)輸入流對(duì)象的初始化。

6、接著我們回到3中,看mapper.run() 方法

這個(gè)其實(shí)就是寫的mapper 的run方法:

//------------------------Mapper.java   mapper.run(mapperContext);
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);

    try {
        //這里循環(huán)讀取key和value,給map方法處理
        //關(guān)鍵在于 context這個(gè)對(duì)象,從上面runNewApi中可以看到,是MapContextImpl類型的
        while(context.nextKeyValue()) {
            this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        this.cleanup(context);
    }
}

可以看到,這里是個(gè)while循環(huán),通過context上下文對(duì)象獲取KV,然后傳入map方法中處理。

7、下面看看 context.nextKeyValue()

從3中可以看到,這個(gè)context是 MapContextImpl類型的,看看這個(gè)類

//-----------------------MapContextImpl.java..    
public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    private RecordReader<KEYIN, VALUEIN> reader;
    private InputSplit split;

    //構(gòu)造方法中包括獲取 RecordReader對(duì)象,以及split
    public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN, VALUEIN> reader, RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
        super(conf, taskid, writer, committer, reporter);
        this.reader = reader;
        this.split = split;
    }

    public InputSplit getInputSplit() {
        return this.split;
    }

    //下面都是調(diào)用 RecordReader 中的get方法獲取key value
    public KEYIN getCurrentKey() throws IOException, InterruptedException {
        return this.reader.getCurrentKey();
    }

    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
        return this.reader.getCurrentValue();
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
       //這里就是調(diào)用reader 的方法
        return this.reader.nextKeyValue();
    }
}

在它的構(gòu)造方法中,主要從3中傳入了 split切片,以及 RecordReader對(duì)象。下面就是三個(gè)獲取KV的方法,也就是在 mapper.run() 中調(diào)用的方法。

下面看看 this.reader.nextKeyValue()

//----------------------------------LineRecordReader.java
public boolean nextKeyValue() throws IOException {
    if (this.key == null) {
        this.key = new LongWritable();
    }

    //設(shè)置key為偏移量
    this.key.set(this.pos);
    if (this.value == null) {
        this.value = new Text();
    }

    int newSize = 0;

    while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {
        if (this.pos == 0L) {
            newSize = this.skipUtfByteOrderMark();
        } else {
            /*讀取數(shù)據(jù)到value中。this.in是UncompressedSplitLineReader類型的,在LineRecordReader的initialize方法中初始化了。該類父類為LineReader。*/
            //調(diào)用 LineRreader 的readline 方法。讀一行數(shù)據(jù)
            newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));
            this.pos += (long)newSize;
        }

        if (newSize == 0 || newSize < this.maxLineLength) {
            break;
        }

        LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize));
    }

    if (newSize == 0) {
        this.key = null;
        this.value = null;
        return false;
    } else {
        return true;
    }
}

可以看到,這里已經(jīng)看到key和value的蹤影了。key就是數(shù)據(jù)偏移量,value就是通過readLine讀取的數(shù)據(jù)。如果有數(shù)據(jù)返回true,mapper.run() 通過getKey和getValue對(duì)應(yīng)的KV。下面看看 this.in.readLine,也就是 LineReader.readLine()。

8、LineReader.readLine() 按行讀取的reader

//---------------------------LineReader.java
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
    return this.recordDelimiterBytes != null ? this.readCustomLine(str, maxLineLength, maxBytesToConsume) : this.readDefaultLine(str, maxLineLength, maxBytesToConsume);
}

private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
    str.clear();
    int txtLength = 0;
    long bytesConsumed = 0L;
    int delPosn = 0;
    int ambiguousByteCount = 0;

    do {
        int startPosn = this.bufferPosn;
        if (this.bufferPosn >= this.bufferLength) {
            startPosn = this.bufferPosn = 0;
            this.bufferLength = this.fillBuffer(this.in, this.buffer, ambiguousByteCount > 0);
            if (this.bufferLength <= 0) {
                if (ambiguousByteCount > 0) {
                    str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);
                    bytesConsumed += (long)ambiguousByteCount;
                }
                break;
            }
        }

        for(; this.bufferPosn < this.bufferLength; ++this.bufferPosn) {
            if (this.buffer[this.bufferPosn] == this.recordDelimiterBytes[delPosn]) {
                ++delPosn;
                if (delPosn >= this.recordDelimiterBytes.length) {
                    ++this.bufferPosn;
                    break;
                }
            } else if (delPosn != 0) {
                this.bufferPosn -= delPosn;
                if (this.bufferPosn < -1) {
                    this.bufferPosn = -1;
                }

                delPosn = 0;
            }
        }

        int readLength = this.bufferPosn - startPosn;
        bytesConsumed += (long)readLength;
        int appendLength = readLength - delPosn;
        if (appendLength > maxLineLength - txtLength) {
            appendLength = maxLineLength - txtLength;
        }

        bytesConsumed += (long)ambiguousByteCount;
        if (appendLength >= 0 && ambiguousByteCount > 0) {
            //看到這里就很明顯了,將數(shù)據(jù)追加到 value中
            str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);
            ambiguousByteCount = 0;
            this.unsetNeedAdditionalRecordAfterSplit();
        }

        if (appendLength > 0) {
            str.append(this.buffer, startPosn, appendLength);
            txtLength += appendLength;
        }

        if (this.bufferPosn >= this.bufferLength && delPosn > 0 && delPosn < this.recordDelimiterBytes.length) {
            ambiguousByteCount = delPosn;
            bytesConsumed -= (long)delPosn;
        }
    } while(delPosn < this.recordDelimiterBytes.length && bytesConsumed < (long)maxBytesToConsume);

    if (bytesConsumed > 2147483647L) {
        throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
    } else {
        return (int)bytesConsumed;
    }
}

上面重要就是讀取數(shù)據(jù)的過程了,過程過于長,抓住關(guān)鍵的看,其實(shí)就是將讀取的一行數(shù)據(jù)追加到 this.value中。

9、總結(jié)

至此,map的整個(gè)輸入流程涉及到兩個(gè)重要的類
InputFormat -- 處理原始數(shù)據(jù)并切片;創(chuàng)建RecordReader 對(duì)象
RecordReader -- 讀取切片中的數(shù)據(jù),處理成KV,傳遞KV給map方法處理

這兩個(gè)都是抽象類:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public RecordReader() {
    }

    public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;

    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    public abstract float getProgress() throws IOException, InterruptedException;

    public abstract void close() throws IOException;
}
public abstract class InputFormat<K, V> {
    public InputFormat() {
    }

    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

當(dāng)我們想自定義inputformat類和recordreader類時(shí),就需要繼承這兩個(gè)類,并實(shí)現(xiàn)其中的方法。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI