溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

發(fā)布時間:2020-07-18 08:12:33 來源:網(wǎng)絡 閱讀:186899 作者:xpleaf 欄目:大數(shù)據(jù)

[TOC]


1 大數(shù)據(jù)處理的常用方法

大數(shù)據(jù)處理目前比較流行的是兩種方法,一種是離線處理,一種是在線處理,基本處理架構如下:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

在互聯(lián)網(wǎng)應用中,不管是哪一種處理方式,其基本的數(shù)據(jù)來源都是日志數(shù)據(jù),例如對于web應用來說,則可能是用戶的訪問日志、用戶的點擊日志等。

如果對于數(shù)據(jù)的分析結(jié)果在時間上有比較嚴格的要求,則可以采用在線處理的方式來對數(shù)據(jù)進行分析,如使用Spark、Storm等進行處理。比較貼切的一個例子是天貓雙十一的成交額,在其展板上,我們看到交易額是實時動態(tài)進行更新的,對于這種情況,則需要采用在線處理。

當然,如果只是希望得到數(shù)據(jù)的分析結(jié)果,對處理的時間要求不嚴格,就可以采用離線處理的方式,比如我們可以先將日志數(shù)據(jù)采集到HDFS中,之后再進一步使用MapReduce、Hive等來對數(shù)據(jù)進行分析,這也是可行的。

本文主要分享對某個電商網(wǎng)站產(chǎn)生的用戶訪問日志(access.log)進行離線處理與分析的過程,基于MapReduce的處理方式,最后會統(tǒng)計出某一天不同省份訪問該網(wǎng)站的uv與pv。

2 生產(chǎn)場景與需求

在我們的場景中,Web應用的部署是如下的架構:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

即比較典型的Nginx負載均衡+KeepAlive高可用集群架構,在每臺Web服務器上,都會產(chǎn)生用戶的訪問日志,業(yè)務需求方給出的日志格式如下:

1001    211.167.248.22  eecf0780-2578-4d77-a8d6-e2225e8b9169    40604   1       GET /top HTTP/1.0       408     null      null    1523188122767
1003    222.68.207.11   eecf0780-2578-4d77-a8d6-e2225e8b9169    20202   1       GET /tologin HTTP/1.1   504     null      Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070309 Firefox/2.0.0.3  1523188123267
1001    61.53.137.50    c3966af9-8a43-4bda-b58c-c11525ca367b    0       1       GET /update/pass HTTP/1.0       302       null    null    1523188123768
1000    221.195.40.145  1aa3b538-2f55-4cd7-9f46-6364fdd1e487    0       0       GET /user/add HTTP/1.1  200     null      Mozilla/4.0 (compatible; MSIE 7.0; Windows NT5.2)       1523188124269
1000    121.11.87.171   8b0ea90a-77a5-4034-99ed-403c800263dd    20202   1       GET /top HTTP/1.0       408     null      Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523188120263

其每個字段的說明如下:

appid ip mid userid login_type request status http_referer user_agent time
其中:
appid包括:web:1000,android:1001,ios:1002,ipad:1003
mid:唯一的id此id第一次會種在瀏覽器的cookie里。如果存在則不再種。作為瀏覽器唯一標示。移動端或者pad直接取機器碼。
login_type:登錄狀態(tài),0未登錄、1:登錄用戶
request:類似于此種 "GET /userList HTTP/1.1"
status:請求的狀態(tài)主要有:200 ok、404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等
http_referer:請求該url的上一個url地址。
user_agent:瀏覽器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36"
time:時間的long格式:1451451433818。

根據(jù)給定的時間范圍內(nèi)的日志數(shù)據(jù),現(xiàn)在業(yè)務方有如下需求:

統(tǒng)計出每個省每日訪問的PV、UV。

3 數(shù)據(jù)采集:獲取原生數(shù)據(jù)

數(shù)據(jù)采集工作由運維人員來完成,對于用戶訪問日志的采集,使用的是Flume,并且會將采集的數(shù)據(jù)保存到HDFS中,其架構如下:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

可以看到,不同的Web Server上都會部署一個Agent用于該Server上日志數(shù)據(jù)的采集,之后,不同Web Server的Flume Agent采集的日志數(shù)據(jù)會下沉到另外一個被稱為Flume Consolidation Agent(聚合Agent)的Flume Agent上,該Flume Agent的數(shù)據(jù)落地方式為輸出到HDFS。

在我們的HDFS中,可以查看到其采集的日志:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

后面我們的工作正是要基于Flume采集到HDFS中的數(shù)據(jù)做離線處理與分析。

4 數(shù)據(jù)清洗:將不規(guī)整數(shù)據(jù)轉(zhuǎn)化為規(guī)整數(shù)據(jù)

4.1 數(shù)據(jù)清洗目的

剛剛采集到HDFS中的原生數(shù)據(jù),我們也稱為不規(guī)整數(shù)據(jù),即目前來說,該數(shù)據(jù)的格式還無法滿足我們對數(shù)據(jù)處理的基本要求,需要對其進行預處理,轉(zhuǎn)化為我們后面工作所需要的較為規(guī)整的數(shù)據(jù),所以這里的數(shù)據(jù)清洗,其實指的就是對數(shù)據(jù)進行基本的預處理,以方便我們后面的統(tǒng)計分析,所以這一步并不是必須的,需要根據(jù)不同的業(yè)務需求來進行取舍,只是在我們的場景中需要對數(shù)據(jù)進行一定的處理。

4.2 數(shù)據(jù)清洗方案

原來的日志數(shù)據(jù)格式是如下的:

appid ip mid userid login_type request status http_referer user_agent time
其中:
appid包括:web:1000,android:1001,ios:1002,ipad:1003
mid:唯一的id此id第一次會種在瀏覽器的cookie里。如果存在則不再種。作為瀏覽器唯一標示。移動端或者pad直接取機器碼。
login_type:登錄狀態(tài),0未登錄、1:登錄用戶
request:類似于此種 "GET /userList HTTP/1.1"
status:請求的狀態(tài)主要有:200 ok、404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等
http_referer:請求該url的上一個url地址。
user_agent:瀏覽器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36"
time:時間的long格式:1451451433818。

但是如果需要按照省份來統(tǒng)計uv、pv,其所包含的信息還不夠,我們需要對這些數(shù)據(jù)做一定的預處理,比如需要,對于其中包含的IP信息,我們需要將其對應的IP信息解析出來;為了方便我們的其它統(tǒng)計,我們也可以將其request信息解析為method、 request_urlhttp_version等,

所以按照上面的分析,我們希望預處理之后的日志數(shù)據(jù)包含如下的數(shù)據(jù)字段:

appid;  
ip;
//通過ip來衍生出來的字段 province和city
province;
city;

mid;      
userId;    
loginType; 
request; 
//通過request 衍生出來的字段 method request_url http_version
method;
requestUrl;
httpVersion;

status;          
httpReferer; 
userAgent;   
//通過userAgent衍生出來的字段,即用戶的瀏覽器信息
browser;

time;

即在原來的基礎上,我們增加了其它新的字段,如province、city等。

我們采用MapReduce來對數(shù)據(jù)進行預處理,預處理之后的結(jié)果,我們也是保存到HDFS中,即采用如下的架構:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

4.3 數(shù)據(jù)清洗過程:MapReduce程序編寫

數(shù)據(jù)清洗的過程主要是編寫MapReduce程序,而MapReduce程序的編寫又分為寫Mapper、Reducer、Job三個基本的過程。但是在我們這個案例中,要達到數(shù)據(jù)清洗的目的,實際上只需要Mapper就可以了,并不需要Reducer,原因很簡單,我們只是預處理數(shù)據(jù),在Mapper中就已經(jīng)可以對數(shù)據(jù)進行處理了,其輸出的數(shù)據(jù)并不需要進一步經(jīng)過Redcuer來進行匯總處理。

所以下面就直接編寫MapperJob的程序代碼。

4.3.1 AccessLogCleanMapper
package cn.xpleaf.dataClean.mr.mapper;

import cn.xpleaf.dataClean.mr.writable.AccessLogWritable;
import cn.xpleaf.dataClean.utils.JedisUtil;
import cn.xpleaf.dataClean.utils.UserAgent;
import cn.xpleaf.dataClean.utils.UserAgentUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * access日志清洗的主要mapper實現(xiàn)類
 * 原始數(shù)據(jù)結(jié)構:
 * appid ip mid userid login_tpe request status http_referer user_agent time ---> 10列內(nèi)容
 * 清洗之后的結(jié)果:
 * appid ip province city mid userid login_type request method request_url http_version status http_referer user_agent browser yyyy-MM-dd HH:mm:ss
 */
public class AccessLogCleanMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private Logger logger;
    private String[] fields;

    private String appid;      //數(shù)據(jù)來源 web:1000,android:1001,ios:1002,ipad:1003
    private String ip;
    //通過ip來衍生出來的字段 province和city
    private String province;
    private String city;

    private String mid;      //mid:唯一的id此id第一次會種在瀏覽器的cookie里。如果存在則不再種。作為瀏覽器唯一標示。移動端或者pad直接取機器碼。
    private String userId;     //用戶id
    private String loginType; //登錄狀態(tài),0未登錄、1:登錄用戶
    private String request; //類似于此種 "GET userList HTTP/1.1"
    //通過request 衍生出來的字段 method request_url http_version
    private String method;
    private String requestUrl;
    private String httpVersion;

    private String status;          //請求的狀態(tài)主要有:200 ok、/404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等
    private String httpReferer; //請求該url的上一個url地址。
    private String userAgent;   //瀏覽器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36"
    //通過userAgent來獲取對應的瀏覽器
    private String browser;

    //private long time; //action對應的時間戳
    private String time;//action對應的格式化時間yyyy-MM-dd HH:mm:ss

    private DateFormat df;
    private Jedis jedis;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        logger = Logger.getLogger(AccessLogCleanMapper.class);
        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        jedis = JedisUtil.getJedis();
    }

    /**
     * appid ip mid userid login_tpe request status http_referer user_agent time ---> 10列內(nèi)容
     * ||
     * ||
     * appid ip province city mid userid login_type request method request_url http_version status http_referer user_agent browser yyyy-MM-dd HH:mm:ss
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        fields = value.toString().split("\t");
        if (fields == null || fields.length != 10) { // 有異常數(shù)據(jù)
            return;
        }
        // 因為所有的字段沒有進行特殊操作,只是文本的輸出,所以沒有必要設置特定類型,全部設置為字符串即可,
        // 這樣在做下面的操作時就可以省去類型的轉(zhuǎn)換,但是如果對數(shù)據(jù)的合法性有嚴格的驗證的話,則要保持類型的一致
        appid = fields[0];
        ip = fields[1];
        // 解析IP
        if (ip != null) {
            String ipInfo = jedis.hget("ip_info", ip);
            province = ipInfo.split("\t")[0];
            city = ipInfo.split("\t")[1];
        }

        mid = fields[2];
        userId = fields[3];
        loginType = fields[4];
        request = fields[5];
        method = request.split(" ")[0];
        requestUrl = request.split(" ")[1];
        httpVersion = request.split(" ")[2];

        status = fields[6];
        httpReferer = fields[7];
        userAgent = fields[8];
        if (userAgent != null) {
            UserAgent uAgent = UserAgentUtil.getUserAgent(userAgent);
            if (uAgent != null) {
                browser = uAgent.getBrowserType();
            }
        }
        try { // 轉(zhuǎn)換有可能出現(xiàn)異常
            time = df.format(new Date(Long.parseLong(fields[9])));
        } catch (NumberFormatException e) {
            logger.error(e.getMessage());
        }
        AccessLogWritable access = new AccessLogWritable(appid, ip, province, city, mid,
                userId, loginType, request, method, requestUrl,
                httpVersion, status, httpReferer, this.userAgent, browser, time);
        context.write(NullWritable.get(), new Text(access.toString()));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // 資源釋放
        logger = null;
        df = null;
        JedisUtil.returnJedis(jedis);
    }
}
4.3.2 AccessLogCleanJob
package cn.xpleaf.dataClean.mr.job;

import cn.xpleaf.dataClean.mr.mapper.AccessLogCleanMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 清洗用戶access日志信息
 * 主要的驅(qū)動程序
 *      主要用作組織mapper和reducer的運行
 *
 * 輸入?yún)?shù):
 * hdfs://ns1/input/data-clean/access/2018/04/08 hdfs://ns1/output/data-clean/access
 * 即inputPath和outputPath
 * 目前outputPath統(tǒng)一到hdfs://ns1/output/data-clean/access
 * 而inputPath則不確定,因為我們的日志采集是按天來生成一個目錄的
 * 所以上面的inputPath只是清洗2018-04-08這一天的
 */
public class AccessLogCleanJob {
    public static void main(String[] args) throws Exception {

        if(args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage <inputPath...> <outputPath>");
            System.exit(-1);
        }

        Path outputPath = new Path(args[args.length - 1]);

        Configuration conf = new Configuration();
        String jobName = AccessLogCleanJob.class.getSimpleName();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(AccessLogCleanJob.class);

        // 設置mr的輸入?yún)?shù)
        for( int i = 0; i < args.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(args[i]));
        }
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(AccessLogCleanMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // 設置mr的輸出參數(shù)
        outputPath.getFileSystem(conf).delete(outputPath, true);    // 避免job在運行的時候出現(xiàn)輸出目錄已經(jīng)存在的異常
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);   // map only操作,沒有reducer

        job.waitForCompletion(true);
    }
}
4.3.3 執(zhí)行MapReduce程序

將上面的mr程序打包后上傳到我們的Hadoop環(huán)境中,這里,對2018-04-08這一天產(chǎn)生的日志數(shù)據(jù)進行清洗,執(zhí)行如下命令:

yarn jar data-extract-clean-analysis-1.0-SNAPSHOT-jar-with-dependencies.jar\
cn.xpleaf.dataClean.mr.job.AccessLogCleanJob \
hdfs://ns1/input/data-clean/access/2018/04/08 \
hdfs://ns1/output/data-clean/access

觀察其執(zhí)行結(jié)果:

......
18/04/08 20:54:21 INFO mapreduce.Job: Running job: job_1523133033819_0009
18/04/08 20:54:28 INFO mapreduce.Job: Job job_1523133033819_0009 running in uber mode : false
18/04/08 20:54:28 INFO mapreduce.Job:  map 0% reduce 0%
18/04/08 20:54:35 INFO mapreduce.Job:  map 50% reduce 0%
18/04/08 20:54:40 INFO mapreduce.Job:  map 76% reduce 0%
18/04/08 20:54:43 INFO mapreduce.Job:  map 92% reduce 0%
18/04/08 20:54:45 INFO mapreduce.Job:  map 100% reduce 0%
18/04/08 20:54:46 INFO mapreduce.Job: Job job_1523133033819_0009 completed successfully
18/04/08 20:54:46 INFO mapreduce.Job: Counters: 31
......

可以看到MapReduce Job執(zhí)行成功!

4.4 數(shù)據(jù)清洗結(jié)果

上面的MapReduce程序執(zhí)行成功后,可以看到在HDFS中生成的數(shù)據(jù)輸出目錄:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

我們可以下載其中一個結(jié)果數(shù)據(jù)文件,并用Notepadd++打開查看其數(shù)據(jù)信息:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

5 數(shù)據(jù)處理:對規(guī)整數(shù)據(jù)進行統(tǒng)計分析

經(jīng)過數(shù)據(jù)清洗之后,就得到了我們做數(shù)據(jù)的分析統(tǒng)計所需要的比較規(guī)整的數(shù)據(jù),下面就可以進行數(shù)據(jù)的統(tǒng)計分析了,即按照業(yè)務需求,統(tǒng)計出某一天中每個省份的PV和UV。

我們依然是需要編寫MapReduce程序,并且將數(shù)據(jù)保存到HDFS中,其架構跟前面的數(shù)據(jù)清洗是一樣的:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

5.1 數(shù)據(jù)處理思路:如何編寫MapReduce程序

現(xiàn)在我們已經(jīng)得到了規(guī)整的數(shù)據(jù),關于在于如何編寫我們的MapReduce程序。

因為要統(tǒng)計的是每個省對應的pv和uv,pv就是點擊量,uv是獨立訪客量,需要將省相同的數(shù)據(jù)拉取到一起,拉取到一塊的這些數(shù)據(jù)每一條記錄就代表了一次點擊(pv + 1),這里面有同一個用戶產(chǎn)生的數(shù)據(jù)(通過mid來唯一地標識是同一個瀏覽器,用mid進行去重,得到的就是uv)。

而拉取數(shù)據(jù),可以使用Mapper來完成,對數(shù)據(jù)的統(tǒng)計(pv、uv的計算)則可以通過Reducer來完成,即Mapper的各個參數(shù)可以為如下:

Mapper<LongWritable, Text, Text(Province), Text(mid)>

Reducer的各個參數(shù)可以為如下:

Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)>

5.2 數(shù)據(jù)處理過程:MapReduce程序編寫

根據(jù)前面的分析,來編寫我們的MapReduce程序。

5.2.1 ProvincePVAndUVMapper
package cn.xpleaf.dataClean.mr.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper<LongWritable, Text, Text(Province), Text(mid)>
 * Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)>
 */
public class ProvincePVAndUVMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        if(fields == null || fields.length != 16) {
            return;
        }
        String province = fields[2];
        String mid = fields[4];
        context.write(new Text(province), new Text(mid));
    }
}
5.2.2 ProvincePVAndUVReducer
package cn.xpleaf.dataClean.mr.reducer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * 統(tǒng)計該標準化數(shù)據(jù),產(chǎn)生結(jié)果
 * 省    pv      uv
 * 這里面有同一個用戶產(chǎn)生的數(shù)|據(jù)(通過mid來唯一地標識是同一個瀏覽器,用mid進行去重,得到的就是uv)
 * Mapper<LongWritable, Text, Text(Province), Text(mid)>
 * Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)>
 */
public class ProvincePVAndUVReducer extends Reducer<Text, Text, Text, Text> {

    private Set<String> uvSet = new HashSet<>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        long pv = 0;
        uvSet.clear();
        for(Text mid : values) {
            pv++;
            uvSet.add(mid.toString());
        }
        long uv = uvSet.size();
        String pvAndUv = pv + "\t" + uv;
        context.write(key, new Text(pvAndUv));
    }
}
5.2.3 ProvincePVAndUVJob
package cn.xpleaf.dataClean.mr.job;

import cn.xpleaf.dataClean.mr.mapper.ProvincePVAndUVMapper;
import cn.xpleaf.dataClean.mr.reducer.ProvincePVAndUVReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 統(tǒng)計每個省的pv和uv值
 * 輸入:經(jīng)過clean之后的access日志
 *      appid ip province city mid userid login_type request method request_url http_version status http_referer user_agent browser yyyy-MM-dd HH:mm:ss
 * 統(tǒng)計該標準化數(shù)據(jù),產(chǎn)生結(jié)果
 * 省    pv      uv
 *
 * 分析:因為要統(tǒng)計的是每個省對應的pv和uv
 *      pv就是點擊量,uv是獨立訪客量
 *      需要將省相同的數(shù)據(jù)拉取到一起,拉取到一塊的這些數(shù)據(jù)每一條記錄就代表了一次點擊(pv + 1)
 *      這里面有同一個用戶產(chǎn)生的數(shù)據(jù)(通過mid來唯一地標識是同一個瀏覽器,用mid進行去重,得到的就是uv)
 *      Mapper<LongWritable, Text, Text(Province), Text(mid)>
 *      Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)>
 *
 *  輸入?yún)?shù):
 *  hdfs://ns1/output/data-clean/access hdfs://ns1/output/pv-uv
 */
public class ProvincePVAndUVJob {
    public static void main(String[] args) throws Exception {

        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage <inputPath...> <outputPath>");
            System.exit(-1);
        }

        Path outputPath = new Path(args[args.length - 1]);

        Configuration conf = new Configuration();
        String jobName = ProvincePVAndUVJob.class.getSimpleName();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(ProvincePVAndUVJob.class);

        // 設置mr的輸入?yún)?shù)
        for (int i = 0; i < args.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(args[i]));
        }
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(ProvincePVAndUVMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 設置mr的輸出參數(shù)
        outputPath.getFileSystem(conf).delete(outputPath, true);    // 避免job在運行的時候出現(xiàn)輸出目錄已經(jīng)存在的異常
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setReducerClass(ProvincePVAndUVReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);

        job.waitForCompletion(true);
    }
}
5.2.4 執(zhí)行MapReduce程序

將上面的mr程序打包后上傳到我們的Hadoop環(huán)境中,這里,對前面預處理之后的數(shù)據(jù)進行統(tǒng)計分析,執(zhí)行如下命令:

yarn jar data-extract-clean-analysis-1.0-SNAPSHOT-jar-with-dependencies.jar \
cn.xpleaf.dataClean.mr.job.ProvincePVAndUVJob \
hdfs://ns1/output/data-clean/access \
hdfs://ns1/output/pv-uv

觀察其執(zhí)行結(jié)果:

......
18/04/08 22:22:42 INFO mapreduce.Job: Running job: job_1523133033819_0010
18/04/08 22:22:49 INFO mapreduce.Job: Job job_1523133033819_0010 running in uber mode : false
18/04/08 22:22:49 INFO mapreduce.Job:  map 0% reduce 0%
18/04/08 22:22:55 INFO mapreduce.Job:  map 50% reduce 0%
18/04/08 22:22:57 INFO mapreduce.Job:  map 100% reduce 0%
18/04/08 22:23:03 INFO mapreduce.Job:  map 100% reduce 100%
18/04/08 22:23:03 INFO mapreduce.Job: Job job_1523133033819_0010 completed successfully
18/04/08 22:23:03 INFO mapreduce.Job: Counters: 49
......

可以看到MapReduce Job執(zhí)行成功!

5.3 數(shù)據(jù)處理結(jié)果

上面的MapReduce程序執(zhí)行成功后,可以看到在HDFS中生成的數(shù)據(jù)輸出目錄:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

我們可以下載其結(jié)果數(shù)據(jù)文件,并用Notepadd++打開查看其數(shù)據(jù)信息:

大數(shù)據(jù)采集、清洗、處理:使用MapReduce進行離線數(shù)據(jù)分析完整案例

至此,就完成了一個完整的數(shù)據(jù)采集、清洗、處理的完整離線數(shù)據(jù)分析案例。

相關的代碼我已經(jīng)上傳到GitHub,有興趣可以參考一下:
https://github.com/xpleaf/data-extract-clean-analysis

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI