溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark-sql如何創(chuàng)建外部分區(qū)表

發(fā)布時(shí)間:2021-11-19 09:30:24 來源:億速云 閱讀:1055 作者:小新 欄目:云計(jì)算

這篇文章主要為大家展示了“Spark-sql如何創(chuàng)建外部分區(qū)表”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Spark-sql如何創(chuàng)建外部分區(qū)表”這篇文章吧。

一、Spark-sql創(chuàng)建外部分區(qū)表

1.使用spark-sql

spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10  --executor-cores 2 --executor-memory 3G

2.spark-sql中創(chuàng)建parquet分區(qū)

create external table pgls.convert_parq(
bill_num string,
logis_id string,
store_id string,
store_code string,
creater_id string,
order_status INT,
pay_status INT,
order_require_varieties INT,
order_require_amount decimal(19,4),
order_rec_amount decimal(19,4),
order_rec_gpf decimal(19,4),
deli_fee FLOAT,
order_type INT,
last_modify_time timestamp,
order_submit_time timestamp
) 
partitioned by(order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';

二、CSV轉(zhuǎn)Parquet

代碼:org.apache.spark.ConvertToParquet.scala

package org.apache.spark

import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

/**
* CSV 轉(zhuǎn)換為 parquet
* 參數(shù):輸入路徑, 輸出路徑, 分區(qū)數(shù)
*/
object ConvertToParquet{
def main(args: Array[String]) {
if(args.length != 3){
println("jar args: inputFiles outPath numpartitions")
System.exit(0)
}
val inputPath = args(0)
val outPath = args(1)
val numPartitions = args(2).toInt

println("==========================================")
println("=========input: "+ inputPath )
println("=========output: "+ outPath )
println("==numPartitions: "+ numPartitions )
println("==========================================")

//判斷輸出目錄是否存在,存在則刪除
val fo = HDFSOperation(new Configuration())
val existDir = fo.existDir(outPath)
if(existDir) {
println("HDFS exists outpath: " + outPath)
println("start to delete ...")
val isDelete = fo.deleteDir(outPath)
if(isDelete){
println(outPath +" delete done. ")
}
}

val conf = new SparkConf()
val sc = new SparkContext(conf) //參數(shù)SparkConf創(chuàng)建SparkContext,
val sqlContext = new SQLContext(sc) //參數(shù)SparkContext創(chuàng)建SQLContext

val schema = StructType(Array(
StructField("bill_num",DataTypes.StringType,false),
StructField("logis_id",DataTypes.StringType,false),
StructField("store_id",DataTypes.StringType,false),
StructField("store_code",DataTypes.StringType,false),
StructField("creater_id",DataTypes.StringType,false),
StructField("order_status",DataTypes.IntegerType,false),
StructField("pay_status",DataTypes.IntegerType,false),
StructField("order_require_varieties",DataTypes.IntegerType,false),
StructField("order_require_amount",DataTypes.createDecimalType(19,4),false),
StructField("order_rec_amount",DataTypes.createDecimalType(19,4),false),
StructField("order_rec_gpf",DataTypes.createDecimalType(19,4),false),
StructField("deli_fee",DataTypes.FloatType,false),
StructField("order_type",DataTypes.IntegerType,false),
StructField("last_modify_time",DataTypes.TimestampType,false),
StructField("order_submit_time",DataTypes.TimestampType,false),
StructField("order_submit_date",DataTypes.DateType,false)))

convert(sqlContext, inputPath, schema, outPath, numPartitions)
}

//CSV轉(zhuǎn)換為parquet
def convert(sqlContext: SQLContext, inputpath: String, schema: StructType, outpath: String, numPartitions: Int) {
// 將text導(dǎo)入到DataFrame
val df = sqlContext.read.format("com.databricks.spark.csv").
schema(schema).option("delimiter", ",").load(inputpath)
// 轉(zhuǎn)換為parquet
// df.write.parquet(outpath) // 轉(zhuǎn)換時(shí)以block數(shù)為分區(qū)數(shù)
df.coalesce(numPartitions).write.parquet(outpath) //自定義分區(qū)數(shù)
}

}
打包后jar上傳至本地目錄:
/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar

事先在HDFS上生成CSV文件,HDFS目錄:
/test/spark/convert/data/order/2016-05-01/

執(zhí)行命令:
spark-submit --queue spark --master yarn --num-executors 10  --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar  /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01

pom.xml相關(guān)內(nèi)容:

1.依賴包:

<dependencies>
<!-- 操作HDFS -->
<dependency>
        <groupId>com.ecfront</groupId>
        <artifactId>ez-fs</artifactId>
        <version>0.9</version>
    </dependency>

<!--spark -->
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

<!--spark csv-->
<dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
        <version>1.4.0</version>
    </dependency>

<!--hadoop -->
<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>

    2.plugins(含打入依賴包)

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.4</version>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>org.apache.spark.ConvertToParquet</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

三、表添加分區(qū)

spark-sql下執(zhí)行

alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');

可通過sql查詢到相應(yīng)數(shù)據(jù):

select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;

Spark-sql如何創(chuàng)建外部分區(qū)表

以上是“Spark-sql如何創(chuàng)建外部分區(qū)表”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI