您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Hadoop MapReduce基本原理是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Hadoop MapReduce基本原理是什么”吧!
Hadoop核心組件之一:分布式計算的方案MapReduce,是一種編程模型,用于大規(guī)模數(shù)據(jù)集的并行運(yùn)算,其中Map(映射)和Reduce(歸約)。
MapReduce既是一個編程模型,也是一個計算組件,處理的過程分為兩個階段,Map階段:負(fù)責(zé)把任務(wù)分解為多個小任務(wù),Reduce負(fù)責(zé)把多個小任務(wù)的處理結(jié)果進(jìn)行匯總。其中Map階段主要輸入是一對Key-Value,經(jīng)過map計算后輸出一對Key-Value值;然后將相同Key合并,形成Key-Value集合;再將這個Key-Value集合轉(zhuǎn)入Reduce階段,經(jīng)過計算輸出最終Key-Value結(jié)果集。
MapReduce可以實(shí)現(xiàn)基于上千臺服務(wù)器并發(fā)工作,提供很強(qiáng)大的數(shù)據(jù)處理能力,如果其中單臺服務(wù)掛掉,計算任務(wù)會自動轉(zhuǎn)義到另外節(jié)點(diǎn)執(zhí)行,保證高容錯性;但是MapReduce不適應(yīng)于實(shí)時計算與流式計算,計算的數(shù)據(jù)是靜態(tài)的。
數(shù)據(jù)文件一般以CSV格式居多,數(shù)據(jù)行通常以空格分隔,這里需要考慮數(shù)據(jù)內(nèi)容特點(diǎn);
文件經(jīng)過切片分配在不同的MapTask任務(wù)中并發(fā)執(zhí)行;
MapTask任務(wù)執(zhí)行完畢之后,執(zhí)行ReduceTask任務(wù),依賴Map階段的數(shù)據(jù);
ReduceTask任務(wù)執(zhí)行完畢后,輸出文件結(jié)果。
hadoop: # 讀取的文件源 inputPath: hdfs://hop01:9000/hopdir/javaNew.txt # 該路徑必須是程序運(yùn)行前不存在的 outputPath: /wordOut
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text mapKey = new Text(); IntWritable mapValue = new IntWritable(1); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1、讀取行 String line = value.toString(); // 2、行內(nèi)容切割,根據(jù)文件中分隔符 String[] words = line.split(" "); // 3、存儲 for (String word : words) { mapKey.set(word); context.write(mapKey, mapValue); } } }
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int sum ; IntWritable value = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1、累加求和統(tǒng)計 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2、輸出結(jié)果 value.set(sum); context.write(key,value); } }
@RestController public class WordWeb { @Resource private MapReduceConfig mapReduceConfig ; @GetMapping("/getWord") public String getWord () throws IOException, ClassNotFoundException, InterruptedException { // 聲明配置 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() ); Job job = Job.getInstance(hadoopConfig); // Job執(zhí)行作業(yè) 輸入路徑 FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath())); // Job執(zhí)行作業(yè) 輸出路徑 FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath())); // 自定義 Mapper和Reducer 兩個階段的任務(wù)處理類 job.setMapperClass(WordMapper.class); job.setReducerClass(WordReducer.class); // 設(shè)置輸出結(jié)果的Key和Value的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //執(zhí)行Job直到完成 job.waitForCompletion(true); return "success" ; } }
將應(yīng)用程序打包放到hop01服務(wù)上執(zhí)行;
java -jar map-reduce-case01.jar
Java數(shù)據(jù)類型與對應(yīng)的Hadoop數(shù)據(jù)序列化類型;
Java類型 | Writable類型 | Java類型 | Writable類型 |
---|---|---|---|
String | Text | float | FloatWritable |
int | IntWritable | long | LongWritable |
boolean | BooleanWritable | double | DoubleWritable |
byte | ByteWritable | array | DoubleWritable |
map | MapWritable |
Mapper模塊:處理輸入的數(shù)據(jù),業(yè)務(wù)邏輯在map()方法中完成,輸出的數(shù)據(jù)也是KV格式;
Reducer模塊:處理Map程序輸出的KV數(shù)據(jù),業(yè)務(wù)邏輯在reduce()方法中;
Driver模塊:將程序提交到y(tǒng)arn進(jìn)行調(diào)度,提交封裝了運(yùn)行參數(shù)的job對象;
序列化:將內(nèi)存中對象轉(zhuǎn)換為二進(jìn)制的字節(jié)序列,可以通過輸出流持久化存儲或者網(wǎng)絡(luò)傳輸;
反序列化:接收輸入字節(jié)流或者讀取磁盤持久化的數(shù)據(jù),加載到內(nèi)存的對象過程;
Hadoop序列化相關(guān)接口:Writable實(shí)現(xiàn)的序列化機(jī)制、Comparable管理Key的排序問題;
案例描述:讀取文件,并對文件相同的行做數(shù)據(jù)累加計算,輸出計算結(jié)果;該案例演示在本地執(zhí)行,不把Jar包上傳的hadoop服務(wù)器,驅(qū)動配置一致。
實(shí)體對象屬性
public class AddEntity implements Writable { private long addNum01; private long addNum02; private long resNum; // 構(gòu)造方法 public AddEntity() { super(); } public AddEntity(long addNum01, long addNum02) { super(); this.addNum01 = addNum01; this.addNum02 = addNum02; this.resNum = addNum01 + addNum02; } // 序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(addNum01); dataOutput.writeLong(addNum02); dataOutput.writeLong(resNum); } // 反序列化 @Override public void readFields(DataInput dataInput) throws IOException { // 注意:反序列化順序和寫序列化順序一致 this.addNum01 = dataInput.readLong(); this.addNum02 = dataInput.readLong(); this.resNum = dataInput.readLong(); } // 省略Get和Set方法 }
Mapper機(jī)制
public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> { Text myKey = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 讀取行 String line = value.toString(); // 行內(nèi)容切割 String[] lineArr = line.split(","); // 內(nèi)容格式處理 String lineNum = lineArr[0]; long addNum01 = Long.parseLong(lineArr[1]); long addNum02 = Long.parseLong(lineArr[2]); myKey.set(lineNum); AddEntity myValue = new AddEntity(addNum01,addNum02); // 輸出 context.write(myKey, myValue); } }
Reducer機(jī)制
public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> { @Override protected void reduce(Text key, Iterable<AddEntity> values, Context context) throws IOException, InterruptedException { long addNum01Sum = 0; long addNum02Sum = 0; // 處理Key相同 for (AddEntity addEntity : values) { addNum01Sum += addEntity.getAddNum01(); addNum02Sum += addEntity.getAddNum02(); } // 最終輸出 AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum); context.write(key, addRes); } }
案例最終結(jié)果:
到此,相信大家對“Hadoop MapReduce基本原理是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。