溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用Spark Streaming SQL基于時間窗口進行數(shù)據(jù)統(tǒng)計

發(fā)布時間:2021-08-04 18:03:08 來源:億速云 閱讀:228 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“如何使用Spark Streaming SQL基于時間窗口進行數(shù)據(jù)統(tǒng)計”,在日常操作中,相信很多人在如何使用Spark Streaming SQL基于時間窗口進行數(shù)據(jù)統(tǒng)計問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何使用Spark Streaming SQL基于時間窗口進行數(shù)據(jù)統(tǒng)計”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

1.背景介紹

流式計算一個很常見的場景是基于事件時間進行處理,常用于檢測、監(jiān)控、根據(jù)時間進行統(tǒng)計等系統(tǒng)中。比如埋點日志中每條日志記錄了埋點處操作的時間,或者業(yè)務(wù)系統(tǒng)中記錄了用戶操作時間,用于統(tǒng)計各種操作處理的頻率等,或者根據(jù)規(guī)則匹配,進行異常行為檢測或監(jiān)控系統(tǒng)告警。這樣的時間數(shù)據(jù)都會包含在事件數(shù)據(jù)中,需要提取時間字段并根據(jù)一定的時間范圍進行統(tǒng)計或者規(guī)則匹配等。
使用Spark Streaming SQL可以很方便的對事件數(shù)據(jù)中的時間字段進行處理,同時Spark Streaming SQL提供的時間窗口函數(shù)可以將事件時間按照一定的時間區(qū)間對數(shù)據(jù)進行統(tǒng)計操作。
本文通過講解一個統(tǒng)計用戶在過去5秒鐘內(nèi)點擊網(wǎng)頁次數(shù)的案例,介紹如何使用Spark Streaming SQL對事件時間進行操作。

2.時間窗語法說明

Spark Streaming SQL支持兩類窗口操作:滾動窗口(TUMBLING)和滑動窗口(HOPPING)。

2.1滾動窗口

滾動窗口(TUMBLING)根據(jù)每條數(shù)據(jù)的時間字段將數(shù)據(jù)分配到一個指定大小的窗口中進行操作,窗口以窗口大小為步長進行滑動,窗口之間不會出現(xiàn)重疊。例如:如果指定了一個5分鐘大小的滾動窗口,數(shù)據(jù)會根據(jù)時間劃分到 [0:00 - 0:05)、 [0:05, 0:10)、[0:10, 0:15)等窗口。

  • 語法

GROUP BY TUMBLING ( colName, windowDuration )
  • 示例

對inventory表的inv_data_time時間列進行窗口操作,統(tǒng)計inv_quantity_on_hand的均值;窗口大小為1分鐘。

SELECT avg(inv_quantity_on_hand) qohFROM inventoryGROUP BY TUMBLING (inv_data_time, interval 1 minute)

2.2滑動窗口

滑動窗口(HOPPING),也被稱作Sliding Window。不同于滾動窗口,滑動窗口可以設(shè)置窗口滑動的步長,所以窗口可以重疊?;瑒哟翱谟袃蓚€參數(shù):windowDuration和slideDuration。slideDuration為每次滑動的步長,windowDuration為窗口的大小。當slideDuration < windowDuration時窗口會重疊,每個元素會被分配到多個窗口中。
所以,滾動窗口其實是滑動窗口的一種特殊情況,即slideDuration = windowDuration則等同于滾動窗口。

  • 語法

GROUP BY HOPPING ( colName, windowDuration, slideDuration )
  • 示例

對inventory表的inv_data_time時間列進行窗口操作,統(tǒng)計inv_quantity_on_hand的均值;窗口為1分鐘,滑動步長為30秒。

SELECT avg(inv_quantity_on_hand) qohFROM inventoryGROUP BY HOPPING (inv_data_time, interval 1 minute, interval 30 second)

3.系統(tǒng)架構(gòu)

如何使用Spark Streaming SQL基于時間窗口進行數(shù)據(jù)統(tǒng)計

業(yè)務(wù)日志收集到Aliyun SLS后,Spark對接SLS,通過Streaming SQL對數(shù)據(jù)進行處理并將統(tǒng)計后的結(jié)果寫入HDFS中。后續(xù)的操作流程主要集中在Spark Streaming SQL接收SLS數(shù)據(jù)并寫入HDFS的部分,有關(guān)日志的采集請參考日志服務(wù)。

4.操作流程

4.1環(huán)境準備

  • 創(chuàng)建E-MapReduce 3.21.0以上版本的Hadoop集群。

  • 下載并編譯E-MapReduce-SDK包

git clone git@github.com:aliyun/aliyun-emapreduce-sdk.gitcd aliyun-emapreduce-sdkgit checkout -b master-2.x origin/master-2.xmvn clean package -DskipTests

編譯完后, assembly/target目錄下會生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。

4.2創(chuàng)建表

命令行啟動spark-sql客戶端

spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

創(chuàng)建SLS和HDFS表

spark-sql> CREATE DATABASE IF NOT EXISTS default;spark-sql> USE default;
-- 數(shù)據(jù)源表spark-sql> CREATE TABLE IF NOT EXISTS sls_user_logUSING loghubOPTIONS (sls.project = "${logProjectName}",sls.store = "${logStoreName}",access.key.id = "${accessKeyId}",access.key.secret = "${accessKeySecret}",endpoint = "${endpoint}");
--結(jié)果表spark-sql> CREATE TABLE hdfs_user_click_countUSING org.apache.spark.sql.jsonOPTIONS (path '${hdfsPath}');

其中,內(nèi)建函數(shù)delay()用來設(shè)置Streaming SQL中的watermark,后續(xù)會有專門的文章介紹Streaming SQL watermark的相關(guān)內(nèi)容。

4.4查看結(jié)果

如何使用Spark Streaming SQL基于時間窗口進行數(shù)據(jù)統(tǒng)計

可以看到,產(chǎn)生的結(jié)果會自動生成一個window列,包含窗口的起止時間信息。

到此,關(guān)于“如何使用Spark Streaming SQL基于時間窗口進行數(shù)據(jù)統(tǒng)計”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI