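Aggregate the upstream and downstream traffic figures in the log data, and output the results sorted by total traffic in descending order.
The data looks like this: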
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
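Basic approach: define a custom bean that encapsulates the traffic figures, and use that bean as the key of the map output.
While an MR program processes data, it sorts it: the key/value pairs emitted by map are sorted before they reach reduce, and the sort is based on the map output key.
So to get a custom sort order, put the sort criterion into the key and make the key class implement the WritableComparable interface,
then override the key's compareTo method. Note that the mapper below reads tab-separated lines whose first three fields are phone number, upflow and downflow, so it presumably runs on the output of an earlier job that already summed the raw log per phone number.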
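To see what a descending compareTo does to the order of records, the sort performed during the shuffle can be imitated with Collections.sort in plain Java. This is a minimal sketch, not Hadoop code: Flow and sortDescending are hypothetical names, Flow is a stand-in for FlowBean, and the figures assume that the third- and second-to-last columns of the sample lines (e.g. 2481 and 24681) are the upstream and downstream byte counts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DescendingFlowSortDemo {

    // Hypothetical stand-in for FlowBean: only the fields relevant to sorting, no Hadoop types
    static final class Flow implements Comparable<Flow> {
        final String phone;
        final long up;
        final long down;
        final long sum;

        Flow(String phone, long up, long down) {
            this.phone = phone;
            this.up = up;
            this.down = down;
            this.sum = up + down;
        }

        // Descending by total flow: the arguments are swapped relative to ascending order
        @Override
        public int compareTo(Flow o) {
            return Long.compare(o.sum, this.sum);
        }

        @Override
        public String toString() {
            return phone + "\t" + up + "\t" + down + "\t" + sum;
        }
    }

    // Returns a sorted copy, ordered the way the shuffle would order the map output keys
    static List<Flow> sortDescending(List<Flow> flows) {
        List<Flow> copy = new ArrayList<>(flows);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<Flow> flows = new ArrayList<>();
        flows.add(new Flow("13826544101", 264, 0));
        flows.add(new Flow("13726230503", 2481, 24681));
        flows.add(new Flow("13926435656", 132, 1512));
        for (Flow f : sortDescending(flows)) {
            System.out.println(f);
        }
    }
}
```

Because Long.compare(o.sum, this.sum) returns 0 for equal totals, the ordering stays consistent with the compareTo contract instead of claiming that every pair of records differs.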
1. The custom bean
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    long upflow;
    long downflow;
    long sumflow;

    // Hadoop creates instances by reflection during deserialization, so once another
    // constructor is declared, the no-arg constructor must be declared explicitly
    // (otherwise deserialization throws an exception)
    public FlowBean() {}

    public FlowBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    public long getSumflow() { return sumflow; }
    public void setSumflow(long sumflow) { this.sumflow = sumflow; }
    public long getUpflow() { return upflow; }
    public void setUpflow(long upflow) { this.upflow = upflow; }
    public long getDownflow() { return downflow; }
    public void setDownflow(long downflow) { this.downflow = downflow; }

    // Serialization: write the fields to the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    // Deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + sumflow;
    }

    // Descending order by total flow. Equal totals compare as 0, as the compareTo
    // contract requires; such records share one reduce group, so the reducer
    // writes every phone number in the group
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumflow(), this.sumflow);
    }
}
2. Mapper and reducer
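import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line: phone \t upflow \t downflow [\t ...]
            String[] fields = value.toString().split("\t");
            try {
                String phonenbr = fields[0];
                long upflow = Long.parseLong(fields[1]);
                long dflow = Long.parseLong(fields[2]);
                // The bean is the key, so the shuffle sorts records with FlowBean.compareTo
                context.write(new FlowBean(upflow, dflow), new Text(phonenbr));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                // Skip malformed lines instead of failing the task
                e.printStackTrace();
            }
        }
    }

    static class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean bean, Iterable<Text> phonenbrs, Context context)
                throws IOException, InterruptedException {
            // Phone numbers with the same total arrive in one group: write all of them.
            // Hadoop refreshes the key object while the values are iterated, so "bean"
            // holds the figures that belong to the current phone number
            for (Text phoneNbr : phonenbrs) {
                context.write(phoneNbr, bean);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowCount.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Keep the default single reduce task: the descending order is only global
        // within one reducer's output file
        // job.setInputFormatClass(TextInputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}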
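Next requirement: write the traffic statistics to different output files according to the province each phone number belongs to, so that queries on the results can be narrowed down to one province.
MapReduce groups the key/value pairs emitted by map by key, then distributes the groups to the reduce tasks.
The default distribution rule (HashPartitioner) sends a record to reduce task (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
So to group records by our own rule, we replace the component that distributes (groups) the data, the Partitioner:
define a CustomPartitioner that extends the abstract class Partitioner,
then set it on the job object with job.setPartitionerClass(CustomPartitioner.class), and set the number of reduce tasks to the number of partitions it can return, e.g. job.setNumReduceTasks(6) for the ProvincePartitioner below, which returns 0 to 5.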
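The default rule can be written out in a few lines of plain Java. The formula matches Hadoop's HashPartitioner: masking with Integer.MAX_VALUE clears the sign bit, so a negative hashCode still yields a partition in [0, numReduceTasks). As an assumption for illustration, this sketch hashes plain Java Strings; Hadoop's Text computes its own hashCode over its bytes, so real partition numbers for Text keys will differ. defaultPartition is a hypothetical name.

```java
public class HashPartitionDemo {

    // Same formula as Hadoop's default HashPartitioner
    static int defaultPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] phones = {"13726230503", "13826544101", "13926435656", "13926251106"};
        for (String phone : phones) {
            System.out.println(phone + " -> partition " + defaultPartition(phone, 3));
        }
    }
}
```

Records with equal keys always land in the same partition, but which province ends up where is arbitrary; that is why a custom Partitioner is needed for per-province output files.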
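import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom rule for distributing (grouping) data between map and reduce:
 * ProvincePartitioner distributes records by the province of the phone number.
 * The default partitioner is HashPartitioner.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    // Phone-number prefix -> partition number (one partition per province).
    // Simplification: a 3-digit prefix stands in for a real number-to-province lookup
    static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("135", 0);
        provinceMap.put("136", 1);
        provinceMap.put("137", 2);
        provinceMap.put("138", 3);
        provinceMap.put("139", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // The key is the phone number; all unknown prefixes go to partition 5
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        return code == null ? 5 : code;
    }
}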
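A plain-Java sketch of how this rule routes records: each partition number is handled by one reduce task, and each reduce task writes its own part-r-NNNNN file, so each province ends up in a separate file. The prefix table is copied from the partitioner; 18211575961 is a made-up number used only to show the fallback partition 5, and getPartition/outputFile are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

public class ProvinceRoutingDemo {

    // Same prefix table as ProvincePartitioner
    private static final Map<String, Integer> PROVINCE_MAP = new HashMap<>();

    static {
        PROVINCE_MAP.put("135", 0);
        PROVINCE_MAP.put("136", 1);
        PROVINCE_MAP.put("137", 2);
        PROVINCE_MAP.put("138", 3);
        PROVINCE_MAP.put("139", 4);
    }

    // Same logic as ProvincePartitioner.getPartition, on a plain String key
    static int getPartition(String phone) {
        Integer code = PROVINCE_MAP.get(phone.substring(0, 3));
        return code == null ? 5 : code;
    }

    // Name of the output file written by the reduce task of the given partition
    static String outputFile(int partition) {
        return String.format("part-r-%05d", partition);
    }

    public static void main(String[] args) {
        String[] phones = {"13726230503", "13826544101", "13926435656", "18211575961"};
        for (String phone : phones) {
            System.out.println(phone + " -> " + outputFile(getPartition(phone)));
        }
    }
}
```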
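Compression is a MapReduce optimization strategy: compressing the mapper or reducer output with a compression codec reduces disk and network IO and can speed up an MR program, at the cost of extra CPU work.
1. MapReduce can compress map output or reduce output, to reduce network IO or the size of the final output data.
2. Used well, compression improves performance; used badly, it can also hurt it.
3. Basic rules:
CPU-intensive jobs: use compression sparingly
IO-intensive jobs: use compression more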
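The IO side of this trade-off is easy to observe with java.util.zip, which writes the same gzip format that Hadoop's GzipCodec produces. This is only an illustration on hypothetical, highly repetitive records (real job output is typically less repetitive); the time spent in gzip() and gunzip() is the extra CPU cost. All names here are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressionTradeoffDemo {

    // Compress bytes into the gzip format
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Decompress: the CPU cost paid again on the reading side
    static byte[] gunzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Hypothetical repetitive records in the "phone, up, down, sum" layout
    static byte[] sampleRecords(int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            sb.append("13726230503\t2481\t24681\t27162\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = sampleRecords(10_000);
        long t0 = System.nanoTime();
        byte[] packed = gzip(raw);
        long t1 = System.nanoTime();
        System.out.printf("raw=%d bytes, gzip=%d bytes, compress time=%.2f ms%n",
                raw.length, packed.length, (t1 - t0) / 1e6);
    }
}
```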
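Compression of the reduce (final) output can be configured either through configuration parameters or in code.
1. Through configuration parameters (the values below are the defaults; set mapreduce.output.fileoutputformat.compress=true to enable compression; compress.type, one of NONE, RECORD or BLOCK, only applies to SequenceFile output):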
mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
2. In code:
Job job = Job.getInstance(conf);
// Enable compression of the job's final output
FileOutputFormat.setCompressOutput(job, true);
// Pick the codec class, e.g. org.apache.hadoop.io.compress.GzipCodec
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
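Compression of the map output (the intermediate data shuffled to the reducers) can likewise be configured either through configuration parameters or in code.
1. Through configuration parameters (the values below are the defaults; set mapreduce.map.output.compress=true to enable compression):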
mapreduce.map.output.compress=false
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
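2. In code (set these on conf before calling Job.getInstance(conf), because the Job works on its own copy of the configuration):
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);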
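Hadoop's built-in InputFormats can read compressed files out of the box. TextInputFormat, for example, reads lines through LineRecordReader, whose initialize method contains: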
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    // Create the codec that matches the file name suffix (null if the file is not compressed)
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
        isCompressedInput = true;
        decompressor = CodecPool.getDecompressor(codec);
        // Is this a splittable compression codec?
        if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
                ((SplittableCompressionCodec) codec).createInputStream(
                    fileIn, decompressor, start, end,
                    SplittableCompressionCodec.READ_MODE.BYBLOCK);
            // Splittable codec: read the compressed data with a CompressedSplitLineReader
            in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
        } else {
            // Non-splittable codec: wrap the file stream in a decompressing stream
            // and hand it to an ordinary SplitLineReader
            in = new SplitLineReader(codec.createInputStream(fileIn, decompressor),
                job, this.recordDelimiterBytes);
            filePosition = fileIn;
        }
    } else {
        fileIn.seek(start);
        // Not compressed: read the data with an ordinary SplitLineReader
        in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
        filePosition = fileIn;
    }
    // ... (rest of the method omitted)
}
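The branch taken in initialize() depends only on the file name suffix and on whether the codec is splittable. The plain-Java sketch below mimics that decision; it is a hypothetical stand-in for CompressionCodecFactory.getCodec, hard-coding three of Hadoop's built-in codecs with their default extensions: DefaultCodec (.deflate), GzipCodec (.gz) and BZip2Codec (.bz2, the only one of the three that implements SplittableCompressionCodec). Codec, codecFor and readerFor are illustrative names.

```java
public class CodecBySuffixDemo {

    // Subset of Hadoop's built-in codecs: default extension and splittability
    enum Codec {
        DEFAULT(".deflate", false), // DefaultCodec
        GZIP(".gz", false),         // GzipCodec
        BZIP2(".bz2", true);        // BZip2Codec

        final String suffix;
        final boolean splittable;

        Codec(String suffix, boolean splittable) {
            this.suffix = suffix;
            this.splittable = splittable;
        }
    }

    // Returns the codec matching the file name's extension, or null for an uncompressed file
    static Codec codecFor(String fileName) {
        for (Codec c : Codec.values()) {
            if (fileName.endsWith(c.suffix)) {
                return c;
            }
        }
        return null;
    }

    // Mirrors the three branches of LineRecordReader.initialize()
    static String readerFor(String fileName) {
        Codec codec = codecFor(fileName);
        if (codec != null && codec.splittable) {
            return "CompressedSplitLineReader";
        }
        // A non-splittable codec and a plain file both use a SplitLineReader;
        // only the stream it reads from differs (decompressing vs. raw)
        return "SplitLineReader";
    }

    public static void main(String[] args) {
        for (String name : new String[] {"access.log", "access.log.gz", "access.log.bz2"}) {
            System.out.println(name + " -> codec=" + codecFor(name) + ", reader=" + readerFor(name));
        }
    }
}
```

In practice a .gz input file is read as a single split by one map task, because TextInputFormat only splits files that are uncompressed or use a splittable codec.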
1. Requirements:
Order table t_order:
id | date | pid | amount |
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0001 | 3 |
1002 | 20150710 | P0002 | 3 |
Product table t_product:
id | name | category_id | price |
P0001 | 小米5 | C01 | 2 |
P0002 | 錘子T1 | C01 | 3 |
Suppose the data volume is huge and both tables are stored as files in HDFS. Implement the following SQL query with a MapReduce program:
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
2. How it works:
Use the join condition as the map output key and tag every record with the file (table) it came from. The records of both tables that satisfy the join condition then reach the same reduce task, which concatenates them.
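A plain-Java simulation of this mechanism, without Hadoop: a TreeMap stands in for the shuffle (which groups and sorts by key), and each value carries a hypothetical "o"/"p" tag naming its source table. The rows are those of the two tables above, with the product names romanized only to keep the source file ASCII; join and Tagged are illustrative names, not part of any API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSideJoinDemo {

    // A value as it travels from map to reduce: payload tagged with its source table
    static final class Tagged {
        final String table;   // "o" = t_order, "p" = t_product (hypothetical tags)
        final String payload;

        Tagged(String table, String payload) {
            this.table = table;
            this.payload = payload;
        }
    }

    // orders: {id, date, pid, amount}; products: {id, name, category_id, price}
    static List<String> join(List<String[]> orders, List<String[]> products) {
        // "Map" + "shuffle": key every record by the join column; the TreeMap groups by key
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (String[] o : orders) {
            groups.computeIfAbsent(o[2], k -> new ArrayList<>()).add(new Tagged("o", o[0] + "\t" + o[1]));
        }
        for (String[] p : products) {
            groups.computeIfAbsent(p[0], k -> new ArrayList<>()).add(new Tagged("p", p[1] + "\t" + p[2] + "\t" + p[3]));
        }
        // "Reduce": within each key, concatenate every order with the product record
        List<String> out = new ArrayList<>();
        for (List<Tagged> group : groups.values()) {
            String product = null;
            List<String> orderParts = new ArrayList<>();
            for (Tagged t : group) {
                if (t.table.equals("p")) {
                    product = t.payload;
                } else {
                    orderParts.add(t.payload);
                }
            }
            if (product == null) {
                continue; // inner join: orders without a product are dropped
            }
            for (String orderPart : orderParts) {
                out.add(orderPart + "\t" + product);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> orders = new ArrayList<>();
        orders.add(new String[] {"1001", "20150710", "P0001", "2"});
        orders.add(new String[] {"1002", "20150710", "P0001", "3"});
        orders.add(new String[] {"1002", "20150710", "P0002", "3"});
        List<String[]> products = new ArrayList<>();
        products.add(new String[] {"P0001", "Xiaomi 5", "C01", "2"});
        products.add(new String[] {"P0002", "Smartisan T1", "C01", "3"});
        for (String row : join(orders, products)) {
            System.out.println(row);
        }
    }
}
```

All orders for one pid land in one group, so a single popular product can send most of the data to one reducer; this is the skew problem discussed below.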
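import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OrderJoin {

    // Simplified: instead of a custom OrderJoinBean, each value is a Text tagged with
    // its source table ("o\t..." for orders, "p\t..." for products).
    // Assumption: order files have names starting with "order"; other files hold products.
    static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text k = new Text();
        private final Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            // Find out which file this line belongs to through the input split
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            if (fileName.startsWith("order")) {
                // t_order: id, date, pid, amount -> the join key is pid
                k.set(fields[2]);
                v.set("o\t" + fields[0] + "\t" + fields[1]);
            } else {
                // t_product: id, name, category_id, price -> the join key is id
                k.set(fields[0]);
                v.set("p\t" + fields[1] + "\t" + fields[2] + "\t" + fields[3]);
            }
            context.write(k, v);
        }
    }

    static class OrderJoinReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // key is one pid, e.g. P0001; values come from both files
            String product = null;                   // "name\tcategory_id\tprice"
            List<String> orders = new ArrayList<>(); // "id\tdate"
            for (Text t : values) {
                // Copy to a String: Hadoop reuses the same Text object while iterating
                String s = t.toString();
                if (s.startsWith("p\t")) {
                    product = s.substring(2);
                } else {
                    orders.add(s.substring(2));
                }
            }
            if (product == null) {
                return; // inner join: orders without a matching product are dropped
            }
            // Concatenate the product fields with every order: id, date, name, category_id, price
            for (String order : orders) {
                context.write(new Text(order + "\t" + product), NullWritable.get());
            }
        }
    }
}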
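Drawback: with this approach the join is done in the reduce phase, so the reducers carry a heavy load while the map nodes do little work; resource utilization is poor, and the reduce phase is very prone to data skew.
Solution: a map-side join
1. Principle
Suitable when one of the joined tables is small:
the small table is distributed to every map node, so each map task can join the big-table records it reads locally and emit the final result. This greatly increases the parallelism of the join and speeds it up.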
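The core of a map-side join can be tried out in plain Java: load the small table into a HashMap once (what setup() does) and look up each big-table line while streaming through it (what map() does). This sketch assumes tab-separated t_product and t_order lines with the column order of the tables above; loadSmallTable and mapJoin are hypothetical names, and the product names are romanized only to keep the source ASCII.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapSideJoinDemo {

    // Like setup(): load the small table once into memory, pid -> "name\tcategory_id\tprice"
    static Map<String, String> loadSmallTable(List<String> productLines) {
        Map<String, String> byId = new HashMap<>();
        for (String line : productLines) {
            String[] f = line.split("\t"); // id, name, category_id, price
            byId.put(f[0], f[1] + "\t" + f[2] + "\t" + f[3]);
        }
        return byId;
    }

    // Like map(): join each big-table line against the in-memory table; no shuffle, no reducer
    static List<String> mapJoin(List<String> orderLines, Map<String, String> products) {
        List<String> out = new ArrayList<>();
        for (String line : orderLines) {
            String[] f = line.split("\t"); // id, date, pid, amount
            String product = products.get(f[2]);
            if (product != null) { // inner join: skip orders whose pid is unknown
                out.add(f[0] + "\t" + f[1] + "\t" + product);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> products = loadSmallTable(Arrays.asList(
                "P0001\tXiaomi 5\tC01\t2",
                "P0002\tSmartisan T1\tC01\t3"));
        List<String> orders = Arrays.asList(
                "1001\t20150710\tP0001\t2",
                "1002\t20150710\tP0001\t3",
                "1002\t20150710\tP0002\t3");
        for (String row : mapJoin(orders, products)) {
            System.out.println(row);
        }
    }
}
```

The design only works while the small table fits in each map task's memory; the big table is never shuffled, which is where the speed-up comes from.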
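2. Implementation example
-- First, define the small table inside the mapper class in advance and join against it
-- Solutions used in real scenarios: load the small table once (e.g. from a database), or ship it with the distributed cache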
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {
        // The small table (b.txt) held in memory: itemid -> name
        private final Map<String, String> bTab = new HashMap<>();
        private String localPath = null;

        // Called once when the map task is initialized
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // For testing only: the local absolute path of the cache file
            // (getLocalCacheFiles() is deprecated and may return null)
            Path[] files = context.getLocalCacheFiles();
            if (files != null && files.length > 0) {
                localPath = files[0].toString();
            }
            // Using the cache file: read it with plain local IO. The framework places
            // b.txt in the working directory of the map task on its local machine
            try (BufferedReader reader = new BufferedReader(new FileReader("b.txt"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] fields = line.split(",");
                    bTab.put(fields[0], fields[1]);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // This reads the split (on HDFS) that this map task is responsible for
            String[] fields = value.toString().split("\t");
            String aItemId = fields[0];
            String aAmount = fields[1];
            String bName = bTab.get(aItemId);
            // Output: itemid, amount, cache-file path (for verification), name
            context.write(new Text(aItemId), new Text(aAmount + "\t:" + localPath + "\t" + bName));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);
        // The mapper emits Text values, so the output value class must be Text
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Path of the data we actually need to process
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // No reducer needed: the join is done entirely on the map side
        job.setNumReduceTasks(0);
        // Distribute a file to the working directory of every task
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
        // Distribute an archive to the working directory of every task (unpacked there):
        // job.addCacheArchive(archiveUri);
        // Add an archive / a jar to the classpath of the task nodes:
        // job.addArchiveToClassPath(archivePath);
        // job.addFileToClassPath(jarPath);
        job.waitForCompletion(true);
    }
}
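1. Requirements:
Identify and split out the fields of each record in the web access log
Remove invalid records from the log
Produce filtered data for the various kinds of requests, as required by the KPI statistics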
2. Implementation code:
a) Define a bean that holds the fields of one log record
public class WebLogBean {

    private String remote_addr;     // client IP address
    private String remote_user;     // client user name; "-" when absent (ignored)
    private String time_local;      // access time and time zone
    private String request;         // requested URL
    private String status;          // request status; 200 means success
    private String body_bytes_sent; // size of the response body sent to the client
    private String http_referer;    // page the request was linked from
    private String http_user_agent; // client browser information

    private boolean valid = true;   // whether the record is valid

    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
    public String getTime_local() { return time_local; }
    public void setTime_local(String time_local) { this.time_local = time_local; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }
    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }

    // Fields joined with \001 (Hive's default field delimiter), starting with the valid flag
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.remote_addr);
        sb.append("\001").append(this.remote_user);
        sb.append("\001").append(this.time_local);
        sb.append("\001").append(this.request);
        sb.append("\001").append(this.status);
        sb.append("\001").append(this.body_bytes_sent);
        sb.append("\001").append(this.http_referer);
        sb.append("\001").append(this.http_user_agent);
        return sb.toString();
    }
}
b) Define a parser that parses and filters the raw web access log records
public class WebLogParser {

    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        // The fields of a log line are separated by spaces
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            webLogBean.setTime_local(arr[3].substring(1)); // strip the leading '['
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);
            if (arr.length > 12) {
                webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }
            try {
                // A status of 400 or above is an HTTP error
                if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
                    webLogBean.setValid(false);
                }
            } catch (NumberFormatException e) {
                // A non-numeric status means the line is malformed
                webLogBean.setValid(false);
            }
        } else {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }

    public static String parserTime(String time) {
        // replace() returns a new string; the argument itself is left unchanged
        return time.replace("/", "-");
    }
}
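To make the split(" ") indexes concrete, the sketch below applies them to one line in the combined log format. The line is a made-up example, and fields/isValid are illustrative helpers that reproduce the parser's index mapping and validity rule (enough tokens and a numeric status below 400) rather than call it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WebLogSplitDemo {

    // Hypothetical sample line in the combined log format
    static final String SAMPLE = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "
            + "\"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" "
            + "\"Mozilla/4.0 (compatible;)\"";

    // Same split(" ") indexes as the parser; assumes at least 12 tokens
    static Map<String, String> fields(String line) {
        String[] arr = line.split(" ");
        Map<String, String> m = new LinkedHashMap<>();
        m.put("remote_addr", arr[0]);
        m.put("remote_user", arr[1]);
        m.put("time_local", arr[3].substring(1)); // the time zone in arr[4] is dropped
        m.put("request", arr[6]);                 // arr[5] is the method, arr[7] the protocol
        m.put("status", arr[8]);
        m.put("body_bytes_sent", arr[9]);
        m.put("http_referer", arr[10]);
        m.put("http_user_agent", arr.length > 12 ? arr[11] + " " + arr[12] : arr[11]);
        return m;
    }

    // Validity rule: more than 11 tokens and a numeric status below 400
    static boolean isValid(String line) {
        String[] arr = line.split(" ");
        if (arr.length <= 11) {
            return false;
        }
        try {
            return Integer.parseInt(arr[8]) < 400;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("valid = " + isValid(SAMPLE));
        fields(SAMPLE).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Splitting on single spaces assumes a fixed layout: a user agent with more than two space-separated tokens is cut down to its first two, and a line with a different token count shifts every index.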
c) The MapReduce program
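import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // Output objects reused across map() calls
        Text k = new Text();
        NullWritable v = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            WebLogBean webLogBean = WebLogParser.parser(value.toString());
            // Drop invalid records
            if (!webLogBean.isValid()) {
                return;
            }
            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}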