How to Partition in Hadoop

Published: 2021-12-09 15:44:33  Source: 億速云 (Yisu Cloud)  Views: 165  Author: 小新  Category: Cloud Computing

This article shows how partitioning works in Hadoop MapReduce. Many readers are probably not yet familiar with the topic, so a complete example is shared below for reference.

package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class KpiApp {
	public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";
	public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();
		existsFile(conf);
		Job job = new Job(conf, KpiApp.class.getName());
		// Required when the job is packaged as a JAR and run on Linux
		job.setJarByClass(KpiApp.class);
		
		//1.1
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2
		job.setMapperClass(MyMapper.class);
		// Map output types <k2, v2>; the final output types are set in step 2.2
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWritable.class);
		
		//1.3 Custom partitioning
		job.setPartitionerClass(KpiPartition.class);
		job.setNumReduceTasks(2);
		
		//1.4 Sorting and grouping
		//1.5 Aggregation
		
		//2.1
		
		//2.2
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		//2.3
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// Report job success or failure through the process exit code
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	private static void existsFile(Configuration conf) throws IOException,
			URISyntaxException {
		FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf);
		if(fs.exists(new Path(OUTPUT_PATH))){
			fs.delete(new Path(OUTPUT_PATH), true);
		}
	}
	static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
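			// Each input line is a tab-separated record
			String[] split = value.toString().split("\t");
			// Field 1 is the phone number, used as the map output key
			String phone = split[1];
			Text key2 = new Text(phone);

			// Fields 6-9: up packets, down packets, up payload, down payload
			KpiWritable v2 = new KpiWritable();
			v2.set(split[6], split[7], split[8], split[9]);
			context.write(key2, v2);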
		}
		
	}
	static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{

		@Override
		protected void reduce(Text key2, Iterable<KpiWritable> values,Context context)
				throws IOException, InterruptedException {
			// Sum the four counters over all records that share this key
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			for (KpiWritable writable : values) {
				upPackNum += writable.upPackNum;
				downPackNum += writable.downPackNum;
				upPayLoad += writable.upPayLoad;
				downPayLoad += writable.downPayLoad;
			}
			KpiWritable value3 = new KpiWritable();
			value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad));
			context.write(key2, value3);
		}
	}
}
class KpiWritable implements Writable{
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(this.upPackNum);
		out.writeLong(this.downPackNum);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
	}

	// Parses the four counters from their string form, in the order they are serialized
	public void set(String upPackNum, String downPackNum, String upPayLoad,
			String downPayLoad) {
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public String toString() {
		return  upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}
}
class KpiPartition extends Partitioner<Text, KpiWritable>{

	@Override
	public int getPartition(Text key, KpiWritable value, int numPartitions) {
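		// Keys of length 11 go to partition 0, all other keys to partition 1;
		// this relies on job.setNumReduceTasks(2) in main
		String string = key.toString();
		return string.length() == 11 ? 0 : 1;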
	}
}

Partitioner is the base class of HashPartitioner. A custom partitioner must also extend this class.
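The length-based rule used by KpiPartition can be tried without a cluster. The following is a minimal plain-Java sketch that does not depend on Hadoop. The class name LengthPartitionSketch, the helper assign, the sample keys, and the modulo guard are all illustrative assumptions and are not part of the original listing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LengthPartitionSketch {
    // Same rule as KpiPartition.getPartition: 11-character keys -> 0, all others -> 1.
    public static int getPartition(String key, int numPartitions) {
        int partition = key.length() == 11 ? 0 : 1;
        // Guard (an assumption, not in the original): keep the result in [0, numPartitions).
        return partition % numPartitions;
    }

    // Groups keys by the partition, and therefore the reduce task, they would be sent to.
    public static Map<Integer, List<String>> assign(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            int partition = getPartition(key, numPartitions);
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // Hypothetical sample keys: two 11-character keys and two shorter ones.
        List<String> keys = Arrays.asList("13726230503", "84138413", "13560439658", "5892");
        System.out.println(assign(keys, 2));
        // prints {0=[13726230503, 13560439658], 1=[84138413, 5892]}
    }
}
```

With two reduce tasks, each partition ends up in its own output file, so the 11-character keys and the remaining keys are written separately.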

HashPartitioner is the default Partitioner in MapReduce.
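For comparison, the default behaviour can be sketched in plain Java. HashPartitioner clears the sign bit of the key's hash code and takes the remainder modulo the number of reduce tasks. The class name HashPartitionSketch and the sample keys are illustrative assumptions. In a real job the key is a Hadoop type such as Text, whose hashCode differs from String.hashCode, so the concrete partition numbers here will not match a cluster run.

```java
public class HashPartitionSketch {
    // Mirrors the formula of HashPartitioner.getPartition:
    // mask off the sign bit so the value is non-negative, then take the remainder.
    public static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Hypothetical sample keys
        String[] keys = {"13726230503", "84138413", "13560439658"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + getPartition(key, 2));
        }
    }
}
```

Because the result depends only on the hash code, equal keys always reach the same reduce task, but unrelated keys are spread more or less evenly. A custom partitioner such as KpiPartition is what you write when records must be routed by a business rule instead.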

That is all for "How to Partition in Hadoop". Thank you for reading; hopefully the example above gives you a working understanding of the topic.
