溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何使用Spark Streaming SQL對(duì)PV和UV進(jìn)行統(tǒng)計(jì)

發(fā)布時(shí)間:2021-09-04 09:24:38 來(lái)源:億速云 閱讀:197 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“如何使用Spark Streaming SQL對(duì)PV和UV進(jìn)行統(tǒng)計(jì)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

1.背景介紹

PV/UV統(tǒng)計(jì)是流式分析一個(gè)常見的場(chǎng)景。通過(guò)PV可以對(duì)訪問(wèn)的網(wǎng)站做流量或熱點(diǎn)分析,例如廣告主可以通過(guò)PV值預(yù)估投放廣告網(wǎng)頁(yè)所帶來(lái)的流量以及廣告收入。另外一些場(chǎng)景需要對(duì)訪問(wèn)的用戶作分析,比如分析用戶的網(wǎng)頁(yè)點(diǎn)擊行為,此時(shí)就需要對(duì)UV做統(tǒng)計(jì)。

使用Spark Streaming SQL,并結(jié)合Redis可以很方便進(jìn)行PV/UV的統(tǒng)計(jì)。本文將介紹通過(guò)Streaming  SQL消費(fèi)Loghub中存儲(chǔ)的用戶訪問(wèn)信息,對(duì)過(guò)去1分鐘內(nèi)的數(shù)據(jù)進(jìn)行PV/UV統(tǒng)計(jì),將結(jié)果存入Redis中。

2.準(zhǔn)備工作

  • 創(chuàng)建E-MapReduce 3.23.0以上版本的Hadoop集群。

  • 下載并編譯E-MapReduce-SDK包

git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git cd aliyun-emapreduce-sdk git checkout -b master-2.x origin/master-2.x mvn clean package -DskipTests

編譯完后,  assembly/target目錄下會(huì)生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。

數(shù)據(jù)源

本文采用Loghub作為數(shù)據(jù)源,有關(guān)日志采集、日志解析請(qǐng)參考日志服務(wù)。

3.統(tǒng)計(jì)PV/UV

一般場(chǎng)景下需要將統(tǒng)計(jì)出的PV/UV以及相應(yīng)的統(tǒng)計(jì)時(shí)間存入Redis。其他一些業(yè)務(wù)場(chǎng)景中,也會(huì)只保存最新結(jié)果,用新的結(jié)果不斷覆蓋更新舊的數(shù)據(jù)。以下首先介紹第一種情況的操作流程。

3.1啟動(dòng)客戶端

命令行啟動(dòng)streaming-sql客戶端

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

也可以創(chuàng)建SQL語(yǔ)句文件,通過(guò)streaming-sql -f的方式運(yùn)行。

3.1定義數(shù)據(jù)表

數(shù)據(jù)源表定義如下

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)  USING loghub  OPTIONS( sls.project=${sls.project}, sls.store=${sls.store}, access.key.id=${access.key.id}, access.key.secret=${access.key.secret}, endpoint=${endpoint});

其中,數(shù)據(jù)源表包含user_ip和__time__兩個(gè)字段,分別代表用戶的IP地址和loghub上的時(shí)間列。OPTIONS中配置項(xiàng)的值根據(jù)實(shí)際配置。

結(jié)果表定義如下

CREATE TABLE redis_sink  USING redis  OPTIONS( table='statistic_info', host=${redis_host}, key.column='interval');

其中,statistic_info為Redis存儲(chǔ)結(jié)果的表名,interval對(duì)應(yīng)統(tǒng)計(jì)結(jié)果中的interval字段;配置項(xiàng)${redis_host}的值根據(jù)實(shí)際配置。

3.2創(chuàng)建流作業(yè)

CREATE SCAN loghub_scan ON loghub_source USING STREAM OPTIONS( watermark.column='__time__', watermark.delayThreshold='10 second');  CREATE STREAM job OPTIONS( checkpointLocation=${checkpoint_location}) INSERT INTO redis_sink SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval FROM loghub_scan GROUP BY TUMBLING(__time__, interval 1 minute), window;

4.3查看統(tǒng)計(jì)結(jié)果

最終的統(tǒng)計(jì)結(jié)果如下圖所示

可以看到,每隔一分鐘都會(huì)生成一條數(shù)據(jù),key的形式為表名:interval,value為pv和uv的值。

3.4實(shí)現(xiàn)覆蓋更新

將結(jié)果表的配置項(xiàng)key.column修改為一個(gè)固定的值,例如定義如下

CREATE TABLE redis_sink USING redis  OPTIONS( table='statistic_info', host=${redis_host}, key.column='statistic_type');

創(chuàng)建流作業(yè)的SQL改為

CREATE STREAM job OPTIONS( checkpointLocation='/tmp/spark-test/checkpoint') INSERT INTO redis_sink SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval FROM loghub_scan GROUP BY TUMBLING(__time__, interval 1 minute), window;

最終的統(tǒng)計(jì)結(jié)果如下圖所示

如何使用Spark Streaming SQL對(duì)PV和UV進(jìn)行統(tǒng)計(jì)

可以看到,Redis中值保留了一個(gè)值,這個(gè)值每分鐘都被更新,value包含pv、uv和interval的值。

“如何使用Spark Streaming SQL對(duì)PV和UV進(jìn)行統(tǒng)計(jì)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI