This article shows how to create an external partitioned table with Spark SQL, convert CSV data to Parquet, and register the converted data as a partition of the table.
I. Creating an External Partitioned Table with Spark SQL
1. Launch spark-sql
spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G
2. Create the Parquet partitioned table in spark-sql:
create external table pgls.convert_parq(
  bill_num string,
  logis_id string,
  store_id string,
  store_code string,
  creater_id string,
  order_status INT,
  pay_status INT,
  order_require_varieties INT,
  order_require_amount decimal(19,4),
  order_rec_amount decimal(19,4),
  order_rec_gpf decimal(19,4),
  deli_fee FLOAT,
  order_type INT,
  last_modify_time timestamp,
  order_submit_time timestamp
)
partitioned by (order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';
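The same DDL can also be issued programmatically instead of through the spark-sql CLI. The sketch below is not part of the original workflow; it is a minimal example assuming Spark 1.6 built with Hive support and an existing SparkContext named sc (external Hive tables require a HiveContext rather than a plain SQLContext).

import org.apache.spark.sql.hive.HiveContext

// Assumes an existing SparkContext `sc`.
val hiveContext = new HiveContext(sc)

// Issue the same CREATE EXTERNAL TABLE statement shown above.
hiveContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS pgls.convert_parq(
    |  bill_num string, logis_id string, store_id string, store_code string,
    |  creater_id string, order_status int, pay_status int,
    |  order_require_varieties int, order_require_amount decimal(19,4),
    |  order_rec_amount decimal(19,4), order_rec_gpf decimal(19,4),
    |  deli_fee float, order_type int,
    |  last_modify_time timestamp, order_submit_time timestamp)
    |PARTITIONED BY (order_submit_date date)
    |STORED AS PARQUET
    |LOCATION '/test/spark/convert/parquet/bill_parq/'""".stripMargin)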
II. Converting CSV to Parquet
Code: org.apache.spark.ConvertToParquet.scala
package org.apache.spark

import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

/**
 * Convert CSV to Parquet.
 * Arguments: input path, output path, number of partitions
 */
object ConvertToParquet {
  def main(args: Array[String]) {
    if (args.length != 3) {
      println("jar args: inputFiles outPath numpartitions")
      System.exit(0)
    }
    val inputPath = args(0)
    val outPath = args(1)
    val numPartitions = args(2).toInt
    println("==========================================")
    println("=========input: " + inputPath)
    println("=========output: " + outPath)
    println("==numPartitions: " + numPartitions)
    println("==========================================")

    // If the output directory already exists on HDFS, delete it first.
    val fo = HDFSOperation(new Configuration())
    val existDir = fo.existDir(outPath)
    if (existDir) {
      println("HDFS exists outpath: " + outPath)
      println("start to delete ...")
      val isDelete = fo.deleteDir(outPath)
      if (isDelete) {
        println(outPath + " delete done. ")
      }
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)      // create the SparkContext from the SparkConf
    val sqlContext = new SQLContext(sc)  // create the SQLContext from the SparkContext
    val schema = StructType(Array(
      StructField("bill_num", DataTypes.StringType, false),
      StructField("logis_id", DataTypes.StringType, false),
      StructField("store_id", DataTypes.StringType, false),
      StructField("store_code", DataTypes.StringType, false),
      StructField("creater_id", DataTypes.StringType, false),
      StructField("order_status", DataTypes.IntegerType, false),
      StructField("pay_status", DataTypes.IntegerType, false),
      StructField("order_require_varieties", DataTypes.IntegerType, false),
      StructField("order_require_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_gpf", DataTypes.createDecimalType(19, 4), false),
      StructField("deli_fee", DataTypes.FloatType, false),
      StructField("order_type", DataTypes.IntegerType, false),
      StructField("last_modify_time", DataTypes.TimestampType, false),
      StructField("order_submit_time", DataTypes.TimestampType, false),
      StructField("order_submit_date", DataTypes.DateType, false)))
    convert(sqlContext, inputPath, schema, outPath, numPartitions)
  }

  // Convert CSV to Parquet.
  def convert(sqlContext: SQLContext, inputpath: String, schema: StructType,
              outpath: String, numPartitions: Int) {
    // Load the CSV files into a DataFrame using the given schema.
    val df = sqlContext.read.format("com.databricks.spark.csv").
      schema(schema).option("delimiter", ",").load(inputpath)
    // Write out as Parquet with a user-defined number of partitions.
    // df.write.parquet(outpath) would instead use one output file per input block.
    df.coalesce(numPartitions).write.parquet(outpath)
  }
}
After packaging, upload the jar to a local directory: /soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar. The CSV files should already have been generated on HDFS under /test/spark/convert/data/order/2016-05-01/. Then run:
spark-submit --queue spark --master yarn \
  --num-executors 10 --executor-cores 2 --executor-memory 3G \
  --class org.apache.spark.ConvertToParquet --name ConvertToParquet \
  file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar \
  /test/spark/convert/data/order/2016-05-01/ \
  /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01 \
  10
Note two things about this command. First, ConvertToParquet requires a third argument, the number of output partitions, which was missing from the original command; the value 10 here is only illustrative. Second, the output path is the partition directory order_submit_date=2016-05-01 under the table's location, which is why Section III only needs to register the partition rather than load any data.
Relevant parts of pom.xml:
1. Dependencies:
<dependencies>
    <!-- HDFS operations -->
    <dependency>
        <groupId>com.ecfront</groupId>
        <artifactId>ez-fs</artifactId>
        <version>0.9</version>
    </dependency>
    <!-- Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <!-- spark-csv: the Scala suffix must match the Spark artifacts above
         (the original listed spark-csv_2.11, which conflicts with the _2.10 Spark jars) -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- Hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
</dependencies>
2. Plugins (dependencies are shaded into the jar):
<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.4</version>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.apache.spark.ConvertToParquet</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
III. Adding a Partition to the Table
Run in spark-sql:
alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');
The data can then be queried with SQL:
select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;
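As a quick cross-check, the same partition can also be read back directly from the Parquet files, bypassing the Hive table. This is only a sketch, not part of the original workflow; it assumes an existing SQLContext named sqlContext (for example inside spark-shell).

// Read the Parquet files written for the 2016-05-01 partition directly.
val partDf = sqlContext.read.parquet(
  "/test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01")

// The row count should match what the table query above returns.
println("rows in partition: " + partDf.count())
partDf.show(5)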