Data format:
Each line is a string of words separated by spaces.
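The sample file itself only appears as a screenshot in the original post; under the stated format, ip.txt would look something like this (purely illustrative contents):

hello spark hello
spark spark hadoop hive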
Implementation:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Silence the noisier loggers
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    // Build the programming entry point
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkSqlTest")
      .setMaster("local[2]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // Create the SQLContext
    val sqlContext: SQLContext = spark.sqlContext
    val wordDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt").toDF("line")
    wordDF.createTempView("lines")
    val sql =
      """
        |select t1.word, count(1) counts
        |from (
        |  select explode(split(line, '\\s+')) word
        |  from lines) t1
        |group by t1.word
        |order by counts
      """.stripMargin
    spark.sql(sql).show()
  }
}
Result: (screenshot in the original)
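The same word count can also be expressed with the DataFrame API instead of a temp view; the following is a minimal sketch, assuming the same `spark` session and input path as above:

import org.apache.spark.sql.functions.{col, explode, split}

// Split each line into words, then count and order just like the SQL version
val lines = spark.read.text("C:\\z_data\\test_data\\ip.txt").toDF("line")
lines
  .select(explode(split(col("line"), "\\s+")).as("word"))
  .groupBy("word")
  .count()          // produces a column named "count"
  .orderBy("count")
  .show()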
Data format: (screenshot in the original)
Take the top three scores in each course.
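Judging from the columns the query below uses (name, course, score), score.json is presumably a JSON Lines file along these lines (hypothetical, illustrative records only):

{"name": "zhangsan", "course": "math", "score": 98}
{"name": "lisi", "course": "math", "score": 95}
{"name": "wangwu", "course": "math", "score": 93}
{"name": "zhaoliu", "course": "english", "score": 97}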
Implementation:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Silence the noisier loggers
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    // Build the programming entry point
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkSqlTest")
      .setMaster("local[2]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // Create the SQLContext
    val sqlContext: SQLContext = spark.sqlContext
    val topnDF: DataFrame = sqlContext.read.json("C:\\z_data\\test_data\\score.json")
    topnDF.createTempView("student")
    val sql =
      """select
        |  t1.course course,
        |  t1.name name,
        |  t1.score score
        |from (
        |  select
        |    course,
        |    name,
        |    score,
        |    row_number() over(partition by course order by score desc) top
        |  from student) t1
        |where t1.top <= 3
      """.stripMargin
    spark.sql(sql).show()
  }
}
Result: (screenshot in the original)
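The same top-three-per-course query can be written with the DataFrame API and a window specification; a minimal sketch, assuming the same `topnDF` as above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each course by descending score, then keep the top three
val byCourse = Window.partitionBy("course").orderBy(col("score").desc)
topnDF
  .withColumn("top", row_number().over(byCourse))
  .where(col("top") <= 3)
  .select("course", "name", "score")
  .show()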
Approach (two-stage aggregation):
- Find the key that causes the data skew
- Split the skewed key by adding a random prefix (salting)
- Do a local (partial) aggregation
- Strip the prefix
- Do the global aggregation
Using the word count above as the example, first find the word with the disproportionately large volume. A minimal RDD-level sketch of the idea follows, and then the full Spark SQL implementation.
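For brevity the sketch salts every word rather than only the skewed one, and it assumes the same `sc` and input path used below:

import scala.util.Random

// Stage 1: salt each word with a random prefix in [0, 3) and aggregate locally
val saltedCounts = sc.textFile("C:\\z_data\\test_data\\ip.txt")
  .flatMap(_.split("\\s+"))
  .map(word => (Random.nextInt(3) + "_" + word, 1))
  .reduceByKey(_ + _)
// Stage 2: strip the prefix and aggregate globally
val finalCounts = saltedCounts
  .map { case (salted, cnt) => (salted.substring(salted.indexOf("_") + 1), cnt) }
  .reduceByKey(_ + _)
finalCounts.collect().foreach(println)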
Implementation:
import java.util.Random

import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Silence the noisier loggers
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    // Build the programming entry point
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkSqlTest")
      .setMaster("local[2]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // Create the SQLContext
    val sqlContext: SQLContext = spark.sqlContext
    // Register the UDFs
    sqlContext.udf.register[String, String, Integer]("add_prefix", add_prefix _)
    sqlContext.udf.register[String, String]("remove_prefix", remove_prefix _)
    // Create the SparkContext
    val sc: SparkContext = spark.sparkContext
    val lineRDD: RDD[String] = sc.textFile("C:\\z_data\\test_data\\ip.txt")
    // Find the skewed word by sampling and counting
    val wordsRDD: RDD[String] = lineRDD.flatMap(line => {
      line.split("\\s+")
    })
    val sampleRDD: RDD[String] = wordsRDD.sample(false, 0.2)
    val sortRDD: RDD[(String, Int)] = sampleRDD.map(word => (word, 1)).reduceByKey(_ + _).sortBy(kv => kv._2, false)
    // The most frequent word in the sample is treated as the skewed key
    val hot_word = sortRDD.take(1)(0)._1
    val bs: Broadcast[String] = sc.broadcast(hot_word)
    import spark.implicits._
    // Split the data by whether it is the skewed key
    val lineDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt")
    val wordDF: Dataset[String] = lineDF.flatMap(row => {
      row.getAs[String](0).split("\\s+")
    })
    // Words with data skew
    val hotDS: Dataset[String] = wordDF.filter(row => {
      val hot_word = bs.value
      row.equals(hot_word)
    })
    val hotDF: DataFrame = hotDS.toDF("word")
    hotDF.createTempView("hot_table")
    // Words without data skew
    val norDS: Dataset[String] = wordDF.filter(row => {
      val hot_word = bs.value
      !row.equals(hot_word)
    })
    val norDF: DataFrame = norDS.toDF("word")
    norDF.createTempView("nor_table")
    val sql =
      """
        |(select
        |  t3.word,
        |  sum(t3.counts) counts
        |from (select
        |    remove_prefix(t2.newword) word,
        |    t2.counts
        |  from (select
        |      t1.newword newword,
        |      count(1) counts
        |    from
        |      (select
        |        add_prefix(word, 3) newword
        |      from hot_table) t1
        |    group by t1.newword) t2) t3
        |group by t3.word)
        |union
        |(select
        |  word,
        |  count(1) counts
        |from nor_table
        |group by word)
      """.stripMargin
    spark.sql(sql).show()
  }

  // UDF: add a random numeric prefix (salt) in [0, range) to a word
  def add_prefix(word: String, range: Integer): String = {
    val random = new Random()
    random.nextInt(range) + "_" + word
  }

  // UDF: strip the random prefix added by add_prefix
  def remove_prefix(word: String): String = {
    word.substring(word.indexOf("_") + 1)
  }
}
Result: (screenshot in the original)
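To illustrate what the two UDFs do to a single occurrence of the skewed word (hypothetical hot word "spark", range 3), inside SparkSqlTest or a REPL with the two methods defined:

// Purely illustrative round-trip of one salted word
add_prefix("spark", 3)    // e.g. "0_spark", "1_spark" or "2_spark"
remove_prefix("2_spark")  // "spark"

Note that the query uses UNION, which in Spark SQL deduplicates rows; since the skewed word never appears in nor_table the result is unchanged, but UNION ALL would avoid the extra distinct step.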