This article explains how to get Hadoop to write multiple output files with custom file names. It is a situation many people run into in real projects, so let's walk through how to handle it step by step.
First comes the output format class, i.e. the class passed to job.setOutputFormatClass(...):
public class MoreFileOutputFormat extends Multiple<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        return "Your name";
    }
}
After extending Multiple, you must override the generateFileNameForKeyValue() method; the string it returns is used as the output file name. Write the method body to suit your needs, and swap the key and value types for whatever your job emits.
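As a concrete illustration, here is a hypothetical subclass (the class name and the sanitizing rule are mine, not from the original) that routes each record into a file named after its key:

public class KeyNamedOutputFormat extends Multiple<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        // Replace path separators so the key is a legal file name (illustrative only)
        return key.toString().replaceAll("[/\\\\]", "_");
    }
}

Note that each task resolves names independently, so two tasks generating the same name would collide at commit time; including the partition number or task ID in the returned name avoids that.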
Next is the code for the generic Multiple class:
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

// Abstract base class; callers implement generateFileNameForKeyValue to supply file names
public abstract class Multiple<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /** Get the task output path. */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    // Override in a subclass to produce the output file name
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    // RecordWriter implementation (inner class)
    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, keyed by output file name */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Determine the output file name
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            // Create a writer for this file name if none exists yet, then write the record
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            // Check whether output compression is enabled for the job
            boolean isCompressed = getCompressOutput(job);
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                // Wrap the compressed stream in the custom MyRecordWriter
                recordWriter = new MyRecordWriter<K, V>(new DataOutputStream(
                        codec.createOutputStream(fileOut)));
            } else {
                System.out.println("workPath = " + workPath + ", basename = " + baseName);
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                // Wrap the raw stream in the custom MyRecordWriter
                recordWriter = new MyRecordWriter<K, V>(fileOut);
            }
            return recordWriter;
        }
    }
}
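A usage note: the compressed branch above only runs when output compression is switched on for the job. With the standard Hadoop API (not shown in the original article), that is done in the driver, with org.apache.hadoop.io.compress.GzipCodec imported:

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

The codec class set here is what getOutputCompressorClass() retrieves, with GzipCodec as the fallback default.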
Now implement the MyRecordWriter class that MultiRecordWriter (Multiple's inner class) wraps around each output stream, so you can control exactly how records are written:
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8"; // character encoding for non-Text values
    protected DataOutputStream out;

    public MyRecordWriter(DataOutputStream out) {
        this.out = out;
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            // Fall back to the object's string form; change this for non-text types
            out.write(o.toString().getBytes(utf8));
        }
    }

    /** Write the MapReduce key/value pair to the output stream in a custom format. */
    public synchronized void write(K key, V value) throws IOException {
        // Only the value is written here; adjust if the key is also needed
        writeObject(value);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
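As written, write() emits only the value, with no separator between records. If you want a more conventional key-tab-value line format, a sketch of an alternative write() (my variant, assuming text-oriented output) could look like this:

public synchronized void write(K key, V value) throws IOException {
    // Hypothetical variant: key, tab, value, newline
    writeObject(key);
    out.write("\t".getBytes(utf8));
    writeObject(value);
    out.write("\n".getBytes(utf8));
}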
This class originally contained a few other methods, but I didn't need them and deleted them, along with the original file, so I can no longer recover them. My apologies.
Now all that remains is to set the job's output format to MoreFileOutputFormat in main() or run(), like so:
job.setOutputFormatClass(MoreFileOutputFormat.class);
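Put together, a minimal driver sketch might look like the following; MyMapper and MyReducer are hypothetical placeholders for your own classes, and both are assumed to emit <Text, Text> pairs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "multi-file-output");
        job.setJarByClass(Driver.class);
        job.setMapperClass(MyMapper.class);    // hypothetical mapper
        job.setReducerClass(MyReducer.class);  // hypothetical reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(MoreFileOutputFormat.class); // the class defined above
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}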
That concludes this introduction to multi-file output and custom file names in Hadoop. Thanks for reading!