溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hadoop-Mapper的示例分析

發(fā)布時間:2021-12-08 10:19:06 來源:億速云 閱讀:178 作者:小新 欄目:云計算

這篇文章將為大家詳細講解有關hadoop-Mapper的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce;

import java.io.IOException;

/** 
 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * 
 * <p>Maps are the individual tasks which transform input records into a 
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * 
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)} 
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(Context)} is called.</p>
 * 
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * 
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * 
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *  
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * 
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper<Object, Text, Text, IntWritable>{
 *    
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   
 *   public void map(Object key, Text value, Context context) throws IOException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.collect(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * <p>Applications may override the {@link #run(Context)} method to exert 
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>
 * 
 * @see InputFormat
 * @see JobContext
 * @see Partitioner  
 * @see Reducer
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context 
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }
  
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}

Mapper的四個方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的資源,setup在完成Mapper構(gòu)造,即將開始執(zhí)行map動作前調(diào)用,cleanup則在所有的map動作完成后被調(diào)用。方法map用于對一次輸入的key/value對進行map動作。run方法執(zhí)行了上面描述的過程,它調(diào)用setup,讓后迭代所有的key/value對,進行map,最后調(diào)用cleanup。

org.apache.hadoop.mapreduce.lib.map中實現(xiàn)了Mapper的三個子類,分別是InverseMapper(將輸入<key, value> map為輸出<value, key>),MultithreadedMapper(多線程執(zhí)行map方法)和TokenCounterMapper(對輸入的value分解為token并計數(shù))。其中最復雜的是MultithreadedMapper,我們就以它為例,來分析Mapper的實現(xiàn)。

InverseMapper源代碼:

 * Licensed to the Apache Software Foundation (ASF) under one


package org.apache.hadoop.mapreduce.lib.map;


import java.io.IOException;


/** A {@link Mapper} that swaps keys and values. */
public class InverseMapper<K, V> extends Mapper<K,V,V,K> {


  /** The inverse function.  Input keys and values are swapped.*/
  @Override
  public void map(K key, V value, Context context
                  ) throws IOException, InterruptedException {
    context.write(value, key);
  }
  
}

TokenCountMapper源代碼:

 * Licensed to the Apache Software Foundation (ASF) under one


package org.apache.hadoop.mapreduce.lib.map;


import java.io.IOException;


/**
 * Tokenize the input values and emit each word with a count of 1.
 */
public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
    
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  
  @Override
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

MultithreadedMapper會啟動多個線程執(zhí)行另一個Mapper的map方法,它會啟動mapred.map.multithreadedrunner.threads(配置項)個線程執(zhí)行Mapper:mapred.map.multithreadedrunner.class(配置項)。MultithreadedMapper重寫了基類Mapper的run方法,啟動N個線程(對應的類為MapRunner)執(zhí)行mapred.map.multithreadedrunner.class(我們稱為目標Mapper)的run方法(就是說,目標Mapper的setup和cleanup會被執(zhí)行多次)。目標Mapper共享同一份InputSplit,這就意味著,對InputSplit的數(shù)據(jù)讀必須線程安全。為此,MultithreadedMapper引入了內(nèi)部類SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分別繼承自RecordReader,RecordWriter和StatusReporter,它們通過互斥訪問MultithreadedMapper的Mapper.Context,實現(xiàn)了對同一份InputSplit的線程安全訪問,為Mapper提供所需的Context。這些類的實現(xiàn)方法都很簡單。

關于“hadoop-Mapper的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI