溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

發(fā)布時(shí)間:2020-07-04 14:11:19 來(lái)源:網(wǎng)絡(luò) 閱讀:35215 作者:996440550 欄目:大數(shù)據(jù)

基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

   

    又好一陣子沒(méi)有寫文章了,阿彌陀佛...最近項(xiàng)目中要做理財(cái)推薦,所以,回過(guò)頭來(lái)回顧一下協(xié)同過(guò)濾算法在推薦系統(tǒng)中的應(yīng)用。

    說(shuō)到推薦系統(tǒng),大家可能立馬會(huì)想到協(xié)同過(guò)濾算法。本文基于Spark MLlib平臺(tái)實(shí)現(xiàn)一個(gè)向用戶推薦電影的簡(jiǎn)單應(yīng)用。其中,主要包括三部分內(nèi)容:


  • 協(xié)同過(guò)濾算法概述

  • 基于模型的協(xié)同過(guò)濾應(yīng)---電影推薦

  • 實(shí)時(shí)推薦架構(gòu)分析


    

    一、協(xié)同過(guò)濾算法概述

        本人對(duì)算法的研究,目前還不是很深入,這里簡(jiǎn)單的介紹下其工作原理。

        通常,協(xié)同過(guò)濾算法按照數(shù)據(jù)使用,可以分為:

        1)基于用戶(UserCF)

       2)基于商品(ItemCF)

       3)基于模型(ModelCF)

        按照模型,可以分為:

        1)最近鄰模型:基于距離的協(xié)同過(guò)濾算法

       2)Latent Factor Mode(SVD):基于矩陣分解的模型

       3)Graph:圖模型,社會(huì)網(wǎng)絡(luò)圖模型

        文中,使用的協(xié)同過(guò)濾算法是基于矩陣分解的模型。

        

      1、基于用戶(UserCF)---基于用戶相似性

        基于用戶的協(xié)同過(guò)濾,通過(guò)不同用戶對(duì)物品的評(píng)分來(lái)評(píng)測(cè)用戶之間的相似性,基于用戶之間的相似性做出推薦。簡(jiǎn)單來(lái)講,就是給用戶推薦和他興趣相似的其他用戶喜歡的物品。

        舉個(gè)例子:

        基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

        如圖,有三個(gè)用戶A、B、C,四個(gè)物品A、B、C、D,需要向用戶A推薦物品。這里,由于用戶A和用戶C都買過(guò)物品A和物品C,所以,我們認(rèn)為用戶A和用戶C非常相似,同時(shí),用戶C又買過(guò)物品D,那么就需要給A用戶推薦物品D。

        基于UserCF的基本思想相當(dāng)簡(jiǎn)單,基于用戶對(duì)物品的偏好,找到相鄰鄰居用戶,然后將鄰居用戶喜歡的商品推薦給當(dāng)前用戶。

        計(jì)算上,將一個(gè)用戶對(duì)所有物品的偏好作為一個(gè)向量來(lái)計(jì)算用戶之間的相似度,找到K鄰居后,根據(jù)鄰居的相似度權(quán)重以及他們對(duì)物品的偏好,預(yù)測(cè)當(dāng)前用戶沒(méi)有偏好的未涉及物品,計(jì)算得到一個(gè)排序的物品列表作為推薦。


        2、基于商品(ItemCF)---基于商品相似性

      基于商品的協(xié)同過(guò)濾,通過(guò)用戶對(duì)不同item的評(píng)分來(lái)評(píng)測(cè)item之間的相似性,基于item之間的相似性做出推薦。簡(jiǎn)單來(lái)將,就是給用戶推薦和他之前喜歡的物品相似的物品。

       例如:

       基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

       如圖,有三個(gè)用戶A、B、C和三件物品A、B、C,需要向用戶C推薦物品。這里,由于用戶A買過(guò)物品A和C,用戶B買過(guò)物品A、B、C,用戶C買過(guò)物品A,從用戶A和B可以看出,這兩個(gè)用戶都買過(guò)物品A和C,說(shuō)明物品A和C非常相似,同時(shí),用戶C又買過(guò)物品A,所以,將物品C推薦給用戶C。

       基于ItemCF的原理和基于UserCF類似,只是在計(jì)算鄰居時(shí)采用物品本身,而不是從用戶的角度,即基于用戶對(duì)物品的偏好找到相似的物品,然后根據(jù)用戶的歷史偏好,推薦相似的物品給他。

       從計(jì)算角度,即將所有用戶對(duì)某個(gè)物品的偏好作為一個(gè)向量來(lái)計(jì)算物品之間的相似度,得到物品的相似物品后,根據(jù)用戶歷史的偏好預(yù)測(cè)當(dāng)前用戶還沒(méi)有表示偏好的物品,計(jì)算得到一個(gè)排序的物品列表作為推薦。


        3、基于模型(ModelCF)

        基于模型的協(xié)同過(guò)濾推薦就是基于樣本的用戶喜好信息,訓(xùn)練一個(gè)推薦模型,然后根據(jù)實(shí)時(shí)的用戶喜好的信息進(jìn)行預(yù)測(cè),計(jì)算推薦。

                本文使用的基于矩陣分解的模型,算法如圖:

        基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

         Spark MLlib當(dāng)前支持基于模型的協(xié)同過(guò)濾,其中用戶和商品通過(guò)一小組隱性因子進(jìn)行表達(dá),并且這些因子也用于預(yù)測(cè)缺失的元素。MLlib使用交替最小二乘法(ALS)來(lái)學(xué)習(xí)這些隱性因子。

         如果有興趣,可以閱讀Spark的這部分源代碼:

         基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

         

    

    、基于模型的協(xié)同過(guò)濾應(yīng)用---電影推薦

         本文實(shí)現(xiàn)對(duì)用戶推薦電影的簡(jiǎn)單應(yīng)用。

        1、測(cè)試數(shù)據(jù)描述

           本次測(cè)試數(shù)據(jù)主要包括四個(gè)數(shù)據(jù)文件:(詳細(xì)的數(shù)據(jù)描述參見(jiàn)README文件)

           1)用戶數(shù)據(jù)文件

              用戶ID::性別::年齡::職業(yè)編號(hào)::郵編

              基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

          2)電影數(shù)據(jù)文件

             電影ID::電影名稱::電影種類

             基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

         3)評(píng)分?jǐn)?shù)據(jù)文件

            用戶ID::電影ID::評(píng)分::時(shí)間

            基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

        4)測(cè)試數(shù)據(jù)

           用戶ID::電影ID::評(píng)分::時(shí)間

           基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

        這里,前三個(gè)數(shù)據(jù)文件用于模型訓(xùn)練,第四個(gè)數(shù)據(jù)文件用于測(cè)試模型。


        2、實(shí)現(xiàn)代碼:

           

import org.apache.log4j.{Level, Logger}

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

import org.apache.spark.rdd._

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.SparkContext._


import scala.io.Source


object MovieLensALS {

  def main(args:Array[String]) {


    //屏蔽不必要的日志顯示在終端上

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)


    //設(shè)置運(yùn)行環(huán)境

    val sparkConf = new SparkConf().setAppName("MovieLensALS").setMaster("local[5]")

    val sc = new SparkContext(sparkConf)


    //裝載用戶評(píng)分,該評(píng)分由評(píng)分器生成(即生成文件personalRatings.txt)

    val myRatings = loadRatings(args(1))

    val myRatingsRDD = sc.parallelize(myRatings, 1)


    //樣本數(shù)據(jù)目錄

    val movielensHomeDir = args(0)


    //裝載樣本評(píng)分?jǐn)?shù)據(jù),其中最后一列Timestamp取除10的余數(shù)作為key,Rating為值,即(Int,Rating)

    val ratings = sc.textFile(movielensHomeDir + "/ratings.dat").map {

      line =>

        val fields = line.split("::")

        // format: (timestamp % 10, Rating(userId, movieId, rating))

        (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))

    }


    //裝載電影目錄對(duì)照表(電影ID->電影標(biāo)題)

    val movies = sc.textFile(movielensHomeDir + "/movies.dat").map {

      line =>

        val fields = line.split("::")

        // format: (movieId, movieName)

        (fields(0).toInt, fields(1))

    }.collect().toMap

    

    //統(tǒng)計(jì)有用戶數(shù)量和電影數(shù)量以及用戶對(duì)電影的評(píng)分?jǐn)?shù)目

    val numRatings = ratings.count()

    val numUsers = ratings.map(_._2.user).distinct().count()

    val numMovies = ratings.map(_._2.product).distinct().count()

    println("Got " + numRatings + " ratings from " + numUsers + " users " + numMovies + " movies")


    //將樣本評(píng)分表以key值切分成3個(gè)部分,分別用于訓(xùn)練 (60%,并加入用戶評(píng)分), 校驗(yàn) (20%), and 測(cè)試 (20%)

    //該數(shù)據(jù)在計(jì)算過(guò)程中要多次應(yīng)用到,所以cache到內(nèi)存

    val numPartitions = 4

    val training = ratings.filter(x => x._1 < 6).values.union(myRatingsRDD).repartition(numPartitions).persist()

    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.repartition(numPartitions).persist()

    val test = ratings.filter(x => x._1 >= 8).values.persist()


    val numTraining = training.count()

    val numValidation = validation.count()

    val numTest = test.count()

    println("Training: " + numTraining + " validation: " + numValidation + " test: " + numTest)



    //訓(xùn)練不同參數(shù)下的模型,并在校驗(yàn)集中驗(yàn)證,獲取最佳參數(shù)下的模型

    val ranks = List(8, 12)

    val lambdas = List(0.1, 10.0)

    val numIters = List(10, 20)

    var bestModel: Option[MatrixFactorizationModel] = None

    var bestValidationRmse = Double.MaxValue

    var bestRank = 0

    var bestLambda = -1.0

    var bestNumIter = -1


    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {

      val model = ALS.train(training, rank, numIter, lambda)

      val validationRmse = computeRmse(model, validation, numValidation)

      println("RMSE(validation) = " + validationRmse + " for the model trained with rank = "

        + rank + ",lambda = " + lambda + ",and numIter = " + numIter + ".")


      if (validationRmse < bestValidationRmse) {

        bestModel = Some(model)

        bestValidationRmse = validationRmse

        bestRank = rank

        bestLambda = lambda

        bestNumIter = numIter

      }

    }


    //用最佳模型預(yù)測(cè)測(cè)試集的評(píng)分,并計(jì)算和實(shí)際評(píng)分之間的均方根誤差(RMSE)

    val testRmse = computeRmse(bestModel.get, test, numTest)

    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda

      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")


    //create a naive baseline and compare it with the best model

    val meanRating = training.union(validation).map(_.rating).mean

    val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).reduce(_ + _) / numTest)

    val improvement = (baselineRmse - testRmse) / baselineRmse * 100

    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")


    //推薦前十部最感興趣的電影,注意要剔除用戶已經(jīng)評(píng)分的電影

    val myRatedMovieIds = myRatings.map(_.product).toSet

    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)

    val recommendations = bestModel.get

      .predict(candidates.map((0, _)))

      .collect

      .sortBy(-_.rating)

      .take(10)

    var i = 1

    println("Movies recommended for you:")

    recommendations.foreach { r =>

      println("%2d".format(i) + ": " + movies(r.product))

      i += 1

    }


    sc.stop()

  }



  /** 校驗(yàn)集預(yù)測(cè)數(shù)據(jù)和實(shí)際數(shù)據(jù)之間的均方根誤差 **/

  def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double = {


    val predictions:RDD[Rating] = model.predict((data.map(x => (x.user,x.product))))

    val predictionsAndRatings = predictions.map{ x =>((x.user,x.product),x.rating)}

                          .join(data.map(x => ((x.user,x.product),x.rating))).values

    math.sqrt(predictionsAndRatings.map( x => (x._1 - x._2) * (x._1 - x._2)).reduce(_+_)/n)

  }


  /** 裝載用戶評(píng)分文件 personalRatings.txt **/

  def loadRatings(path:String):Seq[Rating] = {

    val lines = Source.fromFile(path).getLines()

    val ratings = lines.map{

      line =>

        val fields = line.split("::")

        Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)

    }.filter(_.rating > 0.0)

    if(ratings.isEmpty){

      sys.error("No ratings provided.")

    }else{

      ratings.toSeq

    }

  }

}


        3、運(yùn)行程序

        1)設(shè)置參數(shù),運(yùn)行程序

             基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

             這里有兩個(gè)輸入參數(shù):第一個(gè)是數(shù)據(jù)文件目錄,第二個(gè)是測(cè)試數(shù)據(jù)。


         2)程序運(yùn)行效果---模型訓(xùn)練過(guò)程

           基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

           從運(yùn)行效果來(lái)看,總共有6040個(gè)用戶,3706個(gè)電影(已經(jīng)去重),1000209條評(píng)分?jǐn)?shù)據(jù);如程序,我們把所有數(shù)據(jù)分為三部分:60%用于訓(xùn)練、20%用戶校驗(yàn)、20%用戶測(cè)試模型;接下來(lái)是模型在不同參數(shù)下的均方根誤差(RMSE)值,以及對(duì)應(yīng)的參數(shù),最優(yōu)的參數(shù)選擇均方根誤差(RMSE---0.8665911...)最小的參數(shù)值---即最優(yōu)參數(shù)模型建立;接著,使用20%的測(cè)試模型數(shù)據(jù)來(lái)測(cè)試模型的好壞,也就是均方根誤差(RMSE),這里計(jì)算的結(jié)果為0.86493444...,在最優(yōu)參數(shù)模型基礎(chǔ)上提升了22.32%的準(zhǔn)確率。

           說(shuō)明下,其實(shí)在數(shù)據(jù)的劃分上(60%+20%+20%),最好隨機(jī)劃分?jǐn)?shù)據(jù),這樣得到的結(jié)果更有說(shuō)服力。


       3)程序運(yùn)行效果---電影推薦結(jié)果

      基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

        最后,給用戶推薦10部自己未看過(guò)的電影。


        4、總結(jié)

          這樣,一個(gè)簡(jiǎn)單的基于模型的電影推薦應(yīng)用就算OK了。


    三、實(shí)時(shí)推薦架構(gòu)分析

        上面,實(shí)現(xiàn)了簡(jiǎn)單的推薦系統(tǒng)應(yīng)用,但是,僅僅實(shí)現(xiàn)用戶的定向推薦,在實(shí)際應(yīng)用中價(jià)值不是非常大,如果體現(xiàn)價(jià)值,最好能夠?qū)崿F(xiàn)實(shí)時(shí)或者準(zhǔn)實(shí)時(shí)推薦。

        下面,簡(jiǎn)單介紹下實(shí)時(shí)推薦的一個(gè)架構(gòu):

        基于Spark MLlib平臺(tái)的協(xié)同過(guò)濾算法---電影推薦系統(tǒng)

        

        該架構(gòu)圖取自淘寶Spark On Yarn的實(shí)時(shí)架構(gòu),這里,給出一些個(gè)人的觀點(diǎn):

        架構(gòu)圖分為三層:離線、近線和在線。

            離線部分:主要實(shí)現(xiàn)模型的建立。原始數(shù)據(jù)通過(guò)ETL加工清洗,得到目標(biāo)數(shù)據(jù),目標(biāo)業(yè)務(wù)數(shù)據(jù)結(jié)合合適的算法,學(xué)習(xí)訓(xùn)練模型,得到最佳的模型。

            近線部分:主要使用HBase存儲(chǔ)用戶行為信息,模型混合系統(tǒng)綜合顯性反饋和隱性反饋的模型處理結(jié)果,將最終的結(jié)果推薦給用戶。

            在線部分:這里,主要有兩種反饋,顯性和隱性,個(gè)人理解,顯性反饋理解為用戶將商品加入購(gòu)物車,用戶購(gòu)買商品這些用戶行為;隱性反饋理解為用戶在某個(gè)商品上停留的時(shí)間,用戶點(diǎn)擊哪些商品這些用戶行為。這里,為了實(shí)現(xiàn)實(shí)時(shí)/準(zhǔn)實(shí)時(shí)操作,使用到了Spark Streaming對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)處理。(有可能是Flume+Kafka+Spark Streaming架構(gòu))

        這里是個(gè)人的一些理解,不足之處,望各位指點(diǎn)。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI