溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么實踐Spark

發(fā)布時間:2021-12-16 18:47:53 來源:億速云 閱讀:114 作者:柒染 欄目:云計算

怎么實踐Spark,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

Spark小試牛刀

隨著項目的運營,收集了很多的用戶數(shù)據(jù)。最近業(yè)務(wù)上想做些社交圖譜相關(guān)的產(chǎn)品,但因為數(shù)據(jù)很多、很雜,傳統(tǒng)的數(shù)據(jù)庫查詢已經(jīng)滿足不了業(yè)務(wù)的需求。 試著用Spark來做,權(quán)當(dāng)練練手了。

安裝Spark

因為有Scala的開發(fā)經(jīng)驗,所以就不用官方提供的二進(jìn)制包了,自編譯scala 2.11版本。

下載Spark:http://ftp.cuhk.edu.hk/pub/packages/apache.org/spark/spark-1.5.0/spark-1.5.0.tgz

tar zxf spark-1.5.0.tgz
cd spark-1.5.0
./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package

以上命令完成Spark基于scala 2.11版本的編譯??梢赃\行自帶的一個示例程序來驗證安裝是否成功。

./bin/run-example SparkPi

編寫Standalone application

使用sbt來構(gòu)建一個可提交的簡單Spark程序,功能是計算每個用戶加入的群組,并把結(jié)果保存下來。project/Build.scala配置文件如下:

import _root_.sbt.Keys._
import _root_.sbt._
import sbtassembly.AssemblyKeys._

object Build extends Build {

  override lazy val settings = super.settings :+ {
    shellPrompt := (s => Project.extract(s).currentProject.id + " > ")
  }

  lazy val root = Project("spark-mongodb", file("."))
    .settings(
      scalaVersion := "2.11.7",
      assemblyJarName in assembly := "spark-mongodb.jar",
      assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),
      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % verSpark % "scopeProvidedTest,
        "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" excludeAll(
            ExclusionRule(organization = "javax.servlet"), 
            ExclusionRule(organization = "commons-beanutils"), 
            ExclusionRule(organization = "org.apache.hadoop")))
    )
  
  private val scopeProvidedTest = "provided,test"
  private val verSpark = "1.5.0"
}

數(shù)據(jù)存儲在MongoDB數(shù)據(jù)庫中,所以我們還需要使用mongo-hadoop連接器來訪問MongoDB數(shù)據(jù)庫。

示例程序

示例程序非常的簡單,把數(shù)據(jù)從數(shù)據(jù)庫里全部讀出,使用map來把每條記錄里用戶ID對應(yīng)加入的群組ID轉(zhuǎn)換成一個Set,再使用 reduceByKey來把相同用戶ID的set合并到一起,存入數(shù)據(jù)庫即可。

import com.mongodb.BasicDBObject
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject

import scala.collection.JavaConverters._

object QQGroup {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("QQGroup")
    val sc = new SparkContext(sparkConf)

    val inputConfig = new Configuration()
    inputConfig.set("mongo.input.uri", "mongodb://192.168.31.121:27017/db.userGroup")
    inputConfig.set("mongo.input.fields", """{"userId":1, "groupId":1, "_id":0}""")
    inputConfig.set("mongo.input.noTimeout", "true")

    val documentRDD = sc.newAPIHadoopRDD(
      inputConfig,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])

    val userRDD = documentRDD.map { case (_, doc) =>
      (getValue(doc, "userId"), getValue(doc, "groupId"))
    }.reduceByKey(_ ++ _)

    val resultRDD = userRDD.map { case (userId, groupIds) =>
      val o = new BasicDBObject()
      o.put("groupIds", groupIds.asJava)
      userId -> o
    }

    val outputConfig = new Configuration()
    outputConfig.set("mongo.output.uri", "mongodb://192.168.31.121:27017/db_result.userGroup")

    resultRDD.saveAsNewAPIHadoopFile(
      "file://this-is-completely-unused",
      classOf[Object],
      classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]],
      outputConfig)
  }

  def getValue(dbo: BSONObject, key: String) = {
    val value = dbo.get(key)
    if (value eq null) "" else value.asInstanceOf[String]
  }
}

MongoDB官方提供了Hadoop連接器,Spark可以使用mongo-hadoop連接器來讀、寫MongoDB數(shù)據(jù)庫。 主要的輸入配置薦有:

  • mongo.input.uri: MongoDB的連接URI

  • mongo.input.fields: 指定返回哪些數(shù)據(jù),與db.query里的第2個參數(shù)功能一樣

  • mongo.input.query: MongoDB的查詢參數(shù)

相應(yīng)的MongoDB也提供了一系列的輸出參數(shù),如:

  • mongo.output.uri: MongoDB的連接URI

sc.newAPIHadoopRDD()方法有4個參數(shù),分別為:配置、輸入格式化類、待映射數(shù)據(jù)主鍵類型、待映射數(shù)據(jù)類型。

主要的操作代碼:

    val userRDD = documentRDD.map { case (_, doc) =>
      (getValue(doc, "userId"), Set(getValue(doc, "groupId")))
    }.reduceByKey(_ ++ _)

    val resultRDD = userRDD.map { case (userId, groupIds) =>
      val o = new BasicDBObject()
      o.put("groupIds", groupIds.asJava)
      userId -> o
    }

先使用map方法獲取userIdgroupId,并把groupId轉(zhuǎn)換為一個Set。

在把數(shù)據(jù)轉(zhuǎn)換成Tuple2,就是一個KV的形式以后,我們就可以調(diào)用一系列的轉(zhuǎn)換方法來對RDD進(jìn)行操作,這里使用reduceByKey方法來將同一個userId的所以value都合并在一起。這樣我們就有了所有用戶對應(yīng)加入的群組 的一個RDD集了。

(RDD上有兩種類型的操作。一種是“變換”,它只是描述了待進(jìn)行的操作指令,并不會觸發(fā)實際的計算;另一種是“動作”, 它將觸發(fā)實際的計算動作,這時候系統(tǒng)才會實際的從數(shù)據(jù)源讀入數(shù)據(jù),操作內(nèi)存,保存數(shù)據(jù)等)

最后使用resultRDD.saveAsNewAPIHadoopFile()方法來把計算結(jié)果存入MongoDB,這里的一個參數(shù):用于指定 HDFS的存儲位置并不會使用到,因為mongo-hadoop將會使用mongo.output.uri指定的存儲URI連接地址來保存數(shù)據(jù)。

關(guān)于怎么實踐Spark問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI