spark-sql的進(jìn)階案例

發(fā)布時(shí)間:2020-05-25 06:38:17 來(lái)源:網(wǎng)絡(luò) 閱讀:808 作者:原生zzy 欄目:大數(shù)據(jù)

(1) A classic: word count with a UDTF

數(shù)據(jù)格式:
spark-sql的進(jìn)階案例
每一行都是字符串并且以空格分開(kāi)。
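The original screenshot is unavailable, so here is a hypothetical input file in the assumed format (the words themselves are made up):

hello spark hello hadoop
spark sql spark streaming
hadoop hdfs spark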
代碼實(shí)現(xiàn):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
    def main(args: Array[String]): Unit = {
        // Silence noisy logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")

        val spark: SparkSession = SparkSession.builder().config(conf)
            .enableHiveSupport()
            .getOrCreate()

        // Create the SQLContext
        val sqlContext: SQLContext = spark.sqlContext
        val wordDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt").toDF("line")
        wordDF.createTempView("lines")
        // explode(split(...)) acts as the UDTF: one output row per word
        val sql =
            """
              |select t1.word, count(1) counts
              |from (
              |select explode(split(line, '\\s+')) word
              |from lines) t1
              |group by t1.word
              |order by counts
            """.stripMargin
        spark.sql(sql).show()
    }
}

結(jié)果:
spark-sql的進(jìn)階案例

(2) Top-N with a window function

數(shù)據(jù)格式:
spark-sql的進(jìn)階案例
取每門(mén)課程中成績(jī)最好的前三
代碼實(shí)現(xiàn):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
    def main(args: Array[String]): Unit = {
        // Silence noisy logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")

        val spark: SparkSession = SparkSession.builder().config(conf)
            .enableHiveSupport()
            .getOrCreate()

        // Create the SQLContext
        val sqlContext: SQLContext = spark.sqlContext
        val topnDF: DataFrame = sqlContext.read.json("C:\\z_data\\test_data\\score.json")
        topnDF.createTempView("student")
        // row_number() ranks the scores within each course partition
        val sql =
            """select
              |t1.course course,
              |t1.name name,
              |t1.score score
              |from (
              |select
              |course,
              |name,
              |score,
              |row_number() over(partition by course order by score desc) top
              |from student) t1 where t1.top <= 3
            """.stripMargin
        spark.sql(sql).show()
    }
}

結(jié)果:
spark-sql的進(jìn)階案例

(3) Handling data skew (DataSkew) with Spark SQL

Approach (two-stage aggregation):
 - Find the key that is causing the skew
 - Split the skewed key by salting it (adding a random prefix)
 - Do a local aggregation on the salted keys
 - Strip the salt prefix
 - Do a global aggregation
Using the word count above as the example: first find the words that account for a disproportionate amount of the data, as illustrated below.
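As a concrete illustration (the counts are made up), suppose "spark" is the skewed word and the salt range is 3:

spark, spark, spark, ...                      -- all rows land in one group, one task does the work
0_spark, 2_spark, 1_spark, ...                -- add_prefix spreads the rows across 3 salted keys
(0_spark, 40), (1_spark, 35), (2_spark, 25)   -- stage 1: local counts per salted key
(spark, 40), (spark, 35), (spark, 25)         -- remove_prefix strips the salt
(spark, 100)                                  -- stage 2: global sum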
代碼實(shí)現(xiàn):

import java.util.Random

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}

object SparkSqlTest {
    def main(args: Array[String]): Unit = {
        // Silence noisy logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")

        val spark: SparkSession = SparkSession.builder().config(conf)
            .enableHiveSupport()
            .getOrCreate()
        // Create the SQLContext
        val sqlContext: SQLContext = spark.sqlContext
        // Register the two UDFs defined at the bottom of this object
        sqlContext.udf.register[String, String, Integer]("add_prefix", add_prefix)
        sqlContext.udf.register[String, String]("remove_prefix", remove_prefix)
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        val lineRDD: RDD[String] = sc.textFile("C:\\z_data\\test_data\\ip.txt")
        // Find the skewed word by sampling 20% of the data
        val wordsRDD: RDD[String] = lineRDD.flatMap(line => {
            line.split("\\s+")
        })
        val sampleRDD: RDD[String] = wordsRDD.sample(false, 0.2)
        val sortRDD: RDD[(String, Int)] = sampleRDD.map(word => (word, 1)).reduceByKey(_ + _).sortBy(kv => kv._2, false)
        val hot_word = sortRDD.take(1)(0)._1
        val bs: Broadcast[String] = sc.broadcast(hot_word)

        import spark.implicits._
        // Split the data into the skewed key and the rest
        val lineDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt")
        val wordDF: Dataset[String] = lineDF.flatMap(row => {
            row.getAs[String](0).split("\\s+")
        })
        // Words with data skew (the hot word)
        val hotDS: Dataset[String] = wordDF.filter(row => {
            val hot_word = bs.value
            row.equals(hot_word)
        })
        val hotDF: DataFrame = hotDS.toDF("word")
        hotDF.createTempView("hot_table")
        // Words without data skew
        val norDS: Dataset[String] = wordDF.filter(row => {
            val hot_word = bs.value
            !row.equals(hot_word)
        })
        val norDF: DataFrame = norDS.toDF("word")
        norDF.createTempView("nor_table")
        // Hot word: salt, local count, strip salt, global sum; then union the normal counts
        val sql =
            """
              |(select
              |t3.word,
              |sum(t3.counts) counts
              |from (select
              |remove_prefix(t2.newword) word,
              |t2.counts
              |from (select
              |t1.newword newword,
              |count(1) counts
              |from
              |(select
              |add_prefix(word, 3) newword
              |from hot_table) t1
              |group by t1.newword) t2) t3
              |group by t3.word)
              |union
              |(select
              | word,
              | count(1) counts
              |from nor_table
              |group by word)
            """.stripMargin
        spark.sql(sql).show()

    }
    // UDF: salt a word with a random numeric prefix in [0, range)
    def add_prefix(word: String, range: Integer): String = {
        val random = new Random()
        random.nextInt(range) + "_" + word
    }
    // UDF: strip the random prefix again (everything up to the first "_")
    def remove_prefix(word: String): String = {
        word.substring(word.indexOf("_") + 1)
    }
}

結(jié)果:
spark-sql的進(jìn)階案例
