您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)mapreduce中怎么合并小文件,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
HDFS中PathFilter類
在單個(gè)操作中處理一批文件,這是很常見的需求。比如說處理日志的 MapReduce作業(yè)可能需要分析一個(gè)月內(nèi)包含在大量目錄中的日志文件。在一個(gè)表達(dá)式中使用通配符在匹配多個(gè)文件時(shí)比較方便的,無需列舉每個(gè)文件和目錄 來指定輸入。hadoop為執(zhí)行通配提供了兩個(gè)FIleSystem方法:
1 public FileStatus[] globStatus(Path pathPattern) throw IOException
2 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOException
globStatus()方法返回與路徑想匹配的所有文件的FileStatus對(duì)象數(shù)組,并按路徑排序。hadoop所支持的通配符與Unix bash相同。
第二個(gè)方法傳了一個(gè)PathFilter對(duì)象作為參數(shù),PathFilter可以進(jìn)一步對(duì)匹配進(jìn)行限制。PathFilter是一個(gè)接口,里面只有一個(gè)方法accept(Path path)。具體使用參考下面代碼
package com.tv; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.IOUtils; public class MergeSmallFilesToHDFS { private static FileSystem fs = null; private static FileSystem local = null; public static class RegexExcludePathFilter implements PathFilter{ private final String regex; public RegexExcludePathFilter(String regex) { this.regex = regex; } public boolean accept(Path path) { // TODO Auto-generated method stub boolean flag = path.toString().matches(regex); //過濾 regex 格式的文件,只需 return !flag return !flag; } } public static class RegexAcceptPathFilter implements PathFilter { private final String regex; public RegexAcceptPathFilter(String regex) { this.regex = regex; } public boolean accept(Path path) { // TODO Auto-generated method stub boolean flag = path.toString().matches(regex); //接受 regex 格式的文件,只需 return flag return flag; } } public static void list() throws IOException, URISyntaxException { //讀取配置文件 Configuration conf = new Configuration(); URI uri = new URI("hdfs://zbc:9000"); // FileSystem是用戶操作HDFS的核心類,它獲得URI對(duì)應(yīng)的HDFS文件系統(tǒng) fs = FileSystem.get(uri, conf); // 獲得本地文件系統(tǒng) local = FileSystem.getLocal(conf); //獲取該目錄下的所有子目錄(日期名稱) FileStatus[] dirstatus = local.globStatus(new Path("C:/Users/zaish/Documents/學(xué)習(xí)/hadooop分析數(shù)據(jù)/tvdata/*"),new RegexExcludePathFilter("^.*svn$")); Path[] dirs = FileUtil.stat2Paths(dirstatus); FSDataOutputStream out = null; FSDataInputStream in = null; for (Path dir : dirs) { String fileName = dir.getName().replace("-", "");//文件名稱 //只接受日期目錄下的.txt文件 FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$")); // 獲得日期目錄下的所有文件 Path[] listedPaths = FileUtil.stat2Paths(localStatus); //輸出路徑 Path block = new Path("hdfs://zbc:9000/middle/tv/"+ fileName + ".txt"); // 打開輸出流 out = fs.create(block); for (Path p : listedPaths) { in = local.open(p);// 打開輸入流 IOUtils.copyBytes(in, out, 4096, false); // 復(fù)制數(shù)據(jù) // 關(guān)閉輸入流 in.close(); } if (out != null) { // 關(guān)閉輸出流 out.close(); } } } public static void main(String[] args) throws Exception { list(); } }
看完上述內(nèi)容,你們對(duì)mapreduce中怎么合并小文件有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。