This article demonstrates how to implement a secondary (auxiliary) sort in Hadoop. The material is concise and clearly organized; let's work through the example together below.
1. Sample data
Station file (station ID, station name; tab-separated):

011990-99999	SIHCCAJAVRI
012650-99999	TYNSET-HANSMOEN
Weather records (station ID, observation time, temperature; tab-separated):

012650-99999	194903241200	111
012650-99999	194903241800	78
011990-99999	195005150700	0
011990-99999	195005151200	22
011990-99999	195005151800	-11
2. Requirement

Join each weather record with the name of the station that produced it, using the station ID as the join key (a reduce-side join).
3. Approach and code
Route the station record and the weather records that share the same station ID to the same reducer, and guarantee that the station record arrives first. The reduce() function then reads the station name from the first value and, from the second value onward, reads the weather information and writes out the joined records.
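As an illustration (not shown in the original article), for station 011990-99999 in the sample data the two mappers emit roughly the following intermediate (key, value) pairs; after partitioning on the station ID and sorting on the full TextPair key, the tagged station record reaches the reducer first:

(011990-99999, 0)  ->  SIHCCAJAVRI
(011990-99999, 1)  ->  195005150700	0
(011990-99999, 1)  ->  195005151200	22
(011990-99999, 1)  ->  195005151800	-11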
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key, used here for the secondary sort; it holds the station ID and a "tag".
 * The tag is a synthetic field whose only purpose is to order the records so that the
 * station record reaches the reducer before the weather records.
 * It is possible to skip imposing this order and buffer the incoming records in memory
 * instead, but that should be avoided: any single group may contain a very large number
 * of records, far more than fit in the reducer's available memory.
 */
public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TextPair) {
            TextPair tp = (TextPair) obj;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    public int compareTo(TextPair o) {
        int cmp = first.compareTo(o.first);
        if (cmp == 0) {
            cmp = second.compareTo(o.second);
        }
        return cmp;
    }

    // A RawComparator compares records directly in the byte stream, without first
    // deserializing them into objects, which avoids the overhead of object creation.
    // WritableComparator is a general-purpose RawComparator implementation for classes
    // that implement WritableComparable.
    public static class FirstComparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public FirstComparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                // firstL1 and firstL2 are the byte lengths of the first Text field in each stream
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof TextPair && b instanceof TextPair) {
                return ((TextPair) a).first.compareTo(((TextPair) b).first);
            }
            return super.compare(a, b);
        }
    }
}
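A minimal sketch (not part of the original article) showing how the tag affects sorting but not grouping: the full-key compareTo() puts the station record ahead of the weather record for the same station ID, while FirstComparator treats both keys as equal, so they land in the same reduce group. It assumes the TextPair class above is on the classpath.

// Illustrative only.
public class TextPairSortDemo {
    public static void main(String[] args) {
        TextPair station = new TextPair("011990-99999", "0"); // tagged station record
        TextPair weather = new TextPair("011990-99999", "1"); // tagged weather record

        // Full key: the first fields are equal, so the tag decides -> negative, station sorts first
        System.out.println(station.compareTo(weather));

        // Grouping comparator: only the first field is compared -> 0, same reduce group
        System.out.println(new TextPair.FirstComparator().compare(station, weather));
    }
}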
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that tags station records
 */
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] val = value.toString().split("\\t");
        if (val.length == 2) {
            context.write(new TextPair(val[0], "0"), new Text(val[1]));
        }
    }
}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that tags weather records
 */
public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] val = value.toString().split("\\t");
        if (val.length == 3) {
            context.write(new TextPair(val[0], "1"), new Text(val[1] + "\t" + val[2]));
        }
    }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Reducer that joins the tagged station records with the tagged weather records
 */
public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        // The station record arrives first. Note the defensive copy: Hadoop reuses the same Text
        // instance for every value in the iterator, so writing "Text stationName = iter.next();"
        // would be a bug -- stationName would later hold whatever the reused object contains.
        Text stationName = new Text(iter.next());
        while (iter.hasNext()) {
            Text record = iter.next();
            Text outValue = new Text(stationName.toString() + "\t" + record.toString());
            context.write(key.getFirst(), outValue);
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class JoinRecordWithStationName {

    static class KeyPartitioner extends Partitioner<TextPair, Text> {
        @Override
        public int getPartition(TextPair textPair, Text text, int numPartitions) {
            return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Parameter number is wrong, please enter three parameters:<ncdc input> <station input> <output>");
            System.exit(-1);
        }
        Path ncdcInputPath = new Path(otherArgs[0]);
        Path stationInputPath = new Path(otherArgs[1]);
        Path outputPath = new Path(otherArgs[2]);
        //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000");

        Job job = Job.getInstance(conf, "JoinRecordWithStationName");
        //job.setJar("F:/workspace/AssistRanking/target/AssistRanking-1.0-SNAPSHOT.jar");
        job.setJarByClass(JoinRecordWithStationName.class);

        MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class);
        MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Partition and group by the first field (station ID) only: records in the same partition
        // go to the same reducer, and records in the same partition and group are handled by that
        // reducer in a single reduce() call.
        job.setPartitionerClass(KeyPartitioner.class);
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);

        job.setMapOutputKeyClass(TextPair.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4. Run results
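The original post does not reproduce the job output. With the sample data above, the joined records written by the reducer should look roughly like this (station ID, station name, observation time, temperature; how the lines are split across output files depends on the number of reducers):

011990-99999	SIHCCAJAVRI	195005150700	0
011990-99999	SIHCCAJAVRI	195005151200	22
011990-99999	SIHCCAJAVRI	195005151800	-11
012650-99999	TYNSET-HANSMOEN	194903241200	111
012650-99999	TYNSET-HANSMOEN	194903241800	78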