您好,登錄后才能下訂單哦!
編寫可擴展、分布式的數(shù)據(jù)密集型程序和基礎(chǔ)知識
理解Hadoop和MapReduce
編寫和運行一個基本的MapReduce程序
1、什么是Hadoop
Hadoop是一個開源的框架,可編寫和運行分布式應(yīng)用處理大規(guī)模數(shù)據(jù)。
Hadoop與眾不同之處在于以下幾點:
方便——Hadoop運行在由一般商用機器構(gòu)成的大型集群上,或者云計算服務(wù)之上;
健壯——Hadoop致力于在一般商用硬件上運行,其架構(gòu)假設(shè)硬件會頻繁地出現(xiàn)失效;
可擴展——Hadoop通過增加集群節(jié)點,可以線性地擴展以處理更大的數(shù)據(jù)集;
簡單——Hadoop運行用戶快速編寫出高效的并行代碼。
2、了解分布式系統(tǒng)和Hadoop
理解分布式系統(tǒng)(向外擴展)和大型單機服務(wù)器(向上擴展)之間的對比,考慮現(xiàn)有I/O技術(shù)的性價比。
理解Hadoop和其他分布式架構(gòu)(SETI@home)的區(qū)別:
Hadoop設(shè)計理念是代碼向數(shù)據(jù)遷移,而SETI@home設(shè)計理念是數(shù)據(jù)遷移。
要運行的程序在規(guī)模上比數(shù)據(jù)小幾個數(shù)量級,更容易移動;此外,在網(wǎng)絡(luò)上移動數(shù)據(jù)要比在其上加載代碼更花時間,不如讓數(shù)據(jù)不動而將可執(zhí)行代碼移動到數(shù)據(jù)所在機器上去。
3、比較SQL數(shù)據(jù)庫和Hadoop
SQL(結(jié)構(gòu)化查詢語言)是針對結(jié)構(gòu)化數(shù)據(jù)設(shè)計的,而Hadoop最初的許多應(yīng)用針對的是文本這種非結(jié)構(gòu)化數(shù)據(jù)。讓我們從特定的視角將Hadoop與典型SQL數(shù)據(jù)庫做更詳細的比較:
用向外擴展代替向上擴展——擴展商用關(guān)系型數(shù)據(jù)庫的代價會更加昂貴的
用鍵/值對代替關(guān)系表——Hadoop使用鍵/值對作為基本數(shù)據(jù)單元,可足夠靈活地處理較少結(jié)構(gòu)化的數(shù)據(jù)類型
用函數(shù)式編程(MapReduce)代替聲明式查詢(SQL)——在MapReduce中,實際的數(shù)據(jù)處理步驟是由你指定的,很類似于SQL引擎的一個執(zhí)行計劃
用離線處理代替在線處理——Hadoop是專為離線處理和大規(guī)模數(shù)據(jù)分析而設(shè)計的,并不適合那種對幾個記錄隨機讀寫的在線事務(wù)處理模式
4、理解MapReduce
MapReduce是一種數(shù)據(jù)處理模型,最大的優(yōu)點是容易擴展到多個計算節(jié)點上處理數(shù)據(jù);
在MapReduce模型中,數(shù)據(jù)處理原語被稱為mapper和reducer;
分解一個數(shù)據(jù)處理應(yīng)用為mapper和reducer有時是繁瑣的,但是一旦一MapReduce的形式寫好了一個應(yīng)用程序,僅需修改配置就可以將它擴展到集群中幾百、幾千,甚至幾萬臺機器上運行。
[動手擴展一個簡單程序]
少量文檔處理方式:對于每個文檔,使用分詞過程逐個提取單詞;對于每個單詞,在多重集合wordcount中的相應(yīng)項上加1;最后display()函數(shù)打印出wordcount中的所有條目。
大量文檔處理方式:將工作分布到多臺機器上,每臺機器處理這些文檔的不同部分,當(dāng)所有機器都完成時,第二個處理階段將合并這些結(jié)果。
一些細節(jié)可能會妨礙程序按預(yù)期工作,如文檔讀取過量導(dǎo)致中央存儲服務(wù)器的帶寬性能跟不上、多重集合wordcount條目過多超過計算機的內(nèi)存容量。此外,第二階段只有一個計算機處理wordcount任務(wù),容易出現(xiàn)瓶頸,所以可以采用分布的方式運轉(zhuǎn),以某種方式將其分割到多臺計算機上,使之能夠獨立運行,即需要在第一階段后將wordcount分區(qū),使得第二階段的每臺計算機僅需處理一個分區(qū)。
為了使它工作在一個分布式計算機集群上,需要添加以下功能:
存儲文件到許多計算機上(第一階段)
編寫一個基于磁盤的散列表,使得處理不受內(nèi)存容量限制
劃分來自第一階段的中間數(shù)據(jù)(即wordcount)
洗牌這些分區(qū)到第二階段中合適的計算機上
MapReduce程序執(zhí)行分為兩個主要階段,為mapping和reducing,每個階段均定義為一個數(shù)據(jù)處理函數(shù),分別稱為mapper和reducer。在mapping階段,MapReduce獲取輸入數(shù)據(jù)并將數(shù)據(jù)單元裝入mapper;在reduce階段,reducer處理來自mapper的所有輸出,并給出最終結(jié)果。簡而言之,mapper意味著將輸入進行過濾與轉(zhuǎn)換,使reducer可以完成聚合。
另外,為了擴展分布式的單詞統(tǒng)計程序,不得不編寫了partitioning和shuffling函數(shù)。
在MapReduce框架中編寫應(yīng)用程序就是定制化mapper和reducer的過程,以下是完整的數(shù)據(jù)流:
應(yīng)用的輸入必須組織為一個鍵/值對的列表list(<k1,v1>);
含有鍵/值對的列表被拆分,進而通過調(diào)用mapper的map函數(shù)對每個單獨的鍵/值對<k1,v1>進行處理;
所有mapper的輸出被聚合到一個包含<k2,v2>對的巨大列表中;
每個reducer分別處理每個被聚合起來的<k2,list(v2)>,并輸出<k3,v3>。
5、用Hadoop統(tǒng)計單詞——運行第一個程序
Linux操作系統(tǒng)
JDK1.6以上運行環(huán)境
Hadoop操作環(huán)境
Usage:hadoop [—config configdir] COMMAND
這里COMMAND為下列其中一個:
namenode -format 格式化DFS文件系統(tǒng)
secondarynamenode 運行DFS的第二個namenode
namenode 運行DFS的namenode
datanode 運行一個DFS的datanode
dfsadmin 運行一個DFS的admin客戶端
fsck 運行一個DFS文件系統(tǒng)的檢查工具
fs 運行一個普通的文件系統(tǒng)用戶客戶端
balancer 運行一個集群負載均衡工具
jobtracker 運行MapReduce的jobtracker節(jié)點
pipes 運行一個pipes作業(yè)
tasktracker 運行一個MapReduce的tasktracker節(jié)點
job 處理MapReduce作業(yè)
version 打印版本
jar <jar> 運行一個jar文件
distcp <srcurl> <desturl> 遞歸地復(fù)制文件或者目錄
archive -archiveName NAME <src>* <dest> 生成一個Hadoop檔案
daemonlog 獲取或設(shè)置每個daemon的log級別
CLASSNAME 運行名為CLASSNAME的類大多數(shù)命令會在使用w/o參數(shù)
時打出幫助信息。
運行單詞統(tǒng)計示例程序的命令形式如下:
hadoop jar hadoop-*-examples.jar wordcount [-m <maps>] [-r reduces] input output
編譯修改后的單詞統(tǒng)計程序的命令形式如下:
javac -classpath hadoop-*-core.jar -d playground/classes playground/src/WordCount.java
jar -cvf playground/src/wordcount.jar -C playground/classes/
運行修改后的單詞統(tǒng)計程序的命令形式如下:
hadoop jar playground/wordcount.jar org.apache.hadoop.examples.WordCount input output
代碼清單 WordCount.java
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); //(1)使用空格進行分詞 while (itr.hasMoreTokens()) { word.set(itr.nextToken()); //(2)把Token放入Text對象中 context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); //(3)輸出每個Token的統(tǒng)計結(jié)果 } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在(1)的位置上wordcount以默認配置使用了Java的StringTokenizer,這里僅基于空格來分詞。為了在分詞過程中忽略標(biāo)準(zhǔn)的標(biāo)點符號,將它們加入到stringTokenizer的定界符列表中:
StringTokenizer itr = new StringTokenizer(value.toString(),” \t\n\r\f,.:;?![]’");
因為希望單詞統(tǒng)計忽略大小寫,把它們轉(zhuǎn)換為Text對象前先將所有的單詞都變成小寫:
word.set(itr.nextToken().toLowerCase());
希望僅僅顯示出現(xiàn)次數(shù)大于4次的單詞:
if (sum > 4) context.write(key, result);
6、hadoop歷史
創(chuàng)始人:Doug Cutting
2004年左右——Google發(fā)表了兩篇論文來論述Google文件系統(tǒng)(GFS)和MapReduce框架。
2006年1月——雅虎聘用Doug,讓他和一個專項團隊一起改進Hadoop,并將其作為一個開源項目。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。