This article explains in detail how to do Spark RDD HTTP log analysis in big data development. The content is fairly substantial, so it is shared here as a reference; hopefully you will have a solid grasp of the topic after reading it.
Configuration files or configuration tables usually live in an online database such as MySQL, or a backend engineer simply hands you a file. Compared with the large tables in an offline data warehouse, this data is tiny, so the usual approach is to broadcast the small table or file. The first example below uses a broadcast table to solve an IP-to-location mapping problem.
http.log:
Logs generated by users visiting the website. Each record contains a timestamp, IP address, visited URL, request data, browser information, and so on.
ip.dat: IP range data that maps IP ranges to locations, roughly 110,000 records in total, which is also quite small.
File locations: data/http.log, data/ip.dat
Download link: https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ (extraction code: hell)
Requirement: convert the IPs in http.log to locations (for example, map 122.228.96.111 to Wenzhou) and count the total number of visits per city.
There are three key points. First, the only field of http.log we need is the IP address, so following the principle of reading as little data as possible we extract just that column. Second, the IP mapping file is sorted by range, so to speed up lookups each IP is converted to a Long (for example, 122.228.96.111 becomes 122*256^3 + 228*256^2 + 96*256 + 111 = 2061787247). Third, a binary search over the sorted ranges finds the matching entry, which is then mapped to its location.
package com.hoult.work

import org.apache.spark.sql.SparkSession

/**
 * Data sources: 1. the access log (we only need the IP field)  2. the IP range-to-location mapping table
 * The mapping table is broadcast, and IPs are converted to Long for comparison.
 */
object FindIp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    // keep only the IP field of each log line
    val ipLogsRDD = sc.textFile("data/http.log")
      .map(_.split("\\|")(1))

    val ipInfoRDD = sc.textFile("data/ip.dat").map {
      case line: String =>
        val strSplit: Array[String] = line.split("\\|")
        Ip(strSplit(0), strSplit(1), strSplit(7))
    }

    // broadcast the (startIp, endIp, address) ranges with IPs converted to Long
    val brIPInfo = sc.broadcast(
      ipInfoRDD.map(x => (ip2Long(x.startIp), ip2Long(x.endIp), x.address)).collect())

    // map each log IP to its location and count visits per city
    ipLogsRDD
      .map(x => {
        val index = binarySearch(brIPInfo.value, ip2Long(x))
        if (index != -1) brIPInfo.value(index)._3 else "NULL"
      }).map(x => (x, 1))
      .reduceByKey(_ + _)
      .map(x => s"city: ${x._1}, visits: ${x._2}")
      .saveAsTextFile("data/work/output_ips")
  }

  // convert an IP string to a Long
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // binary search for the range that contains the IP
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }
}

case class Ip(startIp: String, endIp: String, address: String)
Result screenshot:
The second exercise analyzes a CDN access log. Log format: IP, hit status (Hit/Miss), response time, request time, request method, request URL, request protocol, status code, response size, referer, user agent.
Log file location: data/cdn.txt
Sample data:
Tasks:
2.1 Count the number of distinct IPs
2.2 Count the number of distinct IPs per video (a video is identified by a *.mp4 file name appearing in some log lines)
2.3 Compute the traffic for each hour of the day
Analysis: at first I looked for a joda-time way to parse the GMT-style timestamp and could not work out how to write it; then I realized that only the hour is needed, which a regular expression can extract. Also note that when computing the distinct IPs per video, aggregateByKey can be used to improve performance.
package com.hoult.work

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Read the CDN log into an RDD and keep only the fields we need:
 * IP, access hour, and video name (the xx.mp4 part of the URL).
 * Analysis:
 * 1. count distinct IPs
 * 2. count distinct IPs per video (a video is identified by *.mp4 in the log line)
 * 3. compute the traffic for each hour of the day
 */
object LogAnaylse {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    val cdnRDD = sc.textFile("data/cdn.txt")

    // distinct IPs
    // aloneIPs(cdnRDD.repartition(1))

    // distinct IPs per video
    // videoIPs(cdnRDD.repartition(1))

    // traffic per hour
    hourPoor(cdnRDD.repartition(1))
  }

  /**
   * number of distinct IPs
   */
  def aloneIPs(cdnRDD: RDD[String]) = {
    // match an IP address
    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r

    val ipnums = cdnRDD
      .flatMap(x => (IPPattern findFirstIn x))
      .map(y => (y, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    ipnums.saveAsTextFile("data/cdn/aloneIPs")
  }

  /**
   * distinct IPs per video
   */
  def videoIPs(cdnRDD: RDD[String]) = {
    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r
    val videoPattern = "([0-9]+)\\.mp4".r

    // group the IPs seen for each video, then deduplicate per video
    val res = cdnRDD
      .filter(x => x.matches(".*([0-9]+)\\.mp4.*"))
      .map(x => (videoPattern.findFirstIn(x).getOrElse(""), IPPattern.findFirstIn(x).getOrElse("")))
      .aggregateByKey(List[String]())(
        (lst, str) => (lst :+ str),
        (lst1, lst2) => (lst1 ++ lst2)
      )
      .mapValues(_.distinct)
      .sortBy(_._2.size, false)

    res.saveAsTextFile("data/cdn/videoIPs")
  }

  /**
   * traffic for each hour of the day
   */
  def hourPoor(cdnRDD: RDD[String]) = {
    // match the HTTP status code and response size
    val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r
    // [15/Feb/2017:11:17:13 +0800] -> capture "2017" and the hour, for per-hour statistics
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r
    import scala.util.matching.Regex

    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {
      var res = ("", 0L)
      try {
        val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    cdnRDD
      .filter(x => isMatch(httpSizePattern, x))
      .filter(x => isMatch(timePattern, x))
      .map(x => getTimeAndSize(x))
      .groupByKey()
      .map(x => (x._1, x._2.sum))
      .sortByKey()
      .map(x => x._1 + "h CDN traffic = " + x._2 / (1024 * 1024 * 1024) + "G")
      .saveAsTextFile("data/cdn/hourPoor")
  }
}
Run result screenshot:
The third exercise: suppose a click log (click.log) and an impression log (imp.log) contain records in the following format:
// click log
INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32
INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33
// impression log
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34
Use Spark Core to count the number of impressions and clicks per adid. The idea is straightforward, so the code follows directly.
Code:
package com.hoult.work

import org.apache.spark.sql.SparkSession

object AddLog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    val clickRDD = sc.textFile("data/click.log")
    val impRDD = sc.textFile("data/imp.log")

    val clickRes = clickRDD.map { line =>
      val arr = line.split("\\s+")
      // take the value of the last "adid=" parameter in the request URI
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    val impRes = impRDD.map { line =>
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    // save to HDFS (or the local file system for testing)
    clickRes.fullOuterJoin(impRes)
      .map(x => x._1 + "," + x._2._1.getOrElse(0) + "," + x._2._2.getOrElse(0))
      .repartition(1)
      // .saveAsTextFile("hdfs://linux121:9000/data/")
      .saveAsTextFile("data/add_log")

    sc.stop()
  }
}
Analysis: this job incurs two shuffles. The fullOuterJoin can be replaced with a union followed by reduceByKey, reducing the shuffles to one, as sketched below.
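A minimal sketch of that alternative, reusing clickRDD and impRDD from the code above (the output path data/add_log_union is hypothetical): each record is mapped to (adid, (clicks, impressions)), and a single reduceByKey sums both counters.

// Each record becomes (adid, (clicks, impressions)); union is a narrow
// dependency, so the single reduceByKey is the only shuffle.
val clickPairs = clickRDD.map { line =>
  val arr = line.split("\\s+")
  val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
  (adid, (1, 0)) // one click, zero impressions
}
val impPairs = impRDD.map { line =>
  val arr = line.split("\\s+")
  val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
  (adid, (0, 1)) // zero clicks, one impression
}

clickPairs.union(impPairs)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .map { case (adid, (clicks, imps)) => s"$adid,$clicks,$imps" }
  .coalesce(1) // merge into one output file without another shuffle
  .saveAsTextFile("data/add_log_union") // hypothetical output path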
The final exercise: table A has three fields, ID, startdate and enddate, and contains 3 rows:
1 2019-03-04 2020-02-03
2 2020-04-05 2020-08-04
3 2019-10-09 2020-06-11
Write a query (both an SQL and a DSL version are required) that transforms the data above into:
2019-03-04 2019-10-09
2019-10-09 2020-02-03
2020-02-03 2020-04-05
2020-04-05 2020-06-11
2020-06-11 2020-08-04
2020-08-04 2020-08-04
Analysis: by inspection, the first output column is the startdate and enddate columns stacked together (a union of the two columns), and the second column is simply the next value of the first, which is exactly what the lead window function provides.
The code is as follows:
package com.hoult.work

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataExchange {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DateSort")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")

    // source data
    val tab = List((1, "2019-03-04", "2020-02-03"), (2, "2020-04-05", "2020-08-04"), (3, "2019-10-09", "2020-06-11"))
    val df: DataFrame = spark.createDataFrame(tab).toDF("ID", "startdate", "enddate")

    // stack startdate and enddate into a single column
    val dateset: DataFrame = df.select("startdate").union(df.select("enddate"))
    dateset.createOrReplaceTempView("t")

    val result: DataFrame = spark.sql(
      """
        |select tmp.startdate, nvl(lead(tmp.startdate) over(partition by col order by tmp.startdate), startdate) enddate from
        |(select "1" col, startdate from t) tmp
        |""".stripMargin)
    result.show()
  }
}
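Since the task also asks for a DSL version, here is a sketch of one possible DSL equivalent of the SQL above, reusing the dateset DataFrame (the lit("1") constant partition mirrors the col column in the SQL, and coalesce plays the role of nvl):

// DSL equivalent sketch: lead() over a single constant partition, ordered by date
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lead, lit}

val w = Window.partitionBy(lit("1")).orderBy("startdate")
val resultDsl = dateset
  .withColumn("enddate", coalesce(lead("startdate", 1).over(w), dateset("startdate")))
  .orderBy("startdate")
resultDsl.show()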
Run result:
That covers how to do Spark RDD HTTP log analysis in big data development. Hopefully the content above is helpful and gives you something to build on; if you found the article useful, feel free to share it so more people can see it.