溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark SQL編程的示例分析

發(fā)布時(shí)間:2021-12-14 10:13:01 來(lái)源:億速云 閱讀:149 作者:小新 欄目:云計(jì)算

這篇文章將為大家詳細(xì)講解有關(guān)Spark SQL編程的示例分析,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

#Spark SQL 編程指南#

##簡(jiǎn)介## Spark SQL支持在Spark中執(zhí)行SQL,或者HiveQL的關(guān)系查詢表達(dá)式。它的核心組件是一個(gè)新增的RDD類型JavaSchemaRDD。JavaSchemaRDD由Row對(duì)象和表述這個(gè)行的每一列的數(shù)據(jù)類型的schema組成。一個(gè)JavaSchemaRDD類似于傳統(tǒng)關(guān)系數(shù)據(jù)庫(kù)的一個(gè)表。JavaSchemaRDD可以通過(guò)一個(gè)已存在的RDD,Parquet文件,JSON數(shù)據(jù)集,或者通過(guò)運(yùn)行HiveSQL獲得存儲(chǔ)在Apache Hive上的數(shù)據(jù)創(chuàng)建。

Spark SQL目前是一個(gè)alpha組件。盡管我們會(huì)盡量減少API變化,但是一些API任然后再以后的發(fā)布中改變。

##入門## 在Spark中,所有關(guān)系函數(shù)功能的入口點(diǎn)是JavaSQLContext類?;蛘咚淖宇悺R?jiǎng)?chuàng)建一個(gè)基本的JavaSQLContext,所有你需要的只是一個(gè)JavaSparkContext。

JavaSparkContext sc = ...; // An existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

##數(shù)據(jù)源## Spark SQL支持通過(guò)JavaSchemaRDD接口操作各種各樣的數(shù)據(jù)源。一單一個(gè)數(shù)據(jù)集被加載,它可以被注冊(cè)成一個(gè)表,甚至和來(lái)自其他源的數(shù)據(jù)連接。

###RDDs### Spark SQL支持的表的其中一個(gè)類型是由JavaBeans的RDD。BeanInfo定義了這個(gè)表的schema。現(xiàn)在 ,Spark SQL 不支持包括嵌套或者復(fù)雜類型例如Lists或者Arrays的JavaBeans。你可以通過(guò)創(chuàng)建一個(gè)實(shí)現(xiàn)了Serializable并且它的所有字段都有g(shù)etters和setters方法的類類創(chuàng)建一個(gè)JavaBeans。

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

一個(gè)schema可以被應(yīng)用在一個(gè)已存在的RDD上,通過(guò)調(diào)用applySchema并且提供這個(gè)JavaBean的類對(duì)象。

// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerAsTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

注意,Spark SQL目前使用一個(gè)非常簡(jiǎn)單的SQL解析器。用戶如果想獲得一個(gè)更加完整的SQL方言,應(yīng)該看看HiveContext提供的HiveQL支持。

###Parquet Files### Parquet是一個(gè)columnar格式,并且被許多其他數(shù)據(jù)處理系統(tǒng)支持。Spark SQL對(duì)讀寫Parquet文件提供支持,并且自動(dòng)保存原始數(shù)據(jù)的Schema。通過(guò)下面的例子使用數(shù)據(jù):

// sqlContext from the previous example is used in this example.

JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

###JSON Datasets### Spark SQL可以自動(dòng)推斷一個(gè)JSON數(shù)據(jù)集的schema,并加載成一個(gè)JavaSchemaRDD。這個(gè)轉(zhuǎn)換可以通過(guò)JavaSQLContext中的兩個(gè)方法中的一個(gè)完成:

  • jsonFile -從一個(gè)目錄下的文件中加載數(shù)據(jù),這個(gè)文件中的每一行都是一個(gè)JSON對(duì)象。

  • jsonRdd -從一個(gè)已存在的RDD加載數(shù)據(jù),這個(gè)RDD中的每一個(gè)元素是一個(gè)包含一個(gè)JSON對(duì)象的String。

     // sc is an existing JavaSparkContext.
     JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
    
     // A JSON dataset is pointed to by path.
     // The path can be either a single text file or a directory storing text files.
     String path = "examples/src/main/resources/people.json";
     // Create a JavaSchemaRDD from the file(s) pointed to by path
     JavaSchemaRDD people = sqlContext.jsonFile(path);
    
     // The inferred schema can be visualized using the printSchema() method.
     people.printSchema();
     // root
     //  |-- age: IntegerType
     //  |-- name: StringType
    
     // Register this JavaSchemaRDD as a table.
     people.registerAsTable("people");
    
     // SQL statements can be run by using the sql methods provided by sqlContext.
     JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    
     // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
     // an RDD[String] storing one JSON object per string.
     List<String> jsonData = Arrays.asList(
       "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
     JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
     JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);


###Hive Tables### Spark SQL也支持讀和寫存儲(chǔ)在apache Hive中的數(shù)據(jù)。然而,由于Hive有一個(gè)非常大的依賴,他沒有在Spark默認(rèn)寶中包括。為了使用Hive,你必須運(yùn)行‘SPARK_HIVE=true sbt/sbt assembly/assembly'(或者對(duì)Maven使用 -Phive)。這個(gè)命令構(gòu)建一個(gè)包含Hive的assembly。注意,這個(gè)Hive assembly 必須放在所有的工作節(jié)點(diǎn)上,因?yàn)樗鼈冃枰L問Hive的序列化和方序列化包(SerDes),以此訪問存儲(chǔ)在Hive中的數(shù)據(jù)。

可以通過(guò)conf目錄下的hive-site.xml文件完成Hive配置 。

要和Hive配合工作,你需要構(gòu)造一個(gè)JavaHiveContext,它繼承了JavaSQLContext,并且添加了發(fā)現(xiàn)MetaStore中的表和使用HiveQL編寫查詢的功能。此外,除了sql方法,JavaHiveContext方法還提供了一個(gè)hql方法,它允許查詢使用HiveQL表達(dá)。

##Writing Language-Integrated Relational Queries## Language-Integrated查詢目前只在Scala中被支持。

Spark SQL同樣支持使用領(lǐng)域特定的語(yǔ)言來(lái)編寫查詢。再次,使用上面例子中的數(shù)據(jù):

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.

// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

DSL使用Scala中得到標(biāo)記來(lái)表示基礎(chǔ)表中的表,他們使用一個(gè)前綴’標(biāo)識(shí)。隱式轉(zhuǎn)換這些標(biāo)記為被SQL 執(zhí)行引擎評(píng)估的表達(dá)式。支持這些功能的完成列表可以再ScalaDoc找到。

關(guān)于“Spark SQL編程的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI