溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Hadoop中Map-Reduce如何配置、測(cè)試和調(diào)試

發(fā)布時(shí)間:2021-12-08 10:45:45 來(lái)源:億速云 閱讀:146 作者:小新 欄目:云計(jì)算

這篇文章將為大家詳細(xì)講解有關(guān)Hadoop中Map-Reduce如何配置、測(cè)試和調(diào)試,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

Map-Reduce 之 配置,測(cè)試,調(diào)試

Evironvemnt:

cdh6.1

Configuration 

配置文件位置

使用cdh6.1,該文件位于

/etc/hadoop/conf, 其實(shí)/etc/hadoop下面有幾種目錄,比如conf.dist,conf.pseudo,conf.impala

文件列表

hadoop-env.sh,可以控制全局環(huán)境變量

core-site.xml,最重要的是參數(shù)fs.defaultFS

1.value = File:\\\home\, 這個(gè)是單機(jī)模式(single-node standalone),hadoop daemon運(yùn)行在一個(gè)jvm進(jìn)程。主要方便調(diào)試。

2.value = hdfs://localhost:8020,這個(gè)是偽分布式(Pseudo-distributes),就是每個(gè)daemon運(yùn)行在單獨(dú)的jvm進(jìn)程,但還是都在一臺(tái)主機(jī)上。主要用于學(xué)習(xí)測(cè)試調(diào)試等。

3.value = hdfs://host:8020, 集群模式.

hdfs-site.xml,最重要的是參數(shù)dfs.replication

除了集群模式是3,一般都設(shè)置為1.

dfs.namenode.replication.min = 1,塊復(fù)制的底線

mapred-site.xml,最重要的是參數(shù)mapred.job.tracker

也就是jobtracker運(yùn)行在那一臺(tái)機(jī)器上。

yarn-site.xml,主要用來(lái)配置resourcemanager。

hadoop-metrics.properties,如果配置了Ambari,需要配置此文件,以便于發(fā)射監(jiān)控指標(biāo)給Ambari服務(wù)器。

log4j.properties

如果有多個(gè)配置文件加載,那么一般情況下,后加載的配置覆蓋相同的早加載的配置文件。為了防止不期望的覆蓋,配置文件中有final的關(guān)鍵字,它可以防止后面的覆蓋。

conf和jvm的配置, 我們可以把某些配置寫(xiě)入jvm properties,如果這樣做,它是最高優(yōu)先級(jí)的,比conf高。

hadoop jar -Ddfs.replication=1

Map-Reduce Sample

首先說(shuō)主程序,MyWordCount繼承于Tool 和 Configured, Configured主要用來(lái)幫助Tool實(shí)現(xiàn)Configurable.

interface Tool extends Configurable

Configured extends Configurable

一般都會(huì)調(diào)用ToolRunner來(lái)運(yùn)行程序,ToolRunner內(nèi)部會(huì)調(diào)用GenericOptionsParser,所以你的程序可以添加參數(shù)的能力。

這里和hadoop1的不同在于org.apache.hadoop.mapreduce,我記得1.0,好像是mapred.

/**
 * write by jinbao
 */
package com.jinbao.hadoop.mapred.unittest;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * @author cloudera
 *
 */
public class MyWordCount  extends Configured implements Tool  {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {

		try {
			ToolRunner.run(new MyWordCount(), args);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		if(args.length != 2){
			System.err.printf("usage: %s,  [generic options] <input> <output> \n",getClass().getSimpleName());
			ToolRunner.printGenericCommandUsage(System.err);
			return -1;
		}
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf,"word counting");
		
		job.setJarByClass(MyWordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setReducerClass(SumReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));

		
		System.exit(job.waitForCompletion(true)?0:1);
		
		return 0;
	}
	/**
	 * @author cloudera
	 *
	 */
	public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		
		public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while ( itr.hasMoreTokens()){
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}
	
	public static class SumReducer extends Reducer<Text,IntWritable, Text, IntWritable>{
		private static IntWritable result = new IntWritable();
		public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val:values){
				sum += val.get();
			}
			
			result.set(sum);
			context.write(key, result);
		}
	}

}

MapReduce Web UI
MRv1: http://jobtracker-host:50030
MRv2: http://resourcemgr-host:8088/cluster
      application細(xì)節(jié),可以到j(luò)ob history里邊去看。

單元測(cè)試-MRUnit

這是一個(gè)專(zhuān)門(mén)針對(duì)map-reduce單元測(cè)試的工具包

需要下載依賴(lài)

1. junit,這個(gè)eclipse已經(jīng)自帶了,hadoop的lib下面也有。

2. mockito,這個(gè)下面的包里有。

3. powermock,下載連接here

4. MRUnit,去apache家找here。

下面上我的程序:

package com.jinbao.hadoop.mapred.unittest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;


public class MyWordCountTest {
	private MapDriver<Object, Text, Text, IntWritable> mapDriver;
	private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
	private MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

	@Before
	public void setUp() {
		MyWordCount.TokenizerMapper mapper = new MyWordCount.TokenizerMapper();
		MyWordCount.SumReducer reducer = new MyWordCount.SumReducer();

		mapDriver = MapDriver.newMapDriver(mapper);
		reduceDriver = ReduceDriver.newReduceDriver(reducer);
		mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
	}

	@Test
	public void testMapper() throws IOException {
		mapDriver.withInput(new LongWritable(), new Text("test input from unit test"));
		
		ArrayList<Pair<Text,IntWritable>> outputRecords = new ArrayList<Pair<Text,IntWritable>>();
		outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("input"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("from"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("unit"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(1) ) );
		
		mapDriver.withAllOutput(outputRecords);
		mapDriver.runTest();
	}

	@Test
	public void testReducer() throws IOException {
		reduceDriver.withInput(new Text("input"), new ArrayList<IntWritable>(Arrays.asList(new IntWritable(1), new IntWritable(3))) );
		reduceDriver.withOutput(new Text("input"), new IntWritable(4));
		reduceDriver.runTest();
	}
	
	@Test
	public void testMapperReducer() throws IOException {
		mapReduceDriver.withInput(new LongWritable(), new Text("test input input input input input test") );
		
		ArrayList<Pair<Text,IntWritable>> outputRecords = new ArrayList<Pair<Text,IntWritable>>();
		
		outputRecords.add( new Pair<Text,IntWritable>(new Text("input"),new IntWritable(5) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(2) ) );
		
		mapReduceDriver.withAllOutput(outputRecords);
		mapReduceDriver.runTest();
	}
}

Run MRUnit

上圖直接運(yùn)行@Test方法就可以解決90%以上的問(wèn)題,否則你的UnitTest覆蓋率太低,那么后期在cluster出問(wèn)題,就debug成本比較高了.

Run Locally

Eclipse里邊配置Debug Configuration:
/home/cloudera/workspace/in /home/cloudera/workspace/out
注意:job runner運(yùn)行的都是本地目錄,使用toolrunner默認(rèn)是啟動(dòng)一個(gè)standalone的jvm來(lái)運(yùn)行hadoop,另外,只能有0或1個(gè)reduce.這個(gè)不是問(wèn)題,只要非常方便的調(diào)試就可以了.

YARN里邊默認(rèn)是mapreduce.framework.name必須設(shè)置為local,不過(guò)這都是默認(rèn)的,不需要管它。

Run in Cluster

導(dǎo)出jar,我都是用eclipse來(lái)干,用ant,命令行等都可以,看喜好了。
如果你的jar包有依賴(lài),那么也要把依賴(lài)包到處在某個(gè)lib里邊,并且minifest里邊配置main class是哪一個(gè).這個(gè)package和war打包沒(méi)什么區(qū)別

%hadoop fs -copyFromLocal /home/cloudera/word.txt data/in
%hadoop jar wordcount.jar data/in data/out

IsolationRunner and Remote Debugger

前提:keep.failed.task.files,該選項(xiàng)默認(rèn)為 false,表示對(duì)于失敗的task,其運(yùn)行的臨時(shí)數(shù)據(jù)和目錄是不會(huì)被保存的。這是一個(gè)per job的配置,運(yùn)行job的時(shí)候加上這個(gè)選項(xiàng)。
如何重跑: 
    當(dāng)fail的task環(huán)境具備以后,就可以對(duì)單獨(dú)的task進(jìn)行重跑了。重跑的方式為:
1. 上到task出錯(cuò)的tasktracker機(jī)器 上
2. 在該tasktracker上找到fail的task運(yùn)行時(shí)的目錄環(huán)境 1. 在 tasktracker中,對(duì)于每一個(gè)task都會(huì)有一個(gè)單獨(dú)的執(zhí)行環(huán)境,其中包括其work目錄,其對(duì)應(yīng)的中間文件,以及其運(yùn)行時(shí)需要用到的配置文件等
2. 這些 目錄是由tasktracker的配置決定,配置選項(xiàng)為: mapred.local.dir. 該選項(xiàng)可能是一個(gè)逗號(hào)分隔的路徑list,每個(gè) list都是tasktracker對(duì)在其上執(zhí)行的task建立工作目錄的根目錄。比如如果mapred.local.dir=/disk1 /mapred/local,/disk2/mapred/local,那么task的執(zhí)行環(huán)境就是mapred.local.dir /taskTracker/jobcache/job-ID/task-attempt-ID
3. 找到該task的執(zhí)行工作目錄后,就可以進(jìn)入到 該目錄下,然后其中就會(huì)有該task的運(yùn)行環(huán)境,通常包括一個(gè)work目錄,一個(gè)job.xml文件,以及一個(gè)task要進(jìn)行操作的數(shù)據(jù)文件(對(duì)map來(lái) 說(shuō)是split.dta,對(duì)reduce來(lái)說(shuō)是file.out)。
4. 找到環(huán)境以后,就可以重跑task了。 1. cd work
2. hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
?   ? 這樣,IsolationRunner就會(huì)讀取job.xml的配置(這里的job.xml相當(dāng) 于提交客戶(hù)端的hadoop-site.xml配置文件與命令行-D配置的接合),然后對(duì)該map或者reduce進(jìn)行重新運(yùn)行。
1. 到這里為止,已經(jīng)實(shí)現(xiàn)了task單獨(dú)重跑,但是還是沒(méi)有解決對(duì)其進(jìn)行單步斷點(diǎn)debug。這里利用到的其實(shí)是jvm的遠(yuǎn)程 debug的功能。方式如下: 1. 在重跑task之前,export一個(gè)環(huán)境變 量:export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888"
2. 這 樣,hadoop的指令就會(huì)通過(guò)8888端口將debug信息發(fā)送出去
3. 然后在自己本地的開(kāi)發(fā)環(huán)境IDE中(比如 eclipse),launch一個(gè)遠(yuǎn)程調(diào)試,并在代碼中打一個(gè)斷點(diǎn),就可以對(duì)在tasktracker上運(yùn)行的獨(dú)立map或者reduce task進(jìn)行遠(yuǎn)程單步調(diào)試了。
詳細(xì)可以去到這個(gè)blog看看。
http://blog.csdn.net/cwyspy/article/details/10004995

Note: 非常不幸,在最近的版本里面,IsolationRunner已經(jīng)不能使用,所以在hadoop2里邊,需要找到失敗節(jié)點(diǎn)后,把問(wèn)題文件拷貝出來(lái),進(jìn)行單機(jī)調(diào)試。

合并結(jié)果集

根據(jù)Reduce個(gè)數(shù),可以會(huì)有多個(gè)part的結(jié)果集,那么可以使用下面命令來(lái)合并

% hadoop fs -getmerge max-temp max-temp-local

% sort max-temp-local | tail

Tuning a Job

  1. Number of mappers

  2. Number of reducers

  3. Combiners

  4. Intermediate compression

  5. Custom serialization

  6. Shuffle tweaks

MapReduce Workflows

In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.

ChainMapper and ChainReducer

It's a Map*/Reduce model, which means multiple mappers work as a chain, and after last mapper, output will go to reducer. this sounds reduced network IO.

Though called 'ChainReducer', actually only a Reducer working for ChainMapper, so gets the name.

Mapper1->Mapper2->MapperN->Reducer

JobControl

MR has a class JobControl, but as I test it's really not maintained well. 

Simply to use:

if(Run(job1)

   Run(job2)

Apache Oozie

Oozie是一種Java Web應(yīng)用程序,它運(yùn)行在Java servlet容器——即Tomcat——中,并使用數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)以下內(nèi)容:

  • 工作流定義

  • 當(dāng)前運(yùn)行的工作流實(shí)例,包括實(shí)例的狀態(tài)和變量

Oozie工作流是放置在控制依賴(lài)DAG(有向無(wú)環(huán)圖 Direct Acyclic Graph)中的一組動(dòng)作(例如,Hadoop的Map/Reduce作業(yè)、Pig作業(yè)等),其中指定了動(dòng)作執(zhí)行的順序。我們會(huì)使用hPDL(一種XML流程定義語(yǔ)言)來(lái)描述這個(gè)圖。

hPDL是一種很簡(jiǎn)潔的語(yǔ)言,只會(huì)使用少數(shù)流程控制和動(dòng)作節(jié)點(diǎn)??刂乒?jié)點(diǎn)會(huì)定義執(zhí)行的流程,并包含工作流的起點(diǎn)和終點(diǎn)(start、end和fail節(jié)點(diǎn))以及控制工作流執(zhí)行路徑的機(jī)制(decision、fork和join節(jié)點(diǎn))。動(dòng)作節(jié)點(diǎn)是一些機(jī)制,通過(guò)它們工作流會(huì)觸發(fā)執(zhí)行計(jì)算或者處理任務(wù)。Oozie為以下類(lèi)型的動(dòng)作提供支持: Hadoop map-reduce、Hadoop文件系統(tǒng)、Pig、Java和Oozie的子工作流(SSH動(dòng)作已經(jīng)從Oozie schema 0.2之后的版本中移除了)

關(guān)于“Hadoop中Map-Reduce如何配置、測(cè)試和調(diào)試”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI