溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Hadoop中MapReduce常用算法有哪些

發(fā)布時間:2021-12-08 10:55:42 來源:億速云 閱讀:196 作者:小新 欄目:云計算

這篇文章將為大家詳細(xì)講解有關(guān)Hadoop中MapReduce常用算法有哪些,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。


 

1.排序: 

    1)數(shù)據(jù):
 

         hadoop fs -mkdir /import 
           創(chuàng)建一個或者多個文本,上傳 
           hadoop fs -put test.txt /import/ 

Hadoop中MapReduce常用算法有哪些

    2)代碼:

package com.cuiweiyou.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//hadoop默認(rèn)排序: 
//如果k2、v2類型是Text-文本,結(jié)果是按照字典順序
//如果k2、v2類型是LongWritable-數(shù)字,結(jié)果是按照數(shù)字大小順序

public class SortTest {
	/**
	 * 內(nèi)部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
		/**
		 * 重寫map方法
		 */
		public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
			//這里v1轉(zhuǎn)為k2-數(shù)字類型,舍棄k1。null為v2
			context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());
  //因為v1可能重復(fù),這時,k2也是可能有重復(fù)的
		}
	}

	/**
	 * 內(nèi)部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
		/**
		 * 重寫reduce方法
   * 在此方法執(zhí)行前,有個shuffle過程,會根據(jù)k2將對應(yīng)的v2歸并為v2[...] 
		 */
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Reducer<LongWritable, Context context) throws IOException, InterruptedException {
			//k2=>k3, v2[...]舍棄。null => v3
			context.write(k2, NullWritable.get());
  //此時,k3如果發(fā)生重復(fù),根據(jù)默認(rèn)算法會發(fā)生覆蓋,即最終僅保存一個k3 
		}
	}

	public static void main(String[] args) throws Exception {
		// 聲明配置信息
		Configuration conf = new Configuration();
		conf.set("fs.default.name", "hdfs://localhost:9000");
		
		// 創(chuàng)建作業(yè)
		Job job = new Job(conf, "SortTest");
		job.setJarByClass(SortTest.class);
		
		// 設(shè)置mr
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		
		// 設(shè)置輸出類型,和Context上下文對象write的參數(shù)類型一致
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);
		
		// 設(shè)置輸入輸出路徑
		FileInputFormat.setInputPaths(job, new Path("/import/"));
		FileOutputFormat.setOutputPath(job, new Path("/out"));
		
		// 執(zhí)行
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


 

    3)測試:

        可以看到,不僅排序而且去重了。

Hadoop中MapReduce常用算法有哪些


2.去重: 

    需求:查取手機(jī)號有哪些。這里的思路和上面排序算法的思路是一致的,僅僅多了分割出手機(jī)號這一步驟。

    1)數(shù)據(jù):

        創(chuàng)建兩個文本,手動輸入一些測試內(nèi)容。每個字段用制表符隔開。日期,電話,地址,方式,數(shù)據(jù)量。
Hadoop中MapReduce常用算法有哪些

 

    2)代碼:

        (1)map和reduce:
/**
	 * 映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		/**
		 * 重寫map方法
		 */
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進(jìn)行分割
			String[] tels = v1.toString().split("\t");
			//k1 => k2-第2列手機(jī)號,null => v2
			context.write(new Text(tels[1]), NullWritable.get());
		}
	}
	
	
	/************************************************************
	 *  在map后,reduce前,有個shuffle過程,會根據(jù)k2將對應(yīng)的v2歸并為v2[...] 
	 ***********************************************************/
	

	/**
	 * 拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		/**
		 * 重寫reduce方法
		 */
		protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			//此時,k3如果發(fā)生重復(fù),根據(jù)默認(rèn)算法會發(fā)生覆蓋,即最終僅保存一個k3,達(dá)到去重到效果
			context.write(k2, NullWritable.get());
		}
	}


 

        (2)配置輸出:
// 設(shè)置輸出類型,和Context上下文對象write的參數(shù)類型一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);


 

    3)測試:

Hadoop中MapReduce常用算法有哪些


3.過濾:

    需求:查詢在北京地區(qū)發(fā)生的上網(wǎng)記錄。思路同上,當(dāng)寫出 k2 、 v2 時加一個判斷即可。

    1)數(shù)據(jù):

        同上。

    2)代碼:

        (1)map和reduce:
/**
	 * 內(nèi)部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		/**
		 * 重寫map方法
		 */
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進(jìn)行分割
			final String[] adds = v1.toString().split("\t");
			//地址在第3列
			//k1 => k2-地址,null => v2
			if(adds[2].equals("beijing")){
				context.write(new Text(v1.toString()), NullWritable.get());
			}
		}
	}

	/**
	 * 內(nèi)部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>
	 */
	public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		/**
		 * 重寫reduce方法
		 */
		protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			context.write(k2, NullWritable.get());
		}
	}


 

        (2)配置輸出:
// 設(shè)置輸出類型,和Context上下文對象write的參數(shù)類型一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);


 

    3)測試:

Hadoop中MapReduce常用算法有哪些


4.TopN: 

    這個算法非常經(jīng)典,面試必問。實現(xiàn)這個效果的算法也很多。下面是個簡單的示例。
    需求:找到流量最大值;找出前5個最大值。

    1)數(shù)據(jù):

        同上。

    2)代碼1-最大值:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

		//首先創(chuàng)建一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808
		long temp = Long.MIN_VALUE;
		
		//找出最大值
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進(jìn)行分割
			final String[] flows = v1.toString().split("\t");
			//將文本轉(zhuǎn)數(shù)值
			final long val = Long.parseLong(flows[4]);
			//如果v1比臨時變量大,則保存v1的值
			if(temp<val){
				temp = val;
			}
		}
		
		/** ---此方法在全部的map任務(wù)結(jié)束后執(zhí)行一次。這時僅輸出臨時變量到最大值--- **/
		protected void cleanup(Context context) throws IOException ,InterruptedException {
			context.write(new LongWritable(temp), NullWritable.get());
			System.out.println("文件讀取完畢");
		}
	}
	
	//reduce
	public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
		//臨時變量
		Long temp = Long.MIN_VALUE;

		//因為一個文件得到一個最大值,再次將這些值比對,得到最大的
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			
			long long1 = Long.parseLong(k2.toString());
			//如果k2比臨時變量大,則保存k2的值
			if(temp<long1){
				temp = long1;
			}
		}
		
		/** ?。?!此方法在全部的reduce任務(wù)結(jié)束后執(zhí)行一次。這時僅輸出臨時變量到最大值?。。?nbsp;**/
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(temp), NullWritable.get());
		}
	}


 

        (2)配置輸出:
// 設(shè)置輸出類型
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);


 

    3)測試1:

Hadoop中MapReduce常用算法有哪些

    4)代碼2-TopN:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

		//首先創(chuàng)建一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808
		long temp = Long.MIN_VALUE;
		//Top5存儲空間
		long[] tops;
		
		/** 次方法在run中調(diào)用,在全部map之前執(zhí)行一次 **/
		protected void setup(Context context) {
			//初始化數(shù)組長度為5
			tops = new long[5];  
		}
		
		//找出最大值
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按照制表符進(jìn)行分割
			final String[] flows = v1.toString().split("\t");
			//將文本轉(zhuǎn)數(shù)值
			final long val = Long.parseLong(flows[4]);
			//保存在0索引
			tops[0] = val;
			//排序后最大值在最后一個索引,這樣從后到前依次減小
			Arrays.sort(tops);
		}
		
		/** ---此方法在全部到map任務(wù)結(jié)束后執(zhí)行一次。這時僅輸出臨時變量到最大值--- **/
		protected void cleanup(Context context) throws IOException ,InterruptedException {
			//保存前5條數(shù)據(jù)
			for( int i = 0; i < tops.length; i++) {  
				context.write(new LongWritable(tops[i]), NullWritable.get());  
			}
		}
	}
	
	//reduce
	public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
		//臨時變量
		Long temp = Long.MIN_VALUE;
		//Top5存儲空間
		long[] tops;

		/** 次方法在run中調(diào)用,在全部map之前執(zhí)行一次 **/
		protected void setup(Context context) {
			//初始化長度為5
			tops = new long[5];  
		}
		
		//因為每個文件都得到5個值,再次將這些值比對,得到最大的
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException {
			
			long top = Long.parseLong(k2.toString());
			//
			tops[0] = top;
			//
			Arrays.sort(tops);
		}
		
		/** ---此方法在全部到reduce任務(wù)結(jié)束后執(zhí)行一次。輸出前5個最大值--- **/
		protected void cleanup(Context context) throws IOException, InterruptedException {
			//保存前5條數(shù)據(jù)
			for( int i = 0; i < tops.length; i++) {  
				context.write(new LongWritable(tops[i]), NullWritable.get());  
			} 
		}
	}


 

        (2)配置輸出:
// 設(shè)置輸出類型
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);


 

    5)測試2:

Hadoop中MapReduce常用算法有哪些


5.單表關(guān)聯(lián): 

    本例中的單表實際就是一個文本文件。 

    1)數(shù)據(jù):

Hadoop中MapReduce常用算法有哪些

    2)代碼:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		//拆分原始數(shù)據(jù)
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//按制表符拆分記錄
			String[] splits = v1.toString().split("\t");
			//一條k2v2記錄:把孫輩作為k2;祖輩加下劃線區(qū)分,作為v2
			context.write(new Text(splits[0]), new Text("_"+splits[1]));
			//一條k2v2記錄:把祖輩作為k2;孫輩作為v2。就是把原兩個單詞調(diào)換位置保存
			context.write(new Text(splits[1]), new Text(splits[0]));
		}
			
			/**
				張三		_張三爸爸
				張三爸爸	張三
				
				張三爸爸	_張三爺爺
				張三爺爺	張三爸爸
			**/
	}
	
	//reduce
	public static class MyReducer extends Reducer<Text, Text, Text, Text> {
		//拆分k2v2[...]數(shù)據(jù)
		protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException ,InterruptedException {
			String grandchild = "";	//孫輩
			String grandfather = "";	//祖輩
			
			/**
			 	張三爸爸		[_張三爺爺,張三]
			**/
			
			//從迭代中遍歷v2[...]
			for (Text man : v2) {
				String p = man.toString();
				//如果單詞是以下劃線開始的
				if(p.startsWith("_")){
					//從索引1開始截取字符串,保存到祖輩變量
					grandfather = p.substring(1);
				}
				//如果單詞沒有下劃線起始
				else{
					//直接賦值給孫輩變量
					grandchild = p;
				}
			}
			
			//在得到有效數(shù)據(jù)的情況下
			if( grandchild!="" && grandfather!=""){
				//寫出得到的結(jié)果。
				context.write(new Text(grandchild), new Text(grandfather));
			}
			
			/**
				k3=張三,v3=張三爺爺
			**/
		}
	}


 

        (2)配置輸出:
// 設(shè)置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);


 

    3)測試:

Hadoop中MapReduce常用算法有哪些


6.雙表關(guān)聯(lián): 

    本例中仍簡單采用兩個文本文件。

    1)數(shù)據(jù):

Hadoop中MapReduce常用算法有哪些

    2)代碼:

        (1)map和reduce:
//map
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		//拆分原始數(shù)據(jù)
		protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {
			//拆分記錄
			String[] splited = v1.toString().split("\t");
			//如果第一列是數(shù)字(使用正則判斷),就是地址表
			if(splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")){
				String addreId = splited[0];
				String address = splited[1];
  //k2,v2-加兩條下劃線作為前綴標(biāo)識為地址
				context.write(new Text(addreId), new Text("__"+address));
			}
			//否則就是人員表
			else{
				String personId = splited[1];
				String persName = splited[0];
  //k2,v2-加兩條橫線作為前綴標(biāo)識為人員
				context.write(new Text(personId), new Text("--"+persName));
			}
			/**
			 1	__北京
			 1	--張三
			**/
		}
	}
	
	//reduce
	public static class MyReducer extends Reducer<Text, Text, Text, Text> {
		//拆分k2v2[...]數(shù)據(jù)
		protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException ,InterruptedException {
			String address = "";	//地址
			String person = "";		//人員
			/**
				1, [__北京,--張三]
			**/
			//迭代的是address或者person
			for (Text text : v2) {
				String tmp = text.toString();
				
				if(tmp.startsWith("__")){
					//如果是__開頭的是address
					address = tmp.substring(2);	//從索引2開始截取字符串
				}
				if(tmp.startsWith("--")){
					//如果是--開頭的是person
					person = tmp.substring(2);
				}
			}
			context.write(new Text(person), new Text(address));
		}
		/**
		 k3=張三,v3=北京
		**/
	

 

        (2)配置輸出:
// 設(shè)置輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

    3)測試:

Hadoop中MapReduce常用算法有哪些

關(guān)于“Hadoop中MapReduce常用算法有哪些”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI