溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

第94課:SparkStreaming 實現廣告計費系統(tǒng)中在線黑名單過濾實戰(zhàn)

發(fā)布時間:2020-06-22 17:06:07 來源:網絡 閱讀:989 作者:csmw00 欄目:大數據

本期課程內容:

  • 在線黑名單過濾實現解析

  • SparkStreaming實現在線黑名單過濾 


  廣告計費系統(tǒng),是電商必不可少的一個功能點。為了防止惡意的廣告點擊(假設商戶A和B同時在某電商做了廣告,A和B為競爭對手,那么如果A使用點擊機器人進行對B的廣告的惡意點擊,那么B的廣告費用將很快被用完),必須對廣告點擊進行黑名單過濾。

  可以使用leftOuterJoin對目標數據和黑名單數據進行關聯,將命中黑名單的數據過濾掉。


  本文主要介紹的是DStream的transform函數的使用


SparkStreaming代碼實現

package com.dt.spark.sparkapps.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 使用Scala開發(fā)集群運行的Spark 在線黑名單過濾程序
 * Created by Limaoran on 2016/5/2.
 * 新浪微博:http://weibo.com/ilovepains/
 *
 * 背景描述:在廣告點擊計費系統(tǒng)中,我們在線過濾掉黑名單的點擊,進而保護廣告商的利益,
 * 只進行有效的廣告點擊計費或者在防刷評分(或者流量)系統(tǒng),過濾掉無效的投票或者評分或者流量;
 * 實現技術:使用transform Api直接基于RDD編程,進行join操作
 */
object OnlineBlackListFilter {
  def main(args: Array[String]) {
    /**
     * 第1步:創(chuàng)建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息。
     * 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置
     * 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如
     * 只有1G的內存)的初學者      
     */
    val conf = new SparkConf()  //創(chuàng)建SparkConf對象
    conf.setAppName("OnlineBlackListFilter")  //設置應用程序的名稱,在程序運行的監(jiān)控界面可以看到名稱
    conf.setMaster("spark://Master:7077") //此時,程序在Spark集群

    val ssc = new StreamingContext(conf,Seconds(30))

    /**
     * 黑名單數據準備,實際上黑名單一般都是動態(tài)的,例如在Redis或者數據庫中,黑名單的生成往往有復雜的業(yè)務
     * 邏輯,具體情況算法不同,但是在Spark Streaming進行處理的時候每次都能工訪問完整的信息
     */
    val blackList = Array(("hadoop",true),("mahout",true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList,8)

    val adsClickStream = ssc.socketTextStream("Master" ,9999)
    /**
     * 此處模擬的廣告點擊的每條數據的格式為:time、name
     * 此處map操作的結果是name、(time,name)的格式
     */
    val adsClientStreamFormated = adsClickStream.map(ads=>(ads.split(" ")(1),ads))
    adsClientStreamFormated.transform(userClickRDD => {
      //通過leftOuterJoin操作既保留了左側用戶廣告點擊內容的RDD的所有內容,又獲得了相應點擊內容是否在黑名單中
      val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
      /**
       * 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean))
       * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行l(wèi)eftOuterJoin的時候是否存在在值
       * 如果存在的話,表面當前廣告點擊是黑名單,需要過濾掉,否則的話則是有效點擊內容;
       */
      val validClicked = joinedBlackListRDD.filter(joinedItem=>{
        if(joinedItem._2._2.getOrElse(false)){
          false
        }else{
          true
        }
      })
      validClicked.map(validClick => {validClick._2._1})
    }).print()
    /**
     * 計算后的有效數據一般都會寫入Kafka中,下游的計費系統(tǒng)會從kafka中pull到有效數據進行計費
     */
    ssc.start()
    ssc.awaitTermination()
  }
}

將程序打包,并上傳至spark集群


在spark-master節(jié)點,啟動nc

root@spark-master:~# nc -lk 9999

運行OnlineBlacklistFilter程序

root@spark-master:~# /usr/local/spark-1.6.0/bin/spark-submit --class com.dt.spark.sparkapps.streaming.OnlineBlackListFilter --master spark://Master:7077 ./sparkApps.jar

在nc端輸入數據

root@spark-master:~# nc -lk 9999
22555 spark
124321 hadoop
5555 Flink
6666 HDFS
2222 Kafka
572231 Java
66662 mahout

SparkStreaming運行結果:

16/05/02 08:28:00 INFO MapPartitionsRDD: Removing RDD 8 from persistence list
-------------------------------------------
5555 Flink
6666 HDFS
572231 Java
22555 spark
2222 Kafka

可見,結果已經將黑名單設置的hadoop和mathou過濾掉了。


在此程序的基礎上,可以添加更復雜的業(yè)務邏輯規(guī)則,以滿足企業(yè)的需求。

備注:

1、DT大數據夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數據實戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains


向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI