您好,登錄后才能下訂單哦!
Hadoop MapReduce 是一個分布式計算框架,用于編寫批處理應(yīng)用程序。編寫好的程序可以提交到 Hadoop 集群上用于并行處理大規(guī)模的數(shù)據(jù)集。
MapReduce 作業(yè)通過將輸入的數(shù)據(jù)集拆分為獨立的塊,這些塊由 map
以并行的方式處理,框架對 map
的輸出進行排序,然后輸入到 reduce
中。MapReduce 框架專門用于 <key,value>
鍵值對處理,它將作業(yè)的輸入視為一組 <key,value>
對,并生成一組 <key,value>
對作為輸出。輸出和輸出的 key
和 value
都必須實現(xiàn)Writable 接口。
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
這里以詞頻統(tǒng)計為例進行說明,MapReduce 處理的流程如下:
input : 讀取文本文件;
splitting : 將文件按照行進行拆分,此時得到的 K1
行數(shù),V1
表示對應(yīng)行的文本內(nèi)容;
List(K2,V2)
,其中 K2
代表每一個單詞,由于是做詞頻統(tǒng)計,所以 V2
的值為 1,代表出現(xiàn) 1 次;Mapping
操作可能是在不同的機器上并行處理的,所以需要通過 shuffling
將相同 key
值的數(shù)據(jù)分發(fā)到同一個節(jié)點上去合并,這樣才能統(tǒng)計出最終的結(jié)果,此時得到 K2
為每一個單詞,List(V2)
為可迭代集合,V2
就是 Mapping 中的 V2;Reducing
對 List(V2)
進行歸約求和操作,最終輸出。MapReduce 編程模型中 splitting
和 shuffing
操作都是由框架實現(xiàn)的,需要我們自己編程實現(xiàn)的只有 mapping
和 reducing
,這也就是 MapReduce 這個稱呼的來源。
InputFormat
將輸出文件拆分為多個 InputSplit
,并由 RecordReaders
將 InputSplit
轉(zhuǎn)換為標準的<key,value>鍵值對,作為 map 的輸出。這一步的意義在于只有先進行邏輯拆分并轉(zhuǎn)為標準的鍵值對格式后,才能為多個 map
提供輸入,以便進行并行處理。
combiner
是 map
運算后的可選操作,它實際上是一個本地化的 reduce
操作,它主要是在 map
計算出中間文件后做一個簡單的合并重復(fù) key
值的操作。這里以詞頻統(tǒng)計為例:
map
在遇到一個 hadoop 的單詞時就會記錄為 1,但是這篇文章里 hadoop 可能會出現(xiàn) n 多次,那么 map
輸出文件冗余就會很多,因此在 reduce
計算前對相同的 key 做一個合并操作,那么需要傳輸?shù)臄?shù)據(jù)量就會減少,傳輸效率就可以得到提升。
但并非所有場景都適合使用 combiner
,使用它的原則是 combiner
的輸出不會影響到 reduce
計算的最終輸入,例如:求總數(shù),最大值,最小值時都可以使用 combiner
,但是做平均值計算則不能使用 combiner
。
不使用 combiner 的情況:
<div align="center"> <img width="600px" src="https://raw.githubusercontent.com/heibaiying/BigData-Notes/master/pictures/mapreduce-without-combiners.png"/> </div>
使用 combiner 的情況:
<div align="center"> <img width="600px" src="https://raw.githubusercontent.com/heibaiying/BigData-Notes/master/pictures/mapreduce-with-combiners.png"/> </div>
可以看到使用 combiner 的時候,需要傳輸?shù)?reducer 中的數(shù)據(jù)由 12keys,降低到 10keys。降低的幅度取決于你 keys 的重復(fù)率,下文詞頻統(tǒng)計案例會演示用 combiner 降低數(shù)百倍的傳輸量。
partitioner
可以理解成分類器,將 map
的輸出按照 key 值的不同分別分給對應(yīng)的 reducer
,支持自定義實現(xiàn),下文案例會給出演示。
這里給出一個經(jīng)典的詞頻統(tǒng)計的案例:統(tǒng)計如下樣本數(shù)據(jù)中每個單詞出現(xiàn)的次數(shù)。
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
為方便大家開發(fā),我在項目源碼中放置了一個工具類 WordCountDataUtils
,用于模擬產(chǎn)生詞頻統(tǒng)計的樣本,生成的文件支持輸出到本地或者直接寫到 HDFS 上。
項目代碼下載地址:hadoop-word-count
想要進行 MapReduce 編程,需要導(dǎo)入 hadoop-client
依賴:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
將每行數(shù)據(jù)按照指定分隔符進行拆分。這里需要注意在 MapReduce 中必須使用 Hadoop 定義的類型,因為 Hadoop 預(yù)定義的類型都是可序列化,可比較的,所有類型均實現(xiàn)了 WritableComparable
接口。
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
String[] words = value.toString().split("\t");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
WordCountMapper
對應(yīng)下圖的 Mapping 操作:
WordCountMapper
繼承自 Mappe
類,這是一個泛型類,定義如下:
WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
......
}
mapping
輸入 key 的類型,即每行的偏移量 (每行第一個字符在整個文本中的位置),Long
類型,對應(yīng) Hadoop 中的 LongWritable
類型;mapping
輸入 value 的類型,即每行數(shù)據(jù);String
類型,對應(yīng) Hadoop 中 Text
類型;mapping
輸出的 key 的類型,即每個單詞;String
類型,對應(yīng) Hadoop 中 Text
類型;mapping
輸出 value 的類型,即每個單詞出現(xiàn)的次數(shù);這里用 int
類型,對應(yīng) IntWritable
類型。在 Reduce 中進行單詞出現(xiàn)次數(shù)的統(tǒng)計:
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
如下圖,shuffling
的輸出是 reduce 的輸入。這里的 key 是每個單詞,values 是一個可迭代的數(shù)據(jù)類型,類似 (1,1,1,...)
。
組裝 MapReduce 作業(yè),并提交到服務(wù)器運行,代碼如下:
/**
* 組裝作業(yè) 并提交到集群運行
*/
public class WordCountApp {
// 這里為了直觀顯示參數(shù) 使用了硬編碼,實際開發(fā)中可以通過外部傳參
private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
private static final String HADOOP_USER_NAME = "root";
public static void main(String[] args) throws Exception {
// 文件輸入路徑和輸出路徑由外部傳參指定
if (args.length < 2) {
System.out.println("Input and output paths are necessary!");
return;
}
// 需要指明 hadoop 用戶名,否則在 HDFS 上創(chuàng)建目錄時可能會拋出權(quán)限不足的異常
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
Configuration configuration = new Configuration();
// 指明 HDFS 的地址
configuration.set("fs.defaultFS", HDFS_URL);
// 創(chuàng)建一個 Job
Job job = Job.getInstance(configuration);
// 設(shè)置運行的主類
job.setJarByClass(WordCountApp.class);
// 設(shè)置 Mapper 和 Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 設(shè)置 Mapper 輸出 key 和 value 的類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 設(shè)置 Reducer 輸出 key 和 value 的類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 如果輸出目錄已經(jīng)存在,則必須先刪除,否則重復(fù)運行程序時會拋出異常
FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
Path outputPath = new Path(args[1]);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 設(shè)置作業(yè)輸入文件和輸出文件的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputPath);
// 將作業(yè)提交到群集并等待它完成,參數(shù)設(shè)置為 true 代表打印顯示對應(yīng)的進度
boolean result = job.waitForCompletion(true);
// 關(guān)閉之前創(chuàng)建的 fileSystem
fileSystem.close();
// 根據(jù)作業(yè)結(jié)果,終止當(dāng)前運行的 Java 虛擬機,退出程序
System.exit(result ? 0 : -1);
}
}
需要注意的是:如果不設(shè)置 Mapper
操作的輸出類型,則程序默認它和 Reducer
操作輸出的類型相同。
在實際開發(fā)中,可以在本機配置 hadoop 開發(fā)環(huán)境,直接在 IDE 中啟動進行測試。這里主要介紹一下打包提交到服務(wù)器運行。由于本項目沒有使用除 Hadoop 外的第三方依賴,直接打包即可:
# mvn clean package
使用以下命令提交作業(yè):
hadoop jar /usr/appjar/hadoop-word-count-1.0.jar \
com.heibaiying.WordCountApp \
/wordcount/input.txt /wordcount/output/WordCountApp
作業(yè)完成后查看 HDFS 上生成目錄:
# 查看目錄
hadoop fs -ls /wordcount/output/WordCountApp
# 查看統(tǒng)計結(jié)果
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000
想要使用 combiner
功能只要在組裝作業(yè)時,添加下面一行代碼即可:
// 設(shè)置 Combiner
job.setCombinerClass(WordCountReducer.class);
加入 combiner
后統(tǒng)計結(jié)果是不會有變化的,但是可以從打印的日志看出 combiner
的效果:
沒有加入 combiner
的打印日志:
加入 combiner
后的打印日志如下:
這里我們只有一個輸入文件并且小于 128M,所以只有一個 Map 進行處理??梢钥吹浇?jīng)過 combiner 后,records 由 3519
降低為 6
(樣本中單詞種類就只有 6 種),在這個用例中 combiner 就能極大地降低需要傳輸?shù)臄?shù)據(jù)量。
這里假設(shè)有個需求:將不同單詞的統(tǒng)計結(jié)果輸出到不同文件。這種需求實際上比較常見,比如統(tǒng)計產(chǎn)品的銷量時,需要將結(jié)果按照產(chǎn)品種類進行拆分。要實現(xiàn)這個功能,就需要用到自定義 Partitioner
。
這里先介紹下 MapReduce 默認的分類規(guī)則:在構(gòu)建 job 時候,如果不指定,默認的使用的是 HashPartitioner
:對 key 值進行哈希散列并對 numReduceTasks
取余。其實現(xiàn)如下:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
這里我們繼承 Partitioner
自定義分類規(guī)則,這里按照單詞進行分類:
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
}
}
在構(gòu)建 job
時候指定使用我們自己的分類規(guī)則,并設(shè)置 reduce
的個數(shù):
// 設(shè)置自定義分區(qū)規(guī)則
job.setPartitionerClass(CustomPartitioner.class);
// 設(shè)置 reduce 個數(shù)
job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());
執(zhí)行結(jié)果如下,分別生成 6 個文件,每個文件中為對應(yīng)單詞的統(tǒng)計結(jié)果:
更多大數(shù)據(jù)系列文章可以參見 GitHub 開源項目: 大數(shù)據(jù)入門指南
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。