溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

分布式計算框架MapReduce

發(fā)布時間:2020-05-27 00:27:34 來源:網(wǎng)絡(luò) 閱讀:15107 作者:ZeroOne01 欄目:大數(shù)據(jù)

MapReduce概述

MapReduce源自Google的MapReduce論文,論文發(fā)表于2004年12月。Hadoop MapReduce可以說是Google MapReduce的一個開源實現(xiàn)。MapReduce優(yōu)點在于可以將海量的數(shù)據(jù)進行離線處理,并且MapReduce也易于開發(fā),因為MapReduce框架幫我們封裝好了分布式計算的開發(fā)。而且對硬件設(shè)施要求不高,可以運行在廉價的機器上。MapReduce也有缺點,它最主要的缺點就是無法完成實時流式計算,只能離線處理。

MapReduce屬于一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算。概念"Map(映射)"和"Reduce(歸約)",是它們的主要思想,都是從函數(shù)式編程語言里借來的,還有從矢量編程語言里借來的特性。它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統(tǒng)上。 當前的軟件實現(xiàn)是指定一個Map(映射)函數(shù),用來把一組鍵值對映射成一組新的鍵值對,指定并發(fā)的Reduce(歸約)函數(shù),用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

MapReduce官方文檔地址如下:

https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

在學習MapReduce之前我們需要準備好Hadoop的環(huán)境,也就是需要先安裝好HDFS以及YARN,環(huán)境的搭建方式可以參考我之前的兩篇文章:HDFS偽分布式環(huán)境搭建 以及 分布式資源調(diào)度——YARN框架


從WordCount案例說起MapReduce編程模型

在安裝Hadoop時,它就自帶有一個WordCount的案例,這個案例是統(tǒng)計文件中每個單詞出現(xiàn)的次數(shù),也就是詞頻統(tǒng)計,我們在學習大數(shù)據(jù)開發(fā)時,一般都以WordCount作為入門。

例如,我現(xiàn)在有一個test.txt,文件內(nèi)容如下:

hello world
hello hadoop
hello MapReduce

現(xiàn)在的需求是統(tǒng)計這個文件中每個單詞出現(xiàn)的次數(shù)。假設(shè)我現(xiàn)在寫了一些代碼實現(xiàn)了這個文件的詞頻統(tǒng)計,統(tǒng)計的結(jié)果如下:

hello 3
world 1
hadoop 1
MapReduce 1

以上這就是一個詞頻統(tǒng)計的例子。

詞頻統(tǒng)計看起來貌似很簡單的樣子,一般不需要多少代碼就能完成了,而且如果對shell腳本比較熟悉的話,甚至一句代碼就能完成這個詞頻統(tǒng)計的功能。確實詞頻統(tǒng)計是不難,但是為什么還要用大數(shù)據(jù)技術(shù)去完成這個詞頻統(tǒng)計的功能呢?這是因為實現(xiàn)小文件的詞頻統(tǒng)計功能或許用簡單的代碼就能完成,但是如果是幾百GB、TB甚至是PB級的大文件還能用簡單的代碼完成嗎?這顯然是不可能的,就算能也需要花費相當大的時間成本。

而大數(shù)據(jù)技術(shù)就是要解決這種處理海量數(shù)據(jù)的問題,MapReduce在其中就是充當一個分布式并行計算的角色,分布式并行計算能大幅度提高海量數(shù)據(jù)的處理速度,畢竟多個人干活肯定比一個人干活快。又回到我們上面所說到的詞頻統(tǒng)計的例子,在實際工作中很多場景的開發(fā)都是在WordCount的基礎(chǔ)上進行改造的。例如,要從所有服務(wù)器的訪問日志中統(tǒng)計出被訪問得最多的url以及訪問量最高的IP,這就是一個典型的WordCount應(yīng)用場景,要知道即便是小公司的服務(wù)器訪問日志通常也都是GB級別的。

使用MapReduce執(zhí)行WordCount的流程示意圖:
分布式計算框架MapReduce

從上圖中,可以看到,輸入的數(shù)據(jù)集會被拆分為多個塊,然后這些塊都會被放到不同的節(jié)點上進行并行的計算。在Splitting這一環(huán)節(jié)會把單詞按照分割符或者分割規(guī)則進行拆分,拆分完成后就到Mapping上了,到Mapping這個環(huán)節(jié)后會把相同的單詞通過網(wǎng)絡(luò)進行映射或者說傳輸?shù)酵粋€節(jié)點上。接著這些相同的單詞就會在Shuffling環(huán)節(jié)時進行洗牌也就是合并,合并完成之后就會進入Reducing環(huán)節(jié),這一環(huán)節(jié)就是把所有節(jié)點合并后的單詞再進行一次合并,也就是會輸出到HDFS文件系統(tǒng)中的某一個文件中。大體來看就是一個拆分又合并的過程,所以MapReduce是分為map和Reduce的。最重要的是,要清楚這一流程都是分布式并行的,每個節(jié)點都不會互相依賴,都是相互獨立的。


MapReduce執(zhí)行流程

以上我們也提到了MapReduce是分為Map和Reduce的,也就是說一個MapReduce作業(yè)會被拆分成Map和Reduce階段。Map階段對應(yīng)的就是一堆的Map Tasks,同樣的Reduce階段也是會對應(yīng)一堆的Reduce Tasks。

其實簡單來說這也是一個輸入輸出的流程,要注意的是在MapReduce框架中輸入的數(shù)據(jù)集會被序列化成鍵/值對,map階段完成后會對這些鍵值對進行排序,最后到reduce階段中進行合并輸出,輸出的也是鍵/值對,官網(wǎng)文檔寫的流程如下:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

示意圖:
分布式計算框架MapReduce

我們可以看到有幾個主要的點:

  • InputFormat:將我們輸入數(shù)據(jù)進行分片(split)
  • Split:將數(shù)據(jù)塊交MapReduce作業(yè)來處理,數(shù)據(jù)塊是MapReduce中最小的計算單元
    • 在HDFS中,數(shù)據(jù)塊是最小的存儲單元,默認為128M
    • 默認情況下,HDFS與MapReduce是一一對應(yīng)的,當然我們也可以手動所設(shè)置它們之間的關(guān)系(但是不建議這么做)
  • OutputFormat:輸出最終的處理結(jié)果

我們可以再來看一張圖,假設(shè)我們手動設(shè)置了block與split的對應(yīng)關(guān)系,一個block對應(yīng)兩個split:
分布式計算框架MapReduce

上圖中一個block對應(yīng)兩個split(默認是一對一),一個split則是對應(yīng)一個Map Task。Map Task將數(shù)據(jù)分完組之后到Shuffle,Shuffle完成后就到Reduce上進行輸出,而每一個Reduce Tasks會輸出到一個文件上,上圖中有三個Reduce Tasks,所以會輸出到三個文件上。


MapReduce1.x架構(gòu)

MapReduce1.x架構(gòu)圖如下:
分布式計算框架MapReduce

簡單說明一下其中的幾個組件:

  1. JobTracker:作業(yè)的管理者,它會將作業(yè)分解成一堆的任務(wù),也就是Task,Task里包含MapTask和ReduceTask。它會將分解后的任務(wù)分派給TaskTracker進行運行,它還需要完成作業(yè)的監(jiān)控以及容錯處理(task作業(yè)掛掉了,會重啟task)。如果在一定的時間內(nèi),JobTracker沒有收到某個TaskTracker的心跳信息的話,就會判斷該TaskTracker掛掉了,然后就會將該TaskTracker上運行的任務(wù)指派到其他的TaskTracker上去執(zhí)行。
  2. TaskTracker:任務(wù)的執(zhí)行者,我們的Task(MapTask和ReduceTask)都是在TaskTracker上運行的,TaskTracker可以與JobTracker進行交互,例如執(zhí)行、啟動或停止作業(yè)以及發(fā)送心跳信息給JobTracker等。
  3. MapTask:我們自己開發(fā)的Map任務(wù)會交由該Task完成,它會解析每條記錄的數(shù)據(jù),然后交給自己編寫的Map方法進行處理,處理完成之后會將Map的輸出結(jié)果寫到本地磁盤。不過有些作業(yè)可能只有map沒有reduce,這時候一般會將結(jié)果輸出到HDFS文件系統(tǒng)里。
  4. ReduceTask:將MapTask輸出的數(shù)據(jù)進行讀取,并按照數(shù)據(jù)的規(guī)則進行分組,然后傳給我們自己編寫的reduce方法處理。處理完成后默認將輸出結(jié)果寫到HDFS。

MapReduce2.x架構(gòu)

MapReduce2.x架構(gòu)圖如下,可以看到JobTracker和TaskTracker已經(jīng)不復存在了,取而代之的是ResourceManager和NodeManager。不僅架構(gòu)變了,功能也變了,2.x之后新引入了YARN,在YARN之上我們可以運行不同的計算框架,不再是1.x那樣只能運行MapReduce了:
分布式計算框架MapReduce

關(guān)于MapReduce2.x的架構(gòu)之前已經(jīng)在分布式資源調(diào)度——YARN框架一文中說明過了,這里就不再贅述了。


Java版本wordcount功能實現(xiàn)

1.創(chuàng)建一個Maven工程,配置依賴如下:

<repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.6.0-cdh6.7.0</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

2.創(chuàng)建一個類,開始編寫我們wordcount的實現(xiàn)代碼:

package org.zero01.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @program: hadoop-train
 * @description: 使用MapReduce開發(fā)WordCount應(yīng)用程序
 * @author: 01
 * @create: 2018-03-31 14:03
 **/
public class WordCountApp {

    /**
     * Map: 讀取輸入的文件內(nèi)容
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 接收到的每一行數(shù)據(jù)
            String line = value.toString();

            // 按照指定的分割符進行拆分
            String[] words = line.split(" ");
            for (String word : words) {
                // 通過上下文把map的處理結(jié)果輸出
                context.write(new Text((word)), one);
            }
        }
    }

    /**
     * Reduce: 歸并操作
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                // 求key出現(xiàn)的次數(shù)總和
                sum += value.get();
            }
            // 將最終的統(tǒng)計結(jié)果輸出
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * 定義Driver:封裝了MapReduce作業(yè)的所有信息
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // 創(chuàng)建Job,通過參數(shù)設(shè)置Job的名稱
        Job job = Job.getInstance(configuration, "wordcount");

        // 設(shè)置Job的處理類
        job.setJarByClass(WordCountApp.class);

        // 設(shè)置作業(yè)處理的輸入路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 設(shè)置map相關(guān)參數(shù)
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 設(shè)置reduce相關(guān)參數(shù)
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 設(shè)置作業(yè)處理完成后的輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3.編寫完成之后,在IDEA里通過Maven進行編譯打包:
分布式計算框架MapReduce

4.把打包好的jar包上傳到服務(wù)器上:
分布式計算框架MapReduce

測試文件內(nèi)容如下:

[root@localhost ~]# hdfs dfs -text /test.txt
hello world
hadoop welcome
hadoop hdfs mapreduce
hadoop hdfs
hello hadoop
[root@localhost ~]# 

5.然后執(zhí)行如下命令執(zhí)行Job:

[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc

簡單說明一下這個命令:

  • hadoop jar 是Hadoop執(zhí)行jar包的命令
  • ./hadoop-train-1.0.jar 是jar包的所在路徑
  • org.zero01.hadoop.mapreduce.WordCountApp 是jar包的主類也就是main類
  • /test.txt 是測試文件也就是輸入文件所在路徑(HDFS上的路徑)
  • /output/wc 為輸出文件的存在路徑

6.到Y(jié)ARN上查看任務(wù)執(zhí)行的信息:

申請資源:
分布式計算框架MapReduce

運行:
分布式計算框架MapReduce

完成:
分布式計算框架MapReduce

7.可以看到已經(jīng)執(zhí)行成功,命令行終端的日志輸出內(nèi)容如下:

18/03/31 22:55:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/03/31 22:55:52 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/03/31 22:55:52 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/03/31 22:55:53 INFO input.FileInputFormat: Total input paths to process : 1
18/03/31 22:55:53 INFO mapreduce.JobSubmitter: number of splits:1
18/03/31 22:55:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1522505784761_0001
18/03/31 22:55:54 INFO impl.YarnClientImpl: Submitted application application_1522505784761_0001
18/03/31 22:55:54 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1522505784761_0001/
18/03/31 22:55:54 INFO mapreduce.Job: Running job: job_1522505784761_0001
18/03/31 22:56:06 INFO mapreduce.Job: Job job_1522505784761_0001 running in uber mode : false
18/03/31 22:56:06 INFO mapreduce.Job:  map 0% reduce 0%
18/03/31 22:56:11 INFO mapreduce.Job:  map 100% reduce 0%
18/03/31 22:56:16 INFO mapreduce.Job:  map 100% reduce 100%
18/03/31 22:56:16 INFO mapreduce.Job: Job job_1522505784761_0001 completed successfully
18/03/31 22:56:16 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=190
        FILE: Number of bytes written=223169
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=174
        HDFS: Number of bytes written=54
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=3151
        Total time spent by all reduces in occupied slots (ms)=2359
        Total time spent by all map tasks (ms)=3151
        Total time spent by all reduce tasks (ms)=2359
        Total vcore-seconds taken by all map tasks=3151
        Total vcore-seconds taken by all reduce tasks=2359
        Total megabyte-seconds taken by all map tasks=3226624
        Total megabyte-seconds taken by all reduce tasks=2415616
    Map-Reduce Framework
        Map input records=5
        Map output records=11
        Map output bytes=162
        Map output materialized bytes=190
        Input split bytes=100
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=190
        Reduce input records=11
        Reduce output records=6
        Spilled Records=22
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=233
        CPU time spent (ms)=1860
        Physical memory (bytes) snapshot=514777088
        Virtual memory (bytes) snapshot=5571788800
        Total committed heap usage (bytes)=471859200
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=74
    File Output Format Counters 
        Bytes Written=54

8.查看輸出文件的內(nèi)容:

[root@localhost ~]# hdfs dfs -ls /output/wc/
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-03-31 22:56 /output/wc/_SUCCESS
-rw-r--r--   1 root supergroup         54 2018-03-31 22:56 /output/wc/part-r-00000  # 執(zhí)行結(jié)果的輸出文件
[root@localhost ~]# hdfs dfs -text /output/wc/part-r-00000  # 查看文件內(nèi)容
hadoop  4
hdfs    2
hello   2
mapreduce   1
welcome 1
world   1
[root@localhost ~]# 

Java版本wordcount功能重構(gòu)

雖然我們已經(jīng)成功通過編寫java代碼實現(xiàn)了wordcount功能,但是有一個問題,如果我們再執(zhí)行剛剛那條命令,就會報如下錯誤:

[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc
18/04/01 00:30:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/04/01 00:30:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/01 00:30:12 WARN security.UserGroupInformation: PriviledgedActionException as:root (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.77.130:8020/output/wc already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.77.130:8020/output/wc already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:270)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:143)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325)
    at org.zero01.hadoop.mapreduce.WordCountApp.main(WordCountApp.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
[root@localhost ~]# 

在平時的MapReduce據(jù)程序開發(fā)中,這個異常非常地常見,這個異常是因為輸出文件的存放目錄已經(jīng)存在:Output directory hdfs://192.168.77.130:8020/output/wc already exists

有兩種方式可以解決這個問題:

  1. 在執(zhí)行MapReduce作業(yè)時,先刪除或更改輸出文件的存放目錄(不推薦)
  2. 在代碼中完成自動刪除功能(推薦)

我們來在代碼中實現(xiàn)自動刪除功能,在剛剛的代碼中,加入如下內(nèi)容:

...
/**
 * 定義Driver:封裝了MapReduce作業(yè)的所有信息
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();

    // 準備清理已存在的輸出目錄
    Path outputPath = new Path(args[1]);
    FileSystem fileSystem = FileSystem.get(configuration);
    if (fileSystem.exists(outputPath)) {
        fileSystem.delete(outputPath,true);
        System.out.println("output file exists, but is has deleted");
    }
...

編寫完成之后重新將編輯后的jar包上傳,再執(zhí)行hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc命令,就不會再報錯了。


Combiner應(yīng)用程序開發(fā)

Combiner類似于本地的Reduce,相當于是在Map階段的時候就做一個Reduce的操作,它能夠減少Map Task輸出的數(shù)據(jù)量及網(wǎng)絡(luò)傳輸量。

如下圖:
分布式計算框架MapReduce

在上圖中,可以看到Mapper與Reducer之間有一層Combiner。Mapper先把數(shù)據(jù)在本地進行一個Combiner,也就是先做一個本地數(shù)據(jù)的合并,這個過程類似于Reduce只不過是本地的,也即是本節(jié)點。當Combiner合并完成之后,再把數(shù)據(jù)傳輸?shù)絉educer上再一次進行最終的合并。這樣Map Task輸出的數(shù)據(jù)量就會大大減少,性能也會相應(yīng)的提高,這一點可以從上圖中看到。

我們來嘗試一下在剛才開發(fā)的wordcount程序中,增加一層Combiner。增加Combiner很簡單,只需要在設(shè)置map和reduce參數(shù)的代碼之間增加一行代碼即可,如下:

// 通過Job對象來設(shè)置Combiner處理類,在邏輯上和reduce是一樣的
job.setCombinerClass(MyReducer.class);

修改完成并重新上傳jar包后,這時再執(zhí)行wordcount程序,在終端的日志輸出信息中,會發(fā)現(xiàn)Combiner相關(guān)的字段都有值,那么就代表我們的Combiner已經(jīng)成功添加進去了:
分布式計算框架MapReduce

Combiner的適用場景:

  • 求和、計數(shù),累計類型的場景適合使用

Combiner的不適用的場景:

  • 求平均數(shù)、求公約數(shù)等類型的操作不適合,如果這種場景下使用Combiner,得到的結(jié)果就是錯誤的

Partitioner應(yīng)用程序開發(fā)

Partitioner決定Map Task輸出的數(shù)據(jù)交由哪個Reduce Task處理,也就是類似于制定一個分發(fā)規(guī)則。默認情況下的分發(fā)規(guī)則實現(xiàn):分發(fā)的key的hash值對Reduce Task個數(shù)取模。

如下圖:
分布式計算框架MapReduce

上圖中,把圓形數(shù)據(jù)放到了同一個Reduce Task上,把六邊形數(shù)據(jù)放到了同一個Reduce Task上,剩下的圖形數(shù)據(jù)則放到剩下的Reduce Task上, 這樣的一個分發(fā)過程就是Partitioner。

例如,我現(xiàn)在有一組數(shù)據(jù)如下,這是今日各個手機品牌的銷售量:

[root@localhost ~]# hdfs dfs -text /partitioner.txt 
xiaomi 200
huawei 300
xiaomi 100
iphone7 300
iphone7 500
nokia 100
[root@localhost ~]#

現(xiàn)在我有一個需求,就是將相同品牌的手機名稱,分發(fā)到同一個Reduce上進行處理。這就需要用到Partitioner了,在我們之前的代碼中增加如下內(nèi)容:

public class WordCountApp {
    /**
     * Map: 讀取輸入的文件內(nèi)容
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 接收到的每一行數(shù)據(jù)
            String line = value.toString();

            // 按照指定的分割符進行拆分
            String[] words = line.split(" ");
            // 通過上下文把map的處理結(jié)果輸出
            context.write(new Text((words[0])), new LongWritable(Long.parseLong(words[1])));
        }
    }

    ...

    /**
     * Partitioner: 設(shè)定Map Task輸出的數(shù)據(jù)的分發(fā)規(guī)則
     */
    public static class MyPartitioner extends Partitioner<Text, LongWritable> {

        public int getPartition(Text key, LongWritable value, int numPartitions) {
            if(key.toString().equals("xiaomi")){
                return 0;
            }
            if(key.toString().equals("huawei")){
                return 1;
            }
            if(key.toString().equals("iphone7")) {
                return 2;
            }
            return 3;
        }
    }

    /**
     * 定義Driver:封裝了MapReduce作業(yè)的所有信息
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        ...
        // 設(shè)置Job的partition
        job.setPartitionerClass(MyPartitioner.class);
        // 設(shè)置4個reducer,每個分區(qū)一個
        job.setNumReduceTasks(4);
        ...
    }
}

同樣的,修改了代碼后需要重新編譯打包,把新的jar上傳到服務(wù)器上。然后執(zhí)行命令:

[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /partitioner.txt /output/wc

執(zhí)行成功,此時可以看到/output/wc/目錄下有四個結(jié)果文件,這是因為我們在代碼上設(shè)置了4個reducer,并且可以看到內(nèi)容都是正確的:

[root@localhost ~]# hdfs dfs -ls /output/wc/
Found 5 items
-rw-r--r--   1 root supergroup          0 2018-04-01 04:37 /output/wc/_SUCCESS
-rw-r--r--   1 root supergroup         11 2018-04-01 04:37 /output/wc/part-r-00000
-rw-r--r--   1 root supergroup         11 2018-04-01 04:37 /output/wc/part-r-00001
-rw-r--r--   1 root supergroup         13 2018-04-01 04:37 /output/wc/part-r-00002
-rw-r--r--   1 root supergroup         10 2018-04-01 04:37 /output/wc/part-r-00003
[root@localhost ~]# for i in `seq 0 3`; do hdfs dfs -text /output/wc/part-r-0000$i; done
xiaomi  300
huawei  300
iphone7 800
nokia   100
[root@localhost ~]# 

JobHistory的配置

JobHistory是一個Hadoop自帶的歷史服務(wù)器,它用于記錄已運行完的MapReduce信息到指定的HDFS目錄下。我們都知道,執(zhí)行了MapReduce任務(wù)后,可以在YARN的管理頁面上查看到任務(wù)的相關(guān)信息,但是由于JobHistory默認情況下是不開啟的,所以我們無法通過點擊History查看歷史信息:
分布式計算框架MapReduce

所以我們就需要打開這個服務(wù),編輯配置文件內(nèi)容:

[root@localhost ~]# cd /usr/local/hadoop-2.6.0-cdh6.7.0/etc/hadoop
[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/etc/hadoop]# vim mapred-site.xml  # 增加如下內(nèi)容
<!-- jobhistory的通信地址 -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>192.168.77.130:10020</value>
    <description>MapReduce JobHistory Server IPC host:port</description>
</property>
<!-- jobhistory的web訪問地址 -->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>192.168.77.130:19888</value>
    <description>MapReduce JobHistory Server IPC host:port</description>
</property>
<!-- 任務(wù)運行完成后,history信息所存放的目錄 -->
<property>
    <name>mapreduce.jobhistory.done-dir</name>
    <value>/history/done</value>
</property>
<!-- 任務(wù)運行中,history信息所存放的目錄 -->
<property>
    <name>mapreduce.jobhistory.intermediate-done-dir</name>
    <value>/history/done_intermediate</value>
</property>
[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/etc/hadoop]# vim yarn-site.xml  # 增加如下內(nèi)容
<!-- 開啟聚合日志 -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/etc/hadoop]# 

編輯完配置文件后,重新啟動YARN服務(wù):

[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/sbin]# ./stop-yarn.sh 
[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/sbin]# ./start-yarn.sh 

啟動JobHistory服務(wù):

[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/sbin]# ./mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /usr/local/hadoop-2.6.0-cdh6.7.0/logs/mapred-root-historyserver-localhost.out
[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/sbin]# 

檢查進程:

[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/sbin]# jps
2945 DataNode
12946 JobHistoryServer
3124 SecondaryNameNode
12569 NodeManager
13001 Jps
2812 NameNode
12463 ResourceManager
[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/sbin]#

然后執(zhí)行一個案例測試一下:

[root@localhost /usr/local/hadoop-2.6.0-cdh6.7.0/share/hadoop/mapreduce]# hadoop jar ./hadoop-mapreduce-examples-2.6.0-cdh6.7.0.jar pi 3 4

任務(wù)執(zhí)行成功后,這時候訪問http://192.168.77.130:19888就可以進入到JobHistory的web頁面了:
分布式計算框架MapReduce

能夠正常訪問就代表配置已經(jīng)成功了,現(xiàn)在所有任務(wù)的執(zhí)行日志都可以在這里進行查看,有利于我們?nèi)粘i_發(fā)中的排錯,而且ui界面操作起來也要方便一些。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI