您好,登錄后才能下訂單哦!
如何進(jìn)行SparkSQL與Hive metastore Parquet轉(zhuǎn)換的分析,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
這里從表schema的處理角度而言,就必須注意Hive和Parquet兼容性,主要有兩個(gè)區(qū)別:
1.Hive是大小寫敏感的,但Parquet相反
2.Hive會(huì)將所有列視為nullable,但是nullability在parquet里有獨(dú)特的意義
1.有相同名字的字段必須要有相同的數(shù)據(jù)類型,忽略nullability。兼容處理的字段應(yīng)該保持Parquet側(cè)的數(shù)據(jù)類型,這樣就可以處理到nullability類型了(空值問題)
2.兼容處理的schema應(yīng)只包含在Hive元數(shù)據(jù)里的schema信息,主要體現(xiàn)在以下兩個(gè)方面:
(1)只出現(xiàn)在Parquet schema的字段會(huì)被忽略
// 第一種方式應(yīng)用的比較多1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")2. sparkSession.catalog.refreshByPath(s"${path}")
最后說一下最近后臺(tái)小伙伴在生產(chǎn)中遇到的一個(gè)問題,大家如果在業(yè)務(wù)處理中遇到類似的問題,提供一個(gè)思路。
在說問題之前首先了解一個(gè)參數(shù)spark.sql.parquet.writeLegacyFormat(默認(rèn)false)的作用:
比如,對(duì)于decimal數(shù)據(jù)類型的兼容處理,不設(shè)置true時(shí),經(jīng)常會(huì)報(bào)類似如下的錯(cuò)誤:
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://hadoop/data/test_decimal/dt=20200515000000/part-00000-9820eba2-8a40-446d-8c28-37027a1b1f2d-c000.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85) at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)... Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)...
此時(shí)我們需要將spark.sql.parquet.writeLegacyFormat設(shè)置為true來解決上述的異常問題。
但如果同時(shí)設(shè)置spark.sql.hive.convertMetastoreParquet為false時(shí),要注意一些數(shù)據(jù)類型以及精度的處理,比如對(duì)于decimal類型的處理。通過一個(gè)例子復(fù)原一下當(dāng)時(shí)的場(chǎng)景:
1.創(chuàng)建Hive外部表testdb.test_decimal,其中字段fee_rate為decimal(10,10)
CREATE EXTERNAL TABLE `testdb`.`test_decimal`(`no` STRING , `fee_rate` DECIMAL(10,10)) PARTITIONED BY (`dt` STRING ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://hadoop/data/test_decimal' TBLPROPERTIES ( 'transient_lastDdlTime' = '1589160440' ) ;
2.將testdb.item中的數(shù)據(jù)處理后保存到testdb.test_decimal中
// 這里為了展示方便,直接查詢testdb.item中的數(shù)據(jù)
// 注意: 字段fee_rate的類型為decimal(10,6)
select no, fee_rate from testdb.item where dt=20190528;
// testdb.item中數(shù)據(jù)示例如下
+-------------------+----------------+
| no| fee_rate|
+-------------------+----------------+
| 1| 0.000000|
| 2| 0.000000|
| 3| 0.000000|
+-------------------+----------------+
// tmp是上述查詢testdb.item獲得的臨時(shí)表// 以parquet格式保存到test_decimal的20200529分區(qū)中save overwrite tmp as parquet.`/data/test_decimal/dt=20200529`; msck repair TABLE testdb.item;
上述1-3都能成功執(zhí)行,數(shù)據(jù)也能保存到testdb.test_decimal中,但是當(dāng)查詢testdb.test_decimal中的數(shù)據(jù)時(shí),比如執(zhí)行sql:
select * from testdb.test_decimal where dt = 20200529;
Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ...
究其原因是因?yàn)榘凑丈鲜鰞蓚€(gè)參數(shù)的配置,testdb.item中fee_rate字段類型為decimal(10,6),數(shù)據(jù)為0.000000,經(jīng)過一系列處理0.000000最終會(huì)被處理為0,看下邊最終導(dǎo)致空指針異常的部分,就會(huì)一目了然。
public static BigDecimal enforcePrecisionScale(BigDecimal bd, int maxPrecision, int maxScale) { if (bd == null) { return null; } else { bd = trim(bd); if (bd.scale() > maxScale) { bd = bd.setScale(maxScale, RoundingMode.HALF_UP); } // testdb.test_decimal中fee_rate的類型decimal(10,10),即precision為10,scale也為10 // 對(duì)應(yīng)這里即maxPrecision和maxScale分別為10,則maxIntDigits為0 int maxIntDigits = maxPrecision - maxScale; // bd對(duì)應(yīng)0。對(duì)于0而言,precision為1,scale為0 // 處理之后 intDigits為1 int intDigits = bd.precision() - bd.scale(); return intDigits > maxIntDigits ? null : bd; }}
解決辦法也很簡(jiǎn)單,就是將testdb.test_decimal中的fee_rate數(shù)據(jù)類型和依賴的表testdb.item中的fee_rate保持完全一致,即也為decimal(10,6)。
這個(gè)現(xiàn)象在實(shí)際應(yīng)用環(huán)境中經(jīng)常遇到,通用的解決辦法就是將要保存的表中的數(shù)據(jù)類型與依賴的表(物理表或者臨時(shí)表)的字段類型保持完全一致。
看完上述內(nèi)容,你們掌握如何進(jìn)行SparkSQL與Hive metastore Parquet轉(zhuǎn)換的分析的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。