In this article I will show how MapReduce jobs can be chained in Hadoop so that they execute one after another. Many readers may not be familiar with this technique, so I am sharing it here for reference; I hope you get a lot out of it. Let's take a look.
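The two jobs in the listing below implement the classic "common friends" computation, though the chaining techniques apply to any pair of dependent jobs. As an illustration (the sample names here are made up for this walkthrough, not taken from the original post), assume each input line is a person followed by that person's friend list:

A:B,C
E:B,C

The first job inverts the lists: its reducer emits one line per friend followed by everyone who listed that friend, e.g. "B:A:E" and "C:A:E". The second job reads those lines, pairs up the people on each line, and collects the shared entries, producing "A-E  B,C", meaning A and E have B and C as common friends.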
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PickMain {

    private static final Log LOG = LogFactory.getLog(PickMain.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        /*
        // Approach 1: chain the jobs manually by waiting for the first job
        // to complete before submitting the second one.
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(PickMain.class);
        job1.setMapperClass(FindMapper.class);
        job1.setReducerClass(FindReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        boolean flag1 = job1.waitForCompletion(true);
        if (flag1) {
            Job job2 = Job.getInstance(conf);
            job2.setJarByClass(PickMain.class);
            job2.setMapperClass(SecondFindMapper.class);
            job2.setReducerClass(SecondFindReducer.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job2, new Path(args[1]));
            FileOutputFormat.setOutputPath(job2, new Path(args[2]));
            boolean flag2 = job2.waitForCompletion(true);
            System.out.println(flag2 ? 0 : 1);
            if (flag2) {
                LOG.info("The job is done!");
                System.exit(0);
            } else {
                LOG.info("The second job failed!");
                System.exit(1);
            }
        } else {
            LOG.info("The first job failed; aborting!");
            System.exit(1);
        }
        */

        // Approach 2: submit both jobs through ControlledJob and JobControl.
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(PickMain.class);
        job1.setMapperClass(FindMapper.class);
        job1.setReducerClass(FindReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(PickMain.class);
        job2.setMapperClass(SecondFindMapper.class);
        job2.setReducerClass(SecondFindReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // Wrap each Job in a ControlledJob.
        ControlledJob cjob1 = new ControlledJob(conf);
        ControlledJob cjob2 = new ControlledJob(conf2);
        cjob1.setJob(job1);
        cjob2.setJob(job2);

        // Declare the dependency: job2 starts only after job1 has completed.
        cjob2.addDependingJob(cjob1);

        // JobControl acts as a job scheduler; it is a Runnable and must be
        // started on its own thread.
        JobControl jc = new JobControl("my_jobcontrol");
        jc.addJob(cjob1);
        jc.addJob(cjob2);

        Thread th = new Thread(jc);
        th.start();

        // Poll until every job has finished, then exit.
        while (!jc.allFinished()) {
            Thread.sleep(5000);
        }
        System.exit(0);
    }
}

class FindMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text m1 = new Text();
    Text m2 = new Text();

    // Input lines look like "A:B,C,D" (a person and their friend list).
    // For each friend F of person P, emit (F, P).
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] tmp1 = line.split(":");
        String outval = tmp1[0];
        String[] outkeys = tmp1[1].split(",");
        for (int i = 0; i < outkeys.length; i++) {
            m1.set(outkeys[i]);
            m2.set(outval);
            context.write(m1, m2);
        }
    }
}

class FindReducer extends Reducer<Text, Text, Text, NullWritable> {

    StringBuilder sb = new StringBuilder();
    NullWritable nul = NullWritable.get();
    Text outval = new Text();
    String spector = ":";

    // Collect everyone who lists this person as a friend into one line,
    // e.g. "B:A:E" (both A and E have B in their friend lists).
    @Override
    protected void reduce(Text txt, Iterable<Text> txtiter, Reducer<Text, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        sb.delete(0, sb.length());
        sb.append(txt.toString());
        Iterator<Text> it = txtiter.iterator();
        while (it.hasNext()) {
            sb.append(spector + it.next().toString());
        }
        outval.set(sb.toString());
        context.write(outval, nul);
    }
}

class SecondFindMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text keyout = new Text();
    Text valueout = new Text();

    // For a line "B:A:E", emit every pair of people with their common
    // friend as the value: ("A-E", "B").
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String[] fs = value.toString().split(":");
        valueout.set(fs[0]);
        if (fs.length > 0) {
            for (int i = 1; i < fs.length - 1; i++) {
                for (int j = i + 1; j < fs.length; j++) {
                    // Order the pair lexicographically so that both
                    // orderings of the same two people produce the same
                    // reducer key.
                    if (fs[i].compareTo(fs[j]) > 0) {
                        keyout.set(fs[j] + "-" + fs[i]);
                    } else {
                        keyout.set(fs[i] + "-" + fs[j]);
                    }
                    context.write(keyout, valueout);
                }
            }
        }
    }
}

class SecondFindReducer extends Reducer<Text, Text, Text, Text> {

    StringBuilder sb = new StringBuilder();
    Text outvalue = new Text();

    // Join all common friends of the pair into a comma-separated list.
    @Override
    protected void reduce(Text key, Iterable<Text> iter, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        sb.delete(0, sb.length());
        Iterator<Text> it = iter.iterator();
        if (it.hasNext()) {
            sb.append(it.next().toString());
        }
        while (it.hasNext()) {
            sb.append("," + it.next().toString());
        }
        outvalue.set(sb.toString());
        context.write(key, outvalue);
    }
}
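One caveat with the JobControl driver above: it exits with status 0 even if one of the jobs fails. Below is a minimal sketch of a more defensive wait loop; it uses only the standard JobControl API (allFinished, stop, getFailedJobList), and only the exit handling changes:

// Poll until the controller reports that all jobs are done.
while (!jc.allFinished()) {
    Thread.sleep(5000);
}
// Stop the controller thread and report any failed jobs.
jc.stop();
if (jc.getFailedJobList().isEmpty()) {
    LOG.info("All jobs completed successfully.");
    System.exit(0);
} else {
    for (ControlledJob failed : jc.getFailedJobList()) {
        LOG.info("Job failed: " + failed.getJobName());
    }
    System.exit(1);
}

To run the pipeline, package the classes into a jar (the jar name here is hypothetical) and pass three paths: the input directory, an intermediate directory for job1's output, and the final output directory, e.g. hadoop jar pick.jar PickMain /friends/input /friends/mid /friends/output. Note that args[1] serves as both job1's output and job2's input, which is what ties the two jobs together.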
That is everything in "How to chain MapReduce jobs in Hadoop". Thanks for reading! I hope you now have a good grasp of the topic and that the material shared here is helpful. If you would like to learn more, follow the 億速雲(yún) industry news channel.