您好,登錄后才能下訂單哦!
代碼如下:
package com.dt.spark.streaming import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.{StreamingContext, Duration} /** * 使用SparkStreaming結(jié)合SparkSQL對日志進(jìn)行分析。 * 假設(shè)電商網(wǎng)站點(diǎn)擊日志格式(簡化)如下: * userid,itemId,clickTime * 需求:處理10分鐘內(nèi)item點(diǎn)擊次數(shù)排序Top10,并且將商品名稱顯示出來。商品itemId與商品名稱的對應(yīng)關(guān)系存放在MySQL數(shù)據(jù)庫中 * Created by dinglq on 2016/5/4. */ object LogAnalyzerStreamingSQL { val WINDOW_LENGTH = new Duration(600 * 1000) val SLIDE_INTERVAL = new Duration(10 * 1000) def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ //從數(shù)據(jù)庫中加載itemInfo表 val itemInfoDF = sqlContext.read.format("jdbc").options(Map( "url"-> "jdbc:mysql://spark-master:3306/spark", "driver"->"com.mysql.jdbc.Driver", "dbtable"->"iteminfo", "user"->"root", "password"-> "vincent" )).load() itemInfoDF.registerTempTable("itemInfo") val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL) val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming") val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache() val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL) windowDStream.foreachRDD(accessLogs => { if (accessLogs.isEmpty()) { println("No logs received in this time interval") } else { accessLogs.toDF().registerTempTable("accessLogs") val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " + " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " + " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 " val topTenClickItemLast10Minus = sqlContext.sql(sqlStr) // Persist top ten table for this window to HDFS as parquet file topTenClickItemLast10Minus.show() } }) streamingContext.start() streamingContext.awaitTermination() } } case class AccessLog(userId: String, itemId: String, clickTime: String) { } object AccessLog { def parseLogLine(log: String): AccessLog = { val logInfo = log.split(",") if (logInfo.length == 3) { AccessLog(logInfo(0),logInfo(1), logInfo(2)) } else { AccessLog("0","0","0") } } }
MySQL中表的內(nèi)容如下:
mysql> select * from spark.iteminfo; +--------+----------+ | itemid | itemname | +--------+----------+ | 001 | phone | | 002 | computer | | 003 | TV | +--------+----------+ 3 rows in set (0.00 sec)
在D創(chuàng)建目錄logs_incoming
運(yùn)行Spark Streaming 程序。
新建文件,內(nèi)容如下:
0001,001,2016-05-04 22:10:20 0002,001,2016-05-04 22:10:21 0003,001,2016-05-04 22:10:22 0004,002,2016-05-04 22:10:23 0005,002,2016-05-04 22:10:24 0006,001,2016-05-04 22:10:25 0007,002,2016-05-04 22:10:26 0008,001,2016-05-04 22:10:27 0009,003,2016-05-04 22:10:28 0010,003,2016-05-04 22:10:29 0011,001,2016-05-04 22:10:30 0012,003,2016-05-04 22:10:31 0013,003,2016-05-04 22:10:32
將文件保存到目錄logs_incoming 中,觀察Spark程序的輸出:
+------+--------+---+ |itemid|itemname|cnt| +------+--------+---+ | 001| phone| 6| | 003| TV| 4| | 002|computer| 3| +------+--------+---+
備注:
1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。