溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark?SQL配置及使用的方法是什么

發(fā)布時間:2021-12-03 15:09:13 來源:億速云 閱讀:320 作者:iii 欄目:開發(fā)技術

本篇內(nèi)容介紹了“Spark SQL配置及使用的方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

    XY個人記

    SparkSQL是spark的一個模塊,主入口是SparkSession,將SQL查詢與Spark程序無縫混合。DataFrames和SQL提供了訪問各種數(shù)據(jù)源(通過JDBC或ODBC連接)的常用方法包括Hive,Avro,Parquet,ORC,JSON和JDBC。您甚至可以跨這些來源加入數(shù)據(jù)。以相同方式連接到任何數(shù)據(jù)源。Spark SQL還支持HiveQL語法以及Hive SerDes和UDF,允許您訪問現(xiàn)有的Hive倉庫。

    Spark SQL包括基于成本的優(yōu)化器,列式存儲和代碼生成,以快速進行查詢。同時,它使用Spark引擎擴展到數(shù)千個節(jié)點和多小時查詢,該引擎提供完整的中間查詢?nèi)蒎e。不要擔心使用不同的引擎來獲取歷史數(shù)據(jù)。

    Spark?SQL配置及使用的方法是什么

    SparkSQL版本: 

        Spark2.0之前
    入口:SQLContext和HiveContext
    SQLContext:主要DataFrame的構(gòu)建以及DataFrame的執(zhí)行,SQLContext指的是spark中SQL模塊的程序入口
    HiveContext:是SQLContext的子類,專門用于與Hive的集成,比如讀取Hive的元數(shù)據(jù),數(shù)據(jù)存儲到Hive表、Hive的窗口分析函數(shù)等

        Spark2.0之后
    入口:SparkSession(spark應用程序的一個整體入口),合并了SQLContext和HiveContext

        SparkSQL核心抽象:DataFrame/Dataset     type DataFrame = Dataset[Row]    //type 給某個數(shù)據(jù)類型起個別名

    SparkSQL DSL語法 

    SparkSQL除了支持直接的HQL語句的查詢外,還支持通過DSL語句/API進行數(shù) 據(jù)的操作,主要DataFrame API列表如下:

    select:類似于HQL語句中的select,獲取需要的字段信息

    where/filter:類似HQL語句中的where語句,根據(jù)給定條件過濾數(shù)據(jù)

    sort/orderBy: 全局數(shù)據(jù)排序功能,類似Hive中的order by語句,按照給定字段進行全部 數(shù)據(jù)的排序

    sortWithinPartitions:類似Hive的sort by語句,按照分區(qū)進行數(shù)據(jù)排序

    groupBy:數(shù)據(jù)聚合操作

    limit:獲取前N條數(shù)據(jù)記錄

    SparkSQL和Hive的集成

    集成步驟:
    -1. namenode和datanode啟動
    -2. 將hive配置文件軟連接或者復制到spark的conf目錄下面

    $ ln -s /opt/modules/apache/hive-1.2.1/conf/hive-site.xml 
    or
    $ cp /opt/modules/apache/hive-1.2.1/conf/hive-site.xml ./

            -3. 根據(jù)hive-site.xml中不同配置項,采用不同策略操作
    根據(jù)hive.metastore.uris參數(shù)
    -a. 當hive.metastore.uris參數(shù)為空的時候(默認值)
    將Hive元數(shù)據(jù)庫的驅(qū)動jar文件添加spark的classpath環(huán)境變量中即可完成SparkSQL到hive的集成
    -b. 當hive.metastore.uris非空時候
    -1. 啟動hive的metastore服務
    ./bin/hive --service metastore &
    -2. 完成SparkSQL與Hive集成工作

            -4.啟動spark-SQL($ bin/spark-sql)時候 發(fā)現(xiàn)報錯:

    java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver

            at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

            at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

            at java.security.AccessController.doPrivileged(Native Method)

            at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

            at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

            at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

            at java.lang.Class.forName0(Native Method)

            at java.lang.Class.forName(Class.java:270)

            at org.apache.spark.util.Utils$.classForName(Utils.scala:228)

            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:693)

            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)

            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)

            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)

            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

    Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.

    You need to build Spark with -Phive and -Phive-thriftserver.

    解決辦法:將spark源碼中sql/hive-thriftserver/target/spark-hive-thriftserver_2.11-2.0.2.jar拷貝到spark的jars目錄下

    完成。(查看數(shù)據(jù)庫 spark-sql (default)> show databases; ,它操作的都是Hive)

    Spark?SQL配置及使用的方法是什么

        編寫兩個簡單的SQL

    spark-sql (default)> select * from emp;

    Spark?SQL配置及使用的方法是什么

        也可以做兩張變的jion

    spark-sql (default)> select a.*,b.* from emp a left join dept b on a.deptno = b.deptno;

    Spark?SQL配置及使用的方法是什么

    可以對表進行一個緩存操作3

    > cache table emp;    #緩存操作
    > uncache table dept;    #清除緩存操作
    > explain select * from emp;    #執(zhí)行計劃

    我們可以看到相應的Storage信息,執(zhí)行完清除緩存操作后下面的Stages操作消失

    Spark?SQL配置及使用的方法是什么

    Spark?SQL配置及使用的方法是什么

    啟動一個Spark Shell,可以直接在shell里面編寫SQL語句

    $ bin/spark-shell
    #可以在shell里面寫sql
    scala> spark.sql("show databases").show
    scala> spark.sql("use common").show
    scala> spark.sql("select * from emp a join dept b on a.deptno = b.deptno").show

    Spark?SQL配置及使用的方法是什么

    Spark?SQL配置及使用的方法是什么

          用一個變量名稱接收DataFrame

    Spark?SQL配置及使用的方法是什么

        比如使用registerTempTable注冊一個臨時表。注:臨時表是所有數(shù)據(jù)庫公有的不需要指定數(shù)據(jù)庫

    scala> df.registerTempTable("table_regis01")

    Spark?SQL配置及使用的方法是什么

    Spark應用依賴第三方jar包文件解決方案        

    在我們的4040頁面Environment節(jié)點下的Classpath Entries節(jié)點里可以看到我們服務所依賴的jar包。http://hadoop01.com:4040/environment/

    Spark?SQL配置及使用的方法是什么

        1.直接添加驅(qū)動jar到${SPARK_HOME}/jars

        2. 使用參數(shù)--jars 添加本地jar包
    ./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/servlet-api-2.5.jar
    添加多個本地jar的話,用逗號隔開
    ./bin/spark-shell --jars jars/mysql-connector-java-5.1.27-bin.jar,/opt/modules/hive-1.2.1/lib/*
    注意:不能使用*去添加jar包,如果想要添加多個依賴jar,只能一個一個去添加

    3. 使用參數(shù)--packages添加maven中的第三方jar文件
    . bin/spark-shell --packages mysql:mysql-connector-java:5.1.28       
    可以使用逗號隔開給定多個,格式(groupId:artifactId:version)
    (底層執(zhí)行原理先從maven中央庫下載本地沒有的第三方jar文件到本地,jar文件會先下載到本地的/home/ijeffrey/.ivy2/jars目錄下,最后通過spark.jars來控制添加classpath中)
    --exclude-packages    去掉不需要的包
    --repositories maven源,指定URL連接    

    4. 使用SPARK_CLASSPATH環(huán)境變量給定jar文件路徑    
    編輯spark-env.sh文件
    SPARK_CLASSPATH=/opt/modules/apache/spark-2.0.2/external_jars/*          外部jar的路徑
    5. 將第三方jar文件打包到最終的jar文件中    
    在IDEA中添加依賴jar到最終的需要運行的spark應用的jar中

    SparkSQL的ThriftServer服務

        ThriftServer底層就是Hive的HiveServer2服務,下面是客戶端連接Hive Server2 方法的相關連接
    https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC    
    https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics    #hiveserver2的配置
    https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2   

        配置:
    -1. ThriftServer服務運行的Spark環(huán)境必須完成SparkSQL和Hive的集成
    -2. hive-site.xml中配置hiveserver2服務的相關參數(shù)

    <!-- 監(jiān)聽的端口號 -->
    <property>
    	<name>hive.server2.thrift.bind.port</name>
    	<value>10000</value>
    </property>
    <!-- 監(jiān)聽的主機名 -->
    <property>
    	<name>hive.server2.thrift.bind.host</name>
    	<value>hadoop01.com</value>
    </property>

            -3. 啟動hive的元數(shù)據(jù)服務

    $ ./bin/hive --service metastore &

            -4. 啟動spark的thriftserver服務,也是一個SparkSubmit服務

    $ sbin/start-thriftserver.sh

    Spark?SQL配置及使用的方法是什么

        也可以看到相應的WEBUI界面,比之前的多了一個JDBC/ODBC Server
    Spark?SQL配置及使用的方法是什么

    注意:如果需要啟動Spark ThriftServer 服務,需要關閉hiveserver2 服務

    SparkSQL的ThriftServer服務測試

        -1. 查看進程是否存在
    jps -ml | grep HiveThriftServer2
    -2. 查看WEB界面是否正常
    有JDBC/ODBC Server這個選項就是正常的
    -3. 通過spark自帶的beeline命令
    ./bin/beeline
    -4. 通過jdbc來訪問spark的ThriftServer接口

    Spark中beeline的使用

    $ bin/beeline    #啟動beeline
    #可以使用!help查看相應的命令
    beeline> !help
    #如connect
    beeline> !connect
    Usage: connect <url> <username> <password> [driver]
    #這樣可以多個用戶連接
    beeline> !connect jdbc:hive2://hadoop01.com:10000
    #退出
    beeline> !quit

    Spark?SQL配置及使用的方法是什么

    連接成功,在4040 頁面也可以看到我們連接的hive

    Spark?SQL配置及使用的方法是什么

    注:如果報錯
    No known driver to handle "jdbc:hive2://hadoop01.com:10000"
    說明缺少了hive的驅(qū)動jar,在我們編譯好的源碼中hive-jdbc-1.2.1.spark2.jar 找到并copy到spark的jars中

    通過jdbc來訪問spark的ThriftServer接口

    向我們java連接mysql一樣,我們使用scala來連接ThriftServer

    package com.jeffrey
     
    import java.sql.DriverManager
     
    object SparkJDBCThriftServerDemo {
        def main(args: Array[String]): Unit = {
            //1 添加驅(qū)動
            val driver = "org.apache.hive.jdbc.HiveDriver"
            Class.forName(driver)
     
            //2 構(gòu)建連接對象
            val url = "jdbc:hive2://hadoop01.com:10000"
            val conn = DriverManager.getConnection(url,"ijeffrey","123456")
     
            //3 sql 語句執(zhí)行
            conn.prepareStatement("use common").execute()
     
            var pstmt = conn.prepareStatement("select empno,ename,sal from emp")
     
            var rs = pstmt.executeQuery()
     
            while (rs.next()){
                println(s"empno = ${rs.getInt("empno")}  " +
                        s"ename=${rs.getString("ename")}   " +
                        s" sal=${rs.getDouble("sal")}")
            }
     
            println("---------------------------------------------------------------------------")
     
            pstmt = conn.prepareStatement("select empno,ename,sal from emp where sal > ? and ename = ?")
            pstmt.setDouble(1,3000)
            pstmt.setString(2,"KING")
     
            rs = pstmt.executeQuery()
     
            while (rs.next()){
                println(s"empno = ${rs.getInt("empno")}  " +
                        s"ename=${rs.getString("ename")}   " +
                        s" sal=${rs.getDouble("sal")}")
            }
     
            rs.close()
            pstmt.close()
            conn.close()
        }
    }

    執(zhí)行結(jié)果:

    Spark?SQL配置及使用的方法是什么

    SparkSQL案例

    案例一:SparkSQL讀取HDFS上Json格式的文件

        1. 將案例數(shù)據(jù)上傳到HDFS上
    樣例數(shù)據(jù)在${SPARK_HOME}/examples/src/main/resources/*

        2. 編寫SparkSQL程序
    啟動一個spark-shell進行編寫

    scala> val path = "/spark/data/people.json"
    scala> val df = spark.read.json(path)
    scala> df.registerTempTable("tmp04") //通過DataFrame注冊一個臨時表
    scala> spark.sql("show tables").show  //通過SQL語句進行操作
    scala> spark.sql("select * from tmp04").show
     
    #saveAsTable 使用之前 先要use table
    scala> spark.sql("select * from tmp04").write.saveAsTable("test01")
    #overwrite 覆蓋  append 拼接  ignore 忽略
    scala> spark.sql("select * from tmp01").write.mode("overwrite").saveAsTable("test01")
    scala> spark.sql("select * from tmp01").write.mode("append").saveAsTable("test01")
    scala> spark.sql("select * from tmp01").write.mode("ignore").saveAsTable("test01")

        saveAsTable("test01")默認保存到一張不存在的表中(test01不是臨時表),如果表存在的話就會報錯

        SaveMode四種情況:
    Append:拼接
    Overwrite: 重寫
    ErrorIfExists:如果表已經(jīng)存在,則報錯,默認就是這一種,存在即報錯
    Ignore:如果表已經(jīng)存在了,則忽略這一步操作

    除了spark.read.json的方式去讀取數(shù)據(jù)外,還可以使用spark.sql的方式直接讀取數(shù)據(jù)

    scala> spark.sql("select * from json.`/spark/data/people.json` where age is not null").show 
    +---+------+
    |age|  name|
    +---+------+
    | 30|  Andy|
    | 19|Justin|
    +---+------+
    # hdfs上的路徑使用`(反票號)引起來

    案例二:DataFrame和Dataset和RDD之間的互相轉(zhuǎn)換

        在IDEA中集成Hive的話,需要將hive-site.xml文件放到resources目錄下面

    package com.jeffrey.sql
     
    import java.util.Properties
     
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
     
    object HiveJoinMySQLDemo {
        def main(args: Array[String]): Unit = {
            System.setProperty("hadoop.home.dir","D:\\hadoop-2.7.3")
            // 1.構(gòu)建SparkSession
            val warehouseLocation = "/user/hive/warehouse"
     
            val spark = SparkSession
                    .builder()
                    .master("local")    //如果放到集群運行需要注釋掉
                    .appName("RDD 2 DataFrame")
                    .config("spark.sql.warehouse.dir",warehouseLocation)
                    .enableHiveSupport()
                    .getOrCreate()
     
            import spark.implicits._
            import spark.sql
     
            val url = "jdbc:mysql://hadoop01.com:3306/test"
            val table = "tb_dept"
     
            val props = new Properties()
            props.put("user","root")
            props.put("password","123456")
     
            // 1.Hive表數(shù)據(jù)導入到MySQL中    在shell中可以使用paste寫多行
            spark.read.table("common.dept")
                    .write
                    .mode(SaveMode.Overwrite)
                    .jdbc(url,table,props)
     
            // 2.Hive和MySQL的join操作
            //2.1 讀取MySQL的數(shù)據(jù)
           val df: DataFrame = spark
                    .read
                    .jdbc(url,table,props)
     
            df.createOrReplaceTempView("tmp_tb_dept")
            //2.1 數(shù)據(jù)聚合
            spark.sql(
                """
                  |select a.*,b.dname,b.loc
                  |from common.emp a
                  |join tmp_tb_dept b on a.deptno = b.deptno
                """.stripMargin).createOrReplaceTempView("tmp_emp_join_dept_result")
     
            spark.sql("select * from tmp_emp_join_dept_result").show()
     
            // 對表進行緩存的方法
            spark.read.table("tmp_emp_join_dept_result").cache()
            spark.catalog.cacheTable("tmp_emp_join_dept_result")
     
            //輸出到HDFS上
            // 方法一
            /*spark
                    .read
                    .table("tmp_emp_join_dept_result")
                    .write.parquet("/spark/sql/hive_join_mysql")*/
     
            // 方法二
            spark
                    .read
                    .table("tmp_emp_join_dept_result")
                    .write
                    .format("parquet")
                    .save(s"hdfs://hadoop01.com:8020/spark/sql/hive_join_mysql/${System.currentTimeMillis()}")
     
     
            //輸出到Hive中,并且是parquet格式 按照deptno分區(qū)
            spark
                    .read
                    .table("tmp_emp_join_dept_result")
                    .write
                    .format("parquet")
                    .partitionBy("deptno")
                    .mode(SaveMode.Overwrite)
                    .saveAsTable("hive_emp_dept")
     
            println("------------------------------------------------------------")
     
            spark.sql("show tables").show()
     
            //清空緩存
            spark.catalog.uncacheTable("tmp_emp_join_dept_result")
     
        }
    }

    可以打成jar文件放在集群上執(zhí)行

    bin/spark-submit \
    --class com.jeffrey.sql.HiveJoinMySQLDemo \
    --master yarn \
    --deploy-mode client \
    /opt/datas/jar/hivejoinmysql.jar
     
     
    bin/spark-submit \
    --class com.jeffrey.sql.HiveJoinMySQLDemo \
    --master yarn \
    --deploy-mode cluster \
    /opt/datas/logAnalyze.jar

    以上即使Spark SQL的基本使用。

    SparkSQL的函數(shù)

    HIve支持的函數(shù),SparkSQL基本都是支持的,SparkSQL支持兩種自定義函數(shù),分別是:UDF和UDAF,兩種函數(shù)都是通過SparkSession的udf屬性進行函數(shù)的注冊使用的;SparkSQL不支持UDTF函數(shù)的 自定義使用。

    ☆ UDF:一條數(shù)據(jù)輸入,一條數(shù)據(jù)輸出,一對一的函數(shù),即普通函數(shù)

    ☆ UDAF:多條數(shù)據(jù)輸入,一條數(shù)據(jù)輸出,多對一的函數(shù),即聚合函數(shù)

    “Spark SQL配置及使用的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

    AI