溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark中怎么實(shí)現(xiàn)聚合功能

發(fā)布時(shí)間:2021-08-09 14:11:27 來(lái)源:億速云 閱讀:156 作者:Leah 欄目:云計(jì)算

Spark中怎么實(shí)現(xiàn)聚合功能,相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。

互聯(lián)網(wǎng)公司-面試題:
/**  
舉個(gè)例子,比如要統(tǒng)計(jì)用戶的總訪問(wèn)次數(shù)和去除訪問(wèn)同一個(gè)URL之后的總訪問(wèn)次數(shù),隨便造了幾條樣例數(shù)據(jù)(四個(gè)字段:id,name,vtm,url,vtm字段本例沒(méi)用,不用管)如下:

id1,user1,2,http://www.hupu.com
id1,user1,2,http://www.hupu.com
id1,user1,3,http://www.hupu.com
id1,user1,100,http://www.hupu.com
id2,user2,2,http://www.hupu.com
id2,user2,1,http://www.hupu.com
id2,user2,50,http://www.hupu.com
id2,user2,2,http://touzhu.hupu.com

根據(jù)這個(gè)數(shù)據(jù)集,我們可以寫hql 實(shí)現(xiàn):
select id,name, count(0) as ct,count(distinct url) as urlcount 
from table 
group by id,name;

得出結(jié)果應(yīng)該是:

id1,user1,4,1
id2,user2,4,2

下面用Spark實(shí)現(xiàn)這個(gè)聚合功能<發(fā)現(xiàn)Spark還是有難度的,臥槽>
簡(jiǎn)單說(shuō)說(shuō)MR的解析過(guò)程:

map階段: id和name組合為key, url為value
reduce階段: len(urls) 出現(xiàn)次數(shù), len(set(urls)) 出現(xiàn)用戶數(shù)
由于本人是不寫MR,導(dǎo)致面試很尷尬。
想裝逼寫個(gè)Spark, 發(fā)現(xiàn)難度很大,因?yàn)榈拇_很多函數(shù)不熟悉。

代碼如下:

import org.apache.spark.SparkContext._  
import org.apache.spark._   

object SparkDemo2 {  
  def main(args: Array[String]) {  
  
    case class User(id: String, name: String, vtm: String, url: String)  
    //val rowkey = (new RowKey).evaluate(_)  
    // val HADOOP_USER = "hdfs"  
    // 設(shè)置訪問(wèn)spark使用的用戶名  
    // System.setProperty("user.name", HADOOP_USER);  
    // 設(shè)置訪問(wèn)hadoop使用的用戶名  
    // System.setProperty("HADOOP_USER_NAME", HADOOP_USER);  
  
    val conf = new SparkConf().setAppName("wordcount").setMaster("local") //.setExecutorEnv("HADOOP_USER_NAME", HADOOP_USER)  
    val sc = new SparkContext(conf)  
    val data = sc.textFile("/Users/jiangzl/Desktop/test.txt")  
    val rdd1 = data.map(line => {  
      val r = line.split(",")  
      User(r(0), r(1), r(2), r(3))  
    })
    val rdd2 = rdd1.map(r => ((r.id, r.name), r))  
  
    val seqOp = (a: (Int, List[String]), b: User) => a match {  
      case (0, List()) => (1, List(b.url))  
      case _ => (a._1 + 1, b.url :: a._2)  
    }  
  
    val combOp = (a: (Int, List[String]), b: (Int, List[String])) => {  
      (a._1 + b._1, a._2 ::: b._2)  
    }
  
    println("-----------------------------------------")  
    val rdd3 = rdd2.aggregateByKey((0, List[String]()))(seqOp, combOp).map(a => {  
      (a._1, a._2._1, a._2._2.distinct.length)  
    })  
    rdd3.collect.foreach(println)  
    println("-----------------------------------------")  
    sc.stop()  
  }  
}

解決方案-報(bào)錯(cuò)Scala版本問(wèn)題:Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef;

修改Scala版本2.11.7改為2.10.4

simple.sbt

name := "SparkDemo Project"
version := "1.0"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

———————————————————————————修改為:——————————————————————————

name := "SparkDemo Project"
version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

運(yùn)行過(guò)程

jiangzhongliandeMacBook-Pro:spark-1.4.1-bin-hadoop2.6 jiangzl$ ./bin/spark-submit --class "SparkDemo2" ~/Desktop/tmp/target/scala-2.11/simple-project_2.11-1.0.jar
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef;
    at SparkDemo2$.main(tmp_spark.scala)
    at SparkDemo2.main(tmp_spark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

———————————————————————————修改為:——————————————————————————

jiangzhongliandeMacBook-Pro:spark-1.4.1-bin-hadoop2.6 jiangzl$ ./bin/spark-submit --class "SparkDemo2" ~/Desktop/tmp/target/scala-2.10/sparkdemo-project_2.10-1.0.jar
16/04/29 12:40:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-----------------------------------------
((id1,user1),4,1)
((id2,user2),4,2)
-----------------------------------------

看完上述內(nèi)容,你們掌握Spark中怎么實(shí)現(xiàn)聚合功能的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI