首先編寫(xiě)WordCountDriver:  package com.jym.hadoop.mr.demo;  import java.io.IOException;  import org.apache.hadoop.conf.Configuration;  i..."/>
溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

MapReduce編寫(xiě)實(shí)現(xiàn)wordcount詞頻統(tǒng)計(jì)

發(fā)布時(shí)間:2020-07-29 12:58:10 來(lái)源:網(wǎng)絡(luò) 閱讀:720 作者:nineteens 欄目:編程語(yǔ)言

  p>首先編寫(xiě)WordCountDriver:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;

  import org.apache.hadoop.fs.Path;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Job;

  import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  /**

  * 這個(gè)程序相當(dāng)于一個(gè)yarn集群的客戶端,

  * 需要在此封裝我們的mr程序的相關(guān)運(yùn)行參數(shù),指定jar包,

  * 最后提交給yarn

  * */

  public class WordcountDriver

  {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException

  {

  Configuration conf=new Configuration();

  /*其實(shí)如果在本地運(yùn)行MR程序其實(shí)不用配置下面的代碼程序,在MR默認(rèn)下就是本地運(yùn)行*/

  /**下面這段代碼配置的是在本地模式下運(yùn)行MR程序*/

  /**是否運(yùn)行為本地模式,就是看這個(gè)參數(shù)值是否為local,默認(rèn)就是local;*/

  //conf.set("mapreduce.framework.name", "local"); //在本地運(yùn)行MR程序

  //本地模式運(yùn)行MR程序時(shí),輸入輸出的數(shù)據(jù)可以在本地,也可以在hdfs上

  //到底在哪里,就看以下兩行配置用哪一行了,默認(rèn)是“file:///”

  /**conf.set("fs.defaultFS", "hdfs://hadoop1:9000");*/ //使用的是HDFS系統(tǒng)

  //conf.set("fs.defaultFS", "file:///"); //使用的是本地Windows磁盤(pán)

  /**運(yùn)行集群模式,就是把程序提交到y(tǒng)arn中去運(yùn)行

  * 要想運(yùn)行為集群模式,以下3個(gè)參數(shù)要指定為集群上的值

  * */

  conf.set("mapreduce.framework.name", "yarn");

  conf.set("yarn.resourcemanager.hostname", "hadoop1");

  conf.set("fs.defaultFS", "hdfs://hadoop1:9000");

  Job job = Job.getInstance(conf);

  /**要想在Windows的Eclipse上運(yùn)行程序,并提交到hadoop的YARN集群上需要指定jar包,如下:*/

  /**job.setJar("c:/wc.jar");*/

  //job.setJar("/home/hadoop/wc.jar"); //這種是將程序打包成jar包,放到指定的位置,缺乏靈活性,不建議使用;

  //指定本程序的jar包所在的本地路徑

  job.setJarByClass(WordcountDriver.class);

  //指定本業(yè)務(wù)job要使用的mapper/reducer業(yè)務(wù)類

  job.setMapperClass(WordcountMapper.class);

  job.setReducerClass(WordcountReducerr.class);

  //指定mapper輸出數(shù)據(jù)的kv類型;

  job.setMapOutputKeyClass(Text.class);

  job.setMapOutputValueClass(IntWritable.class);

  //指定最終輸出的數(shù)據(jù)的kv類型

  job.setOutputKeyClass(Text.class);

  job.setOutputValueClass(IntWritable.class);

  //指定需要使用的combiner,以及用哪一個(gè)類作為combiner的邏輯

  /*job.setCombinerClass(WordcountCombiner.class);*/

  job.setCombinerClass(WordcountReducerr.class);

  /**因?yàn)閏ombiner的工作原理通reducecer的作用是一樣的,所以直接反射調(diào)用reducerr類其實(shí)作用是一樣的*/

  /**此處為之后為測(cè)試添加的*/

  //如果不設(shè)置InputFormat,它默認(rèn)使用的是TextInputFormat.class

  /**job.setInputFormatClass(CombineTextInputFormatInputFormatInputFormat.class);

  CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

  CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

  */

  //指定job的輸入原始文件所在目錄

  //FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); //此處添加的路徑為HDFS文件系統(tǒng)的路徑;

  FileInputFormat.setInputPaths(job, new Path(args[0])); //傳一個(gè)路徑參數(shù)

  //指定job的輸出結(jié)果所在目錄

  FileOutputFormat.setOutputPath(job, new Path(args[1])); //傳一個(gè)參數(shù)進(jìn)來(lái)作為輸出的路徑參數(shù)

  //將job中配置的相關(guān)參數(shù),以及job所用的Java類所在的jar包,提交給yarn去運(yùn)行;

  /*job.submit(); */

  boolean res = job.waitForCompletion(true);

  System.exit(res?0:1);

  }

  }

  其次編寫(xiě)WordCountMapper:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.LongWritable;

  import org.apache.hadoop.io.Text;

  //這是一個(gè)簡(jiǎn)單的MapReduce例子,進(jìn)行單詞數(shù)量的統(tǒng)計(jì)操作;

  import org.apache.hadoop.mapreduce.Mapper;

  /**

  * KEYIN:默認(rèn)情況下,是mr框架所讀到的一行文本的起始偏移量,Long類型,但是在Hadoop中有更精簡(jiǎn)的序列化接口,因此采用LongWritable類型;

  * VALUEIN:默認(rèn)情況下,是mr框架所讀到的一行文本的內(nèi)容,String類型的,同上用Text(org.apache.hadoop.io.Text)類型;

  * KEYOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的key,在此處是單詞,為String類型,同上用Text類型;

  * VALUEOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的value,在此處是單詞數(shù)量,為Integer類型,同上用IntWritable類型;

  * */

  public class WordcountMapper extends Mapper

  {

  /**

  * map階段的業(yè)務(wù)邏輯就寫(xiě)在自定義的map()方法中,

  * maptask會(huì)對(duì)每一行輸入數(shù)據(jù)調(diào)用一次我們自定義的map()方法;

  * */

  @Override //覆寫(xiě)Mapper中的方法;

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException

  {

  //將maptask傳給我們的文本內(nèi)容先轉(zhuǎn)換成String類型

  String line = value.toString();

  //根據(jù)空格將這一行切分成單詞;

  String[] words = line.split(" ");

  //將單詞輸出為<單詞,1>

  for(String word:words)

  {

  //將單詞作為key,將次數(shù)1作為value,以便于后續(xù)的數(shù)據(jù)分發(fā),可以根據(jù)單詞分發(fā),以便于相同單詞會(huì)分到相同的reduce task中;

  context.write(new Text(word),new IntWritable(1)); //進(jìn)行類型轉(zhuǎn)換一下;

  }無(wú)錫×××醫(yī)院 https://yyk.familydoctor.com.cn/20612/

  }

  最后編寫(xiě)WordCountReduceer:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import java.util.Iterator;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Reducer;

  /**

  * KEYIN,VALUEIN應(yīng)該對(duì)應(yīng)mapper中的輸出的KEYOUT,VALUEOUT類型;

  * KEYOUT是單詞

  * VALUEOUT是總次數(shù)*/

  public class WordcountReducerr extends Reducer

  {

  /**

  * 例如:

  *

  * 輸入?yún)?shù)key,是一組相同單詞kv對(duì)的key

  * */

  @Override

  protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

  {

  int count= 0;

  /* //采用迭代器的方式進(jìn)行統(tǒng)計(jì)單詞的數(shù)量;

  Iterator iterator = values.iterator();

  while(iterator.hasNext())

  {

  count+=iterator.next().get(); //獲取key對(duì)應(yīng)的value值

  }

  */

  //下面的for循環(huán)和上面注釋中的效果是一樣的;

  for(IntWritable value:values)

  {

  count+=value.get();

  }

  //輸出統(tǒng)計(jì)結(jié)果

  context.write(key, new IntWritable(count));

  /**

  * 默認(rèn)情況下reduce task會(huì)將輸出結(jié)果放到一個(gè)文件中(最好是HDFS文件系統(tǒng)上的一個(gè)文件)

  * */

  }

  }

  然而還可以編寫(xiě)一個(gè)Combiner類:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Reducer;

  /*

  * 此處的這個(gè)combiner其實(shí)不用自己編寫(xiě),因?yàn)閏ombiner的工作原理同reducer的原理是一樣

  * 的,故可以直接反射調(diào)用WordcountReducer類即可

  * */

  public class WordcountCombiner extends Reducer

  {

  @Override

  protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

  {

  }


向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI