您好,登錄后才能下訂單哦!
本例子采用hadoop1.1.2版本,附件中有例子的數據文件
采用氣象數據作為處理數據
1、MultipleOutputs例子,具體解釋在代碼中有注釋
package StationPatitioner; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.MultipleOutputs; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * hadoop Version 1.1.2 * MultipleOutputs例子 * @author 巧克力黑 * */ public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool { enum Counter { LINESKIP, //出錯(cuò)的行 } static class StationMapper extends MapReduceBase implements Mapper<LongWritable , Text, Text , Text>{ private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { try { parser.parse(value); output.collect(new Text(parser.getStationid()), value); } catch (Exception e) { reporter.getCounter(Counter.LINESKIP).increment(1); //出錯(cuò)令計(jì)數(shù)器+1 } } } static class MultipleOutputReducer extends MapReduceBase implements Reducer<Text, Text, NullWritable, Text>{ private MultipleOutputs multipleOutputs; @Override public void configure(JobConf jobconf) { multipleOutputs = new MultipleOutputs(jobconf);//初始化一個(gè)MultipleOutputs } @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<NullWritable, Text> output, Reporter reporter) throws 
IOException { //得到OutputCollector OutputCollector collector = multipleOutputs.getCollector("station", key.toString().replace("-", ""), reporter); while(values.hasNext()){ collector.collect(NullWritable.get(), values.next());//MultipleOutputs用OutputCollector輸出數(shù)據(jù) } } @Override public void close() throws IOException { multipleOutputs.close(); } } @Override public int run(String[] as) throws Exception { System.setProperty("HADOOP_USER_NAME", "root");//windows下用戶(hù)與linux用戶(hù)不一直,采用此方法避免報(bào)Permission相關(guān)錯(cuò)誤 JobConf conf = new JobConf(); conf.setMapperClass(StationMapper.class); conf.setReducerClass(MultipleOutputReducer.class); conf.setMapOutputKeyClass(Text.class); conf.setOutputKeyClass(NullWritable.class); conf.setOutputFormat(NullOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path("hdfs://ubuntu:9000/sample1.txt"));//input路徑 FileOutputFormat.setOutputPath(conf, new Path("hdfs://ubuntu:9000/temperature"));//output路徑 MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args); System.exit(exitCode); } }
2、解析氣象數據的類
package StationPatitioner; import org.apache.hadoop.io.Text; public class NcdcRecordParser { private static final int MISSING_TEMPERATURE = 9999; private String year; private int airTemperature; private String quality; private String stationid; public void parse(String record) { stationid = record.substring(0, 5); year = record.substring(15, 19); String airTemperatureString; // Remove leading plus sign as parseInt doesn't like them if (record.charAt(87) == '+') { airTemperatureString = record.substring(88, 92); } else { airTemperatureString = record.substring(87, 92); } airTemperature = Integer.parseInt(airTemperatureString); quality = record.substring(92, 93); } public String getStationid(){ return stationid; } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]"); } public String getYear() { return year; } public int getAirTemperature() { return airTemperature; } }
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。