溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

大數(shù)據(jù)SparkSQl指的是什么呢

發(fā)布時(shí)間:2021-12-17 14:19:57 來源:億速云 閱讀:193 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)大數(shù)據(jù)SparkSQl指的是什么呢,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

Spark SQL是Spark用來處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊,它提供了一個(gè)編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用。SparkSql中返回的數(shù)據(jù)類型是DataFrame

1.1.1.   為什么要學(xué)習(xí)Spark SQL

我們已經(jīng)學(xué)習(xí)了Hive,它是將Hive SQL轉(zhuǎn)換成MapReduce然后提交到集群上執(zhí)行,大大簡化了編寫MapReduce的程序的復(fù)雜性,由于MapReduce這種計(jì)算模型執(zhí)行效率比較慢。所有Spark SQL的應(yīng)運(yùn)而生,它是將Spark SQL轉(zhuǎn)換成RDD,然后提交到集群執(zhí)行,執(zhí)行效率非常快!

HIVE:簡化編寫MapReduce的程序的復(fù)雜性

Spark SQL轉(zhuǎn)換成RDD:替代MapReduce,提高效率

Spark1.0版本開始就推出了SparkSQL,最早是叫Shark

1、內(nèi)存列存儲--可以大大優(yōu)化內(nèi)存使用效率,減少了內(nèi)存消耗,避免了gc對大量數(shù)據(jù)的性能開銷

2、字節(jié)碼生成技術(shù)(byte-code generation)--可以使用動(dòng)態(tài)字節(jié)碼生成技術(shù)來優(yōu)化性能

3、Scala代碼的優(yōu)化

  結(jié)構(gòu)化數(shù)據(jù)是指任何有結(jié)構(gòu)信息的數(shù)據(jù)。所謂結(jié)構(gòu)信息,就是每條記錄共用的已知的字段集合。當(dāng)數(shù)據(jù)符合 這樣的條件時(shí),Spark SQL 就會使得針對這些數(shù)據(jù)的讀取和查詢變得更加簡單高效。具體 來說,Spark SQL 提供了以下三大功能(見圖 9-1)。

(1) Spark SQL 可以從各種結(jié)構(gòu)化數(shù)據(jù)源(例如 JSON、Hive、Parquet 等)中讀取數(shù)據(jù)。

(2) Spark SQL 不僅支持在 Spark 程序內(nèi)使用 SQL 語句進(jìn)行數(shù)據(jù)查詢,也支持從類似商業(yè) 智能軟件 Tableau 這樣的外部工具中通過標(biāo)準(zhǔn)數(shù)據(jù)庫連接器(JDBC/ODBC)連接 Spark SQL 進(jìn)行查詢。

(3) 當(dāng)在 Spark 程序內(nèi)使用 Spark SQL 時(shí),Spark SQL 支持 SQL 與常規(guī)的 Python/Java/Scala 代碼高度整合,包括連接 RDD 與 SQL 表、公開的自定義 SQL 函數(shù)接口等。這樣一來, 許多工作都更容易實(shí)現(xiàn)了。

為了實(shí)現(xiàn)這些功能,Spark SQL 提供了一種特殊的 RDD,叫作 SchemaRDD。SchemaRDD 是存放 Row 對象的 RDD,每個(gè) Row 對象代表一行記錄。SchemaRDD 還包含記錄的結(jié)構(gòu)信 息(即數(shù)據(jù)字段)。SchemaRDD 看起來和普通的 RDD 很像,但是在內(nèi)部,SchemaRDD 可 以利用結(jié)構(gòu)信息更加高效地存儲數(shù)據(jù)。此外,SchemaRDD 還支持 RDD 上所沒有的一些新 操作,比如運(yùn)行 SQL 查詢。SchemaRDD 可以從外部數(shù)據(jù)源創(chuàng)建,也可以從查詢結(jié)果或普 通 RDD 中創(chuàng)建。

什么是DataFrames

(SparkSql中返回的數(shù)據(jù)類型它在概念上等同于關(guān)系數(shù)據(jù)庫中的表,但在查詢上進(jìn)行了優(yōu)化)

與RDD類似,DataFrame也是一個(gè)分布式數(shù)據(jù)容器。然而DataFrame更像傳統(tǒng)數(shù)據(jù)庫的二維表格,除了數(shù)據(jù)以外,還記錄數(shù)據(jù)的結(jié)構(gòu)信息,即schema。

1.1.1.   創(chuàng)建DataFrames

 在Spark SQL中SQLContext是創(chuàng)建DataFrames和執(zhí)行SQL的入口,在spark-1.6.1中已經(jīng)內(nèi)置了一個(gè)sqlContext

1.在本地創(chuàng)建一個(gè)文件,有三列,分別是id、name、age,用空格分隔,然后上傳到hdfs上

hdfs dfs -put person.txt /

2.在spark shell執(zhí)行下面命令,讀取數(shù)據(jù),將每一行的數(shù)據(jù)使用列分隔符分割

val lineRDD = sc.textFile("hdfs://node01:9000/person.txt").map(_.split(" "))

3.定義case class(相當(dāng)于表的schema

case class Person(id:Int, name:String, age:Int)

4.RDDcase class關(guān)聯(lián)

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

  (里面的數(shù)據(jù)是在Array中)

5.RDD轉(zhuǎn)換成DataFrame

val personDF = personRDD.toDF

6.對DataFrame進(jìn)行處理

personDF.show

val seq1 = Seq(("1","bingbing",35),("2","yuanyuan",34),("3","mimi",33))

val rdd1 =sc.parallelize(seq1)

val df = rdd1.toDF("id","name","age")

df.show

DSL:領(lǐng)域特定語言

////查看DataFrame中的內(nèi)容

大數(shù)據(jù)SparkSQl指的是什么呢

//查看DataFrame部分列中的內(nèi)容

1.

大數(shù)據(jù)SparkSQl指的是什么呢

2.

大數(shù)據(jù)SparkSQl指的是什么呢

3.

大數(shù)據(jù)SparkSQl指的是什么呢

//打印DataFrame的Schema信息

大數(shù)據(jù)SparkSQl指的是什么呢

//查詢所有的name和age,并將age+1

1.df.select(col("id"),col("name"),col("age")+1).show

大數(shù)據(jù)SparkSQl指的是什么呢

2.df.select(df("id"), df("name"), df("age") + 1).show

大數(shù)據(jù)SparkSQl指的是什么呢

//過濾age大于等于18的

df.filter(col("age") >= 35).show

大數(shù)據(jù)SparkSQl指的是什么呢

//按年齡進(jìn)行分組并統(tǒng)計(jì)相同年齡的人數(shù)

df.groupBy("age").count().show()

大數(shù)據(jù)SparkSQl指的是什么呢

SQL風(fēng)格語法

//查詢年齡最大的前兩名

1.如果想使用SQL風(fēng)格的語法,需要將DataFrame注冊成表

df.registerTempTable("t_person")

2.sqlContext.sql("select * from t_person order by age desc limit 2").show

大數(shù)據(jù)SparkSQl指的是什么呢

//顯示表的Schema信息

大數(shù)據(jù)SparkSQl指的是什么呢

以編程方式執(zhí)行Spark SQL查詢

1.編寫Spark SQL查詢程序

1.通過反射推斷Schema

=======================================================

package com.qf.gp1708.day06

//通過反射獲取用戶信息

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SQLContext}

import org.apache.spark.{SparkConf, SparkContext}

object InferSchema {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()

      .setMaster("local")

      .setAppName("inferschema")

    val sc = new SparkContext(conf)

    val sqlContext:SQLContext = new SQLContext(sc)

  1.  //獲取數(shù)據(jù)并切分

    val line = sc.textFile("C://Users/Song/Desktop/person.txt").map(_.split(","))

   3 //將獲取的數(shù)據(jù)和Person樣例類進(jìn)行關(guān)聯(lián)

    val personRdd: RDD[Godness] = line.map(arr=>Godness(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))

    //引入隱式轉(zhuǎn)換函數(shù),這樣才可以調(diào)用到toDF方法

    import sqlContext.implicits._

   4 //將personRDD轉(zhuǎn)換成DataFrame

    val dF: DataFrame = personRdd.toDF

  5.  //注冊一張臨時(shí)表

    dF.registerTempTable("t_person")

    val sql = "select * from t_person where fv > 70 order by age"

    //查詢

    val res: DataFrame = sqlContext.sql(sql)

    res.show()

    sc.stop()

  }

}

2//創(chuàng)建樣例類

case class Godness(id:Long,name:String,age:Int,fv:Int)

=========================================================

2.通過StructType直接指定Schema

===========================================

package com.qf.gp1708.day06

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import org.apache.spark.{SparkConf, SparkContext}

/**

  * 通過StructType類型直接指定Schema

  */

object StructTypeSchema {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()

      .setAppName("str")

      .setMaster("local")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    //獲取數(shù)據(jù)并切分

    val lines = sc.textFile("hdfs://...").map(_.split(","))

    //指定schema信息

    StructType{

      List(

        StructField("id",IntegerType,false),

        StructField("name",StringType,true),

        StructField("age",IntegerType,true),

        StructField("fv",IntegerType,true),

      )

    }

    //開始映射

    val rowRDD: RDD[Row] = lines.map(arr =>Row(arr(0).toInt,arr(1),arr(2).toInt,arr(3).toInt))

    //把RDD轉(zhuǎn)換為DataFrame

    val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,schema)

    //生成臨時(shí)表

    personDF.registerTempTable("t_person")

    val sql = "select name,age,fv from t_person where age >30 order by age desc"

    val res = sqlContext.sql(sql)

    res.write.mode("append").json("c://out-20180903-1")

    sc.stop()

  }

}

=================================================================

1.   數(shù)據(jù)源

1.1. JDBC

Spark SQL可以通過JDBC從關(guān)系型數(shù)據(jù)庫中讀取數(shù)據(jù)的方式創(chuàng)建DataFrame,通過對DataFrame一系列的計(jì)算后,還可以將數(shù)據(jù)再寫回關(guān)系型數(shù)據(jù)庫中。

1.1.1.   MySQL中加載數(shù)據(jù)(Spark Shell方式)

1.啟動(dòng)Spark Shell,必須指定mysql連接驅(qū)動(dòng)jar包

/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell \

--master spark://node01:7077 \

--jars /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

  (指定MySQL包)

--driver-class-path /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar (指定驅(qū)動(dòng)類)

2.從mysql中加載數(shù)據(jù)

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://node03:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "root")).load()

3.執(zhí)行查詢

jdbcDF.show()

1.1.2.   將數(shù)據(jù)寫入到MySQL中(打jar包方式)

package com.qf.gp1708.day06

import java.util.Properties

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{Row, SQLContext}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import org.apache.spark.{SparkConf, SparkContext}

/**

  * 寫入數(shù)據(jù)到MySQL

  */

object InsertData2MySQLDemo {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("").setMaster("local[2]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val lines= sc.textFile("").map(_.split(","))

    //生成Schema

    val schema = StructType {

      Array(

        StructField("name", StringType, true),

        StructField("age", IntegerType, true),

        StructField("fv", StringType, true),

      )

    }

    //映射

    val personRDD = lines.map(arr =>Row(arr(1).toString,arr(2).toInt,arr(3).toInt))

    //生成DataFrame

    val personDF = sqlContext.createDataFrame(personRDD,schema)

    //生成用于寫入MySQL的配置信息

    val prop = new Properties()

    prop.put("user","root")

    prop.put("password","root")

    prop.put("driver","com.mysql.jdbc.Driver")

    val jdbcUrl="jdbc:mysql://hadoop03:3306/bigdata"

    val table="person"

    //把數(shù)據(jù)寫入MySQL

    personDF.write.mode("append").jdbc(jdbcUrl,table,prop)

    sc.stop()

  }

}

/usr/local/spark-1.6.3-bin-hadoop2.6/spark-submit \

--class com.qf..... \

--master spark://hadoop01:7077 \

--executor-memory 512m \

--total-executor-cores 2 \

--jars /usr/.../mysql-connector-java-5.1.35-bin.jar \

--driver-class-path /usr/.../mysql-connector-java-5.1.35-bin.jar \

/root/1.jar

=======================================================

kafka:消息中間件(緩存數(shù)據(jù))---解耦

  為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高吞吐量、低等待的平臺

  3、為什么需要消息隊(duì)列(重要、了解)

  消息系統(tǒng)的核心作用就是三點(diǎn):解耦,異步和并行

  Kafka對消息保存時(shí)根據(jù)Topic進(jìn)行歸類

  Topic:底層就是隊(duì)列,將不同的消息放在不同的隊(duì)列中進(jìn)行分類

上述就是小編為大家分享的大數(shù)據(jù)SparkSQl指的是什么呢了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI