
MapReduce in Practice (Part 2)

Published: 2020-06-13 21:06:49 | Source: Internet | Author: yushiwh | Category: Development

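4.1. Sorting in MapReduce: a first look

4.1.1 Requirement

Aggregate the upstream and downstream traffic recorded in the log data, and output the results sorted by total traffic in descending order.

The data looks like this: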

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200

1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200

1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200

1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200

 

 

4.1.2 Analysis

Basic idea: implement a custom bean that encapsulates the traffic information, and transmit the bean as the key of the map output.

 

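While an MR program processes data, it sorts it: the key/value pairs emitted by map are sorted before they are passed to reduce, and the sort order is determined by the map output key.

So, to get a sort order of our own, we can put the sort criteria into the key and have the key class implement the WritableComparable interface,

then override the key's compareTo method.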
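The effect of the key's compareTo on the output can be simulated without Hadoop. The sketch below is plain Java, not MapReduce code: a TreeMap with a descending Comparator stands in for the shuffle's sort-and-group step, the input lines (phone, upflow, downflow, tab-separated) are a hypothetical pre-aggregated format, and the phone number 13500000000 is made up to create a tie.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only simulation of the shuffle's "sort by key, then group equal keys" step.
// Input lines are a hypothetical pre-aggregated format: phone \t upflow \t downflow.
final class SortByKeySketch {

    // Descending order on the total; returns 0 for equal totals, as the Comparator
    // contract requires (sgn(compare(a, b)) == -sgn(compare(b, a)))
    static final Comparator<Long> DESC = (a, b) -> Long.compare(b, a);

    // Returns total -> phones, iterated from the largest total to the smallest,
    // the way reduce() sees one key together with all of its values
    static Map<Long, List<String>> shuffle(List<String> lines) {
        Map<Long, List<String>> groups = new TreeMap<>(DESC);
        for (String line : lines) {
            String[] f = line.split("\t");
            long total = Long.parseLong(f[1]) + Long.parseLong(f[2]);
            groups.computeIfAbsent(total, k -> new ArrayList<>()).add(f[0]);
        }
        return groups;
    }
}
```

Because the comparator returns 0 for equal totals, both phones with a total of 264 end up in one group, just as they would reach a single reduce() call; a reducer that read only the first value of the group would silently drop the second phone.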

 

4.1.3 Implementation

1. The custom bean

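import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    long upflow;
    long downflow;
    long sumflow;

    // The framework instantiates the bean by reflection when deserializing, so a no-arg
    // constructor is required. Once another constructor is declared, this one must be
    // written out explicitly, otherwise deserialization throws an exception.
    public FlowBean() {}

    public FlowBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    // Serialization: write the object's fields to the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    // Deserialization: read the fields from the input stream, in the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + sumflow;
    }

    // Custom descending order on total traffic. Long.compare returns 0 for equal totals,
    // as the compareTo contract requires; records with equal totals are therefore grouped
    // into a single reduce() call, so the reducer must emit every value of the group.
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumflow(), this.sumflow);
    }
}



2. Mapper and reducer

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Expects aggregated lines such as "phone \t upflow \t downflow \t sumflow"
            // (for example, the output of an earlier per-phone summation job), not the raw log above
            String[] fields = value.toString().split("\t");
            if (fields.length < 3) {
                return; // skip malformed lines
            }
            String phonenbr = fields[0];
            long upflow;
            long dflow;
            try {
                upflow = Long.parseLong(fields[1]);
                dflow = Long.parseLong(fields[2]);
            } catch (NumberFormatException e) {
                return; // skip lines whose traffic columns are not numbers
            }
            // The bean is the map output key, so the framework sorts records with FlowBean.compareTo
            context.write(new FlowBean(upflow, dflow), new Text(phonenbr));
        }
    }

    static class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean bean, Iterable<Text> phonenbrs, Context context) throws IOException, InterruptedException {
            // All phone numbers with the same total traffic arrive in one group. The framework
            // refills the key object as the values are iterated, so bean matches each phone number.
            for (Text phoneNbr : phonenbrs) {
                context.write(phoneNbr, bean);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowCount.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // TextInputFormat is the default, so this line is optional
        // job.setInputFormatClass(TextInputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}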
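The write/readFields pair only works if both sides agree on the field order. A stdlib-only sketch of the round trip, using java.io's DataOutputStream and DataInputStream in place of Hadoop's streams; the class name FlowFields is hypothetical and simply mirrors FlowBean's three fields:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stdlib-only stand-in for FlowBean (hypothetical, no Hadoop dependency) showing how a
// write/readFields pair round-trips through a byte stream.
final class FlowFields {
    long upflow;
    long downflow;
    long sumflow;

    // The reading side creates an empty object first, then fills it with readFields()
    FlowFields() {}

    FlowFields(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    // Fields must be read in exactly the order write() wrote them
    void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    static byte[] serialize(FlowFields f) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            f.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static FlowFields deserialize(byte[] data) {
        FlowFields f = new FlowFields();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            f.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return f;
    }
}
```

Three longs serialize to exactly 24 bytes: the format carries no field names or type information, so reading in a different order than writing silently yields wrong values rather than an error.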

 

 

 

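4.2. Partitioning in MapReduce: the Partitioner

4.2.1 Requirement

Write the traffic statistics to different output files according to the province each phone number belongs to, so that queries on the results can be restricted to a single province.

4.2.2 Analysis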

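MapReduce groups the key/value pairs emitted by map by key and distributes the groups to the reduce tasks.

The default distribution rule (HashPartitioner) is: the key's hashCode, with the sign bit cleared, modulo the number of reduce tasks.

So, to group data by our own rule, we have to replace the component that distributes (groups) the data, the Partitioner:

define a CustomPartitioner that extends the abstract class Partitioner,

then set the custom partitioner on the job object: job.setPartitionerClass(CustomPartitioner.class)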
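The default rule described above fits in a few lines. This stdlib-only sketch reproduces the formula used by Hadoop's HashPartitioner; it calls Java's own hashCode, so for Hadoop Text keys (whose hashCode is computed differently from String.hashCode) the actual partition numbers would differ:

```java
// Stdlib-only sketch of the default partitioning rule; the formula matches
// HashPartitioner.getPartition, but this class is not Hadoop code.
final class HashPartitionSketch {

    static int partition(Object key, int numReduceTasks) {
        // "& Integer.MAX_VALUE" clears the sign bit, so a negative hashCode still yields
        // a partition in [0, numReduceTasks); Math.abs would fail for Integer.MIN_VALUE
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

Masking with Integer.MAX_VALUE rather than calling Math.abs matters: Math.abs(Integer.MIN_VALUE) is still negative, which would produce a negative partition number.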

 

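4.2.3 Implementation

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom rule for distributing (grouping) data between map and reduce:
 * records are distributed by the province the phone number belongs to.
 * The default partitioner is HashPartitioner.
 *
 * getPartition returns 0 to 5, so the job needs six reduce tasks:
 *   job.setPartitionerClass(ProvincePartitioner.class);
 *   job.setNumReduceTasks(6);
 * With more than one but fewer than six reduce tasks the job fails with an
 * "Illegal partition" error; with more than six the extra output files stay empty.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    // Phone-number prefix -> partition number; sample data standing in for a real
    // prefix-to-province lookup
    static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("135", 0);
        provinceMap.put("136", 1);
        provinceMap.put("137", 2);
        provinceMap.put("138", 3);
        provinceMap.put("139", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // The key is the phone number; its first three digits select the partition
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        // Every unknown prefix goes to partition 5
        return code == null ? 5 : code;
    }
}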
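To see how records spread over the output files, the lookup can be replayed on the phone numbers of the sample data in 4.1.1. This is a stdlib-only sketch, not Hadoop code; the number 18600000000 is hypothetical and stands for any prefix missing from the table, and the range check only loosely imitates the framework's own check:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only re-implementation of ProvincePartitioner's lookup (not Hadoop code), plus a
// helper counting how many records would land in each partition, i.e. each output file.
final class ProvincePartitionSketch {

    private static final Map<String, Integer> PROVINCE = new HashMap<>();

    static {
        PROVINCE.put("135", 0);
        PROVINCE.put("136", 1);
        PROVINCE.put("137", 2);
        PROVINCE.put("138", 3);
        PROVINCE.put("139", 4);
    }

    static int partition(String phone) {
        Integer code = PROVINCE.get(phone.substring(0, 3));
        return code == null ? 5 : code;
    }

    // Rejects partition numbers outside [0, numPartitions), loosely mimicking the
    // "Illegal partition" check a map task performs when there are several reducers
    static Map<Integer, Integer> countPerPartition(List<String> phones, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String phone : phones) {
            int p = partition(phone);
            if (p >= numPartitions) {
                throw new IllegalStateException("Illegal partition " + p + " for " + phone);
            }
            counts.merge(p, 1, Integer::sum);
        }
        return counts;
    }
}
```

With six partitions the four sample phones fill partitions 2, 3 and 4 and the unknown prefix goes to 5; with five partitions the unknown prefix has nowhere to go, which is why the job needs six reduce tasks.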

 

 

 

 

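4.3. Data compression in MapReduce

4.3.1 Overview

Compression is an optimization strategy in MapReduce: compressing the output of the mapper or the reducer with a codec reduces disk I/O and speeds up the MR program, at the cost of extra CPU work.

1. MapReduce can compress the map output or the reduce output, to reduce network I/O or the size of the final output.

2. Used well, compression improves performance; used badly, it can also degrade it.

3. Rules of thumb:

compute-intensive jobs: use compression sparingly

I/O-intensive jobs: use compression more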
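The trade-off can be observed with the JDK alone. In this sketch java.util.zip's GZIP streams stand in for Hadoop's GzipCodec, and the repeated log line is a made-up sample:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Stdlib-only illustration of the compression trade-off (not Hadoop code):
// fewer bytes to store or transfer, paid for with CPU work on both sides.
final class CompressionTradeoffSketch {

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] gunzip(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Repetitive text, like many log lines, compresses very well
    static byte[] sampleLog(int lines) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < lines; i++) {
            sb.append("13726230503\t2481\t24681\t27162\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

The compressed output is a small fraction of the input, which is what saves disk and network I/O; the price is the CPU time spent in gzip() on the writing side and gunzip() on the reading side, which is why compute-bound jobs gain less.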

 

 

 

4.3.2 Compression codecs supported by MR

[Image: table of the compression codecs supported by MR, not reproduced]

 

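4.3.3 Compressing reducer output

Reduce output compression can be configured either through configuration parameters or in code.

1. In the configuration parameters (the values shown are the defaults; set compress=true to turn compression on; compress.type, one of NONE, RECORD or BLOCK, only applies to SequenceFile output):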

mapreduce.output.fileoutputformat.compress=false

mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

mapreduce.output.fileoutputformat.compress.type=RECORD

 

2. In code:

Job job = Job.getInstance(conf);

FileOutputFormat.setCompressOutput(job, true);

// Class.forName("") is a placeholder: pass the fully qualified class name of the codec to use
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));

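4.3.4 Compressing mapper output

Map output compression can likewise be configured through configuration parameters or in code.

1. In the configuration parameters (defaults shown):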

mapreduce.map.output.compress=false

mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

 

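2. In code (set these on conf before calling Job.getInstance(conf), because the job works on a copy of the configuration passed to it):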

conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);

conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);

 

 

 

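4.3.5 Reading compressed files

Hadoop's built-in InputFormat classes can read compressed files out of the box. For TextInputFormat, this happens in the initialize method of the LineRecordReader it creates: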

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    // Pick the codec that matches the file-name suffix (null if the file is not compressed)
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      // Is this a splittable compression codec?
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        // Splittable codec: read the compressed data with a CompressedSplitLineReader
        in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        // Non-splittable codec: wrap the file input stream in a decompressing stream
        // and hand it to an ordinary SplitLineReader
        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      // Not a compressed file: read the data with an ordinary SplitLineReader
      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
    // ... (rest of the method omitted)
  }
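The core idea of the code above, choosing a decompressor from the file-name suffix, can be sketched with the JDK alone. This is not Hadoop code: only the .gz suffix is recognized here, and the file names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Stdlib-only sketch of the idea behind CompressionCodecFactory.getCodec(file): the
// file-name suffix decides whether the raw stream is wrapped in a decompressing stream.
// Only ".gz" is recognized here; Hadoop's factory checks every codec it knows about.
final class SuffixCodecSketch {

    static InputStream open(String fileName, InputStream raw) throws IOException {
        if (fileName.endsWith(".gz")) {
            return new GZIPInputStream(raw); // codec found: decompress while reading
        }
        return raw; // no codec for this suffix: read the bytes as they are
    }

    // Reads all lines of a "file" given its name and its stored bytes
    static List<String> readLines(String fileName, byte[] stored) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                open(fileName, new ByteArrayInputStream(stored)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

A related detail: TextInputFormat does not split a file whose codec is not splittable (gzip, for instance), so such a file is read in full by a single map task.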

 

 

 

 

 

 

 

 

 


4.4. More MapReduce programming examples

4.4.1 Implementing a reduce-side join

1. Requirement:

Order table t_order:

id      date      pid    amount
1001    20150710  P0001  2
1002    20150710  P0001  3
1002    20150710  P0002  3

 

Product table t_product:

id      name     category_id  price
P0001   小米5    C01          2
P0002   錘子T1   C01          3

 

Suppose the data volume is huge and both tables are stored as files in HDFS. We need a MapReduce program that performs the following SQL query:

select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

 

2. How it works:

Use the join condition as the map output key, and tag each record with the file (table) it came from. Records from both tables that satisfy the join condition then reach the same reduce task, where they are stitched together.

 

 

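import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Skeleton. OrderJoinBean is a custom Writable that is not shown here: it holds the
// columns of both tables plus a flag recording which file the record came from.
public class OrderJoin {

    static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Take one line of data and find out which file it belongs to
            String line = value.toString();
            String[] fields = line.split("\t");

            // Name of the file this line comes from, obtained through the input split
            // (the cast is valid for FileInputFormat-based inputs)
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();

            // The join key is the product id: column 2 (pid) of t_order, column 0 (id) of t_product.
            // Assumption: the order files are the ones whose names start with "order"
            String itemid = name.startsWith("order") ? fields[2] : fields[0];

            // Depending on the file, copy that table's columns and the source flag into the bean
            OrderJoinBean bean = new OrderJoinBean();
            bean.set(null, null, null, null, null); // placeholder arguments, to be filled from fields
            context.write(new Text(itemid), bean);
        }
    }

    static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {
            // key is one product id, e.g. P0001
            // beans come from both files:
            //   {P0001, amount} {P0001, amount} {P0001, amount}   ---   {P0001, price, name}
            // Copy the fields of the single t_product bean into every t_order bean and write each out.
            // Hadoop reuses the same bean instance while iterating, so copy any bean you keep.
        }
    }
}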
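The skeleton leaves the reduce step as comments. The stdlib-only simulation below runs the whole map, shuffle and reduce flow of this join in memory (it is not Hadoop code); a hypothetical Tagged object plays the role of OrderJoinBean's source flag, and the product names are written in ASCII as Xiaomi5 and SmartisanT1:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only simulation of a reduce-side join (not Hadoop code).
final class ReduceJoinSketch {

    // A record tagged with the table it came from
    static final class Tagged {
        final String table;
        final String[] cols;

        Tagged(String table, String[] cols) {
            this.table = table;
            this.cols = cols;
        }
    }

    static List<String> join(List<String> orders, List<String> products) {
        // "Map" + "shuffle": key every record by product id and group by that key.
        // t_order columns: id, date, pid, amount; t_product columns: id, name, category_id, price
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (String line : orders) {
            String[] c = line.split("\t");
            groups.computeIfAbsent(c[2], k -> new ArrayList<>()).add(new Tagged("order", c));
        }
        for (String line : products) {
            String[] c = line.split("\t");
            groups.computeIfAbsent(c[0], k -> new ArrayList<>()).add(new Tagged("product", c));
        }
        // "Reduce": one pass per product id
        List<String> out = new ArrayList<>();
        for (List<Tagged> values : groups.values()) {
            String[] product = null;
            List<String[]> orderRows = new ArrayList<>();
            for (Tagged t : values) {
                if (t.table.equals("product")) {
                    product = t.cols;
                } else {
                    orderRows.add(t.cols); // buffer: the product record may come later
                }
            }
            if (product == null) {
                continue; // inner join: orders without a matching product are dropped
            }
            for (String[] o : orderRows) {
                // select a.id, a.date, b.name, b.category_id, b.price
                out.add(String.join("\t", o[0], o[1], product[1], product[2], product[3]));
            }
        }
        return out;
    }
}
```

The reducer has to buffer the order records of a group, because nothing guarantees that the product record arrives first unless a secondary sort is set up; with many orders for one product, that buffer is also where the data skew discussed next shows up.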

 

 

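Drawback: in this approach the join is done in the reduce phase, so the reduce side carries most of the processing load while the map nodes do very little work. Resource utilization is poor, and the reduce phase is highly prone to data skew: a key shared by a very large number of records (for example, a best-selling product) sends all of them to the same reduce task.



Solution: a map-side join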

 

 

 

 

 

 

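4.4.2 Implementing a map-side join

1. How it works

Applies when one of the tables being joined is small.

The small table is distributed to every map node, so each map task can join the part of the large table it reads against it locally and output the final result directly. This greatly increases the parallelism of the join and speeds up processing.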
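A stdlib-only simulation of this principle (not Hadoop code): the constructor plays the role of setup() and loads the small table once, and map() joins each large-table line by a hash lookup. The comma-separated small table and the tab-separated large table follow the formats used by the example below; the data itself is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only simulation of a map-side join (not Hadoop code). No shuffle or reducer
// is involved, so each "task" can process its own slice of the large table independently.
final class MapJoinSketch {

    private final Map<String, String> productNames = new HashMap<>();

    // Plays the role of setup(): small-table lines are "itemid,name"
    MapJoinSketch(List<String> smallTable) {
        for (String line : smallTable) {
            String[] f = line.split(",");
            productNames.put(f[0], f[1]);
        }
    }

    // Plays the role of map(): "itemid \t amount" becomes "itemid \t amount \t name";
    // an id missing from the small table yields "null", as concatenating a null reference does
    String map(String bigTableLine) {
        String[] f = bigTableLine.split("\t");
        return f[0] + "\t" + f[1] + "\t" + productNames.get(f[0]);
    }

    // Processes one input split (a list of large-table lines)
    List<String> mapSplit(List<String> split) {
        List<String> out = new ArrayList<>();
        for (String line : split) {
            out.add(map(line));
        }
        return out;
    }
}
```

Each map task only needs its own in-memory copy of the small table, so tasks never exchange data; the memory that copy needs is what limits how large the "small" table may be.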

2. Example implementation

-- First, define the small table inside the mapper class in advance and join against it

-- In real scenarios: load it once from a database, or ship it with the distributed cache

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Small table: item id -> product name
        HashMap<String, String> b_tab = new HashMap<String, String>();
        String localpath = null;

        // Called once when the map task is initialized
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // For testing only: the local absolute path of the cache file
            // (getLocalCacheFiles() is deprecated but still available)
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();

            // Using the cache file: read it with plain local I/O. This is a small file in
            // the working directory of the machine that runs this map task.
            try (BufferedReader reader = new BufferedReader(new FileReader("b.txt"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // b.txt lines are "itemid,name"
                    String[] fields = line.split(",");
                    b_tab.put(fields[0], fields[1]);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // This is the input split (in HDFS) that this map task is responsible for
            String[] fields = value.toString().split("\t");
            String a_itemid = fields[0];
            String a_amount = fields[1];

            // Join by lookup; an item missing from the small table yields "null"
            String b_name = b_tab.get(a_itemid);

            // Output: itemid, then amount, the cache-file path (for verification) and the product name
            context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" + b_name));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);

        job.setOutputKeyClass(Text.class);
        // The mapper emits Text values
        job.setOutputValueClass(Text.class);

        // Path of the data we actually need to process
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // No reducer needed: this is a map-only job
        job.setNumReduceTasks(0);

        // Distribute a file to the working directory of each task process
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));

        // Distribute an archive to the task working directory (it is unpacked there)
        // job.addCacheArchive(archiveUri);

        // Add an archive to the classpath of the tasks
        // job.addArchiveToClassPath(archive);

        // Add a jar file to the classpath of the task nodes
        // job.addFileToClassPath(jarfile);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

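4.4.3 Web log preprocessing

1. Requirements:

Identify and split the fields of each web access log record

Remove invalid records from the log

Produce filtered request data for the various KPI statistics



2. Implementation:

a) Define a bean that holds the fields of a log record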

public class WebLogBean {

    private String remote_addr;      // client IP address
    private String remote_user;      // client user name; "-" means not available
    private String time_local;       // access time and time zone
    private String request;          // requested URL and HTTP protocol
    private String status;           // response status; 200 means success
    private String body_bytes_sent;  // size of the response body sent to the client
    private String http_referer;     // page from which the request was linked
    private String http_user_agent;  // information about the client's browser

    private boolean valid = true;    // whether the record is valid

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    // Fields are joined with '\001' (Ctrl-A), the default field delimiter of Hive text tables
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.remote_addr);
        sb.append("\001").append(this.remote_user);
        sb.append("\001").append(this.time_local);
        sb.append("\001").append(this.request);
        sb.append("\001").append(this.status);
        sb.append("\001").append(this.body_bytes_sent);
        sb.append("\001").append(this.http_referer);
        sb.append("\001").append(this.http_user_agent);
        return sb.toString();
    }
}
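Downstream tools have to split these records on the same \001 character. A stdlib-only sketch (not part of the original program; the sample field values are hypothetical) showing one pitfall of String.split when the last field is empty:

```java
import java.util.Arrays;
import java.util.List;

// Stdlib-only sketch of reading back a record written by WebLogBean.toString(),
// whose fields are joined with '\001' (Ctrl-A).
final class CtrlADelimiterSketch {

    static final String SEP = "\001";

    static String join(String... fields) {
        return String.join(SEP, fields);
    }

    // A limit of -1 keeps trailing empty fields; split(regex) without a limit drops them
    static List<String> split(String record) {
        return Arrays.asList(record.split(SEP, -1));
    }
}
```

Passing -1 as the limit keeps an empty trailing field (an empty user agent, say), so every record yields the same number of columns.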

 

b) Define a parser that parses and filters the raw web access log records

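public class WebLogParser {

    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        // Fields of the access log line are separated by spaces
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            webLogBean.setTime_local(arr[3].substring(1)); // drop the leading "["
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);

            // The user agent contains spaces; only its first one or two tokens are kept
            if (arr.length > 12) {
                webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }

            try {
                // Status codes of 400 and above are HTTP errors
                if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
                    webLogBean.setValid(false);
                }
            } catch (NumberFormatException e) {
                // A non-numeric status means the line did not split as expected
                webLogBean.setValid(false);
            }
        } else {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }

    // Replaces "/" with "-"; String is immutable, so the result of replace() must be returned
    public static String parserTime(String time) {
        return time.replace("/", "-");
    }
}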
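parserTime only swaps characters, turning 18/Sep/2013:06:49:18 into 18-Sep-2013:06:49:18. If a sortable timestamp is wanted, one option is to parse and re-format it with java.time. This is a hypothetical alternative, not the original method; it assumes time_local holds a Common Log Format timestamp with the leading [ and the time-zone token removed, as the parser above leaves it, and the sample timestamp is made up.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical alternative to WebLogParser.parserTime (not from the original program):
// parse "dd/MMM/yyyy:HH:mm:ss" and re-format it as "yyyy-MM-dd HH:mm:ss".
final class LogTimeSketch {

    // Month names in access logs are English abbreviations, so the locale is pinned to US
    private static final DateTimeFormatter IN =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    private static final DateTimeFormatter OUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.US);

    // Throws java.time.format.DateTimeParseException for malformed input
    static String normalize(String timeLocal) {
        return LocalDateTime.parse(timeLocal, IN).format(OUT);
    }
}
```

Unlike a character replacement, parsing also rejects malformed timestamps with a DateTimeParseException, which a caller could use to mark the record invalid.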

 

 

c) The MapReduce program

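import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Output objects are reused across map() calls
        Text k = new Text();
        NullWritable v = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);
            // Drop invalid records
            if (!webLogBean.isValid())
                return;
            // One output line per valid record
            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}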

 

 

