您好,登錄后才能下訂單哦!
如何淺析Hive和Spark SQL讀文件時的輸入任務(wù)劃分,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
我們就來講解Hive和Spark SQL是如何切分輸入路徑的。
Hive是起步較早的SQL on Hadoop項目,最早也是誕生于Hadoop中,所以輸入劃分這部分的代碼與Hadoop相關(guān)度非常高。現(xiàn)在Hive普遍使用的輸入格式是CombineHiveInputFormat
,它繼承于HiveInputFormat
,而HiveInputFormat
實現(xiàn)了Hadoop的InputFormat
接口,其中的getSplits
方法用來獲取具體的劃分結(jié)果,劃分出的一份輸入數(shù)據(jù)被稱為一個“Split”。在執(zhí)行時,每個Split對應(yīng)到一個map任務(wù)。在劃分Split時,首先挑出不能合并到一起的目錄——比如開啟了事務(wù)功能的路徑。這些不能合并的目錄必須單獨處理,剩下的路徑交給私有方法getCombineSplits
,這樣Hive的一個map task最多可以處理多個目錄下的文件。在實際操作中,我們一般只要通過set mapred.max.split.size=xx;
即可控制文件合并的大小。當一個文件過大時,父類的getSplits
也會幫我們完成相應(yīng)的切分工作。
Spark的表有兩種:DataSource表和Hive表。另外Spark后續(xù)版本中DataSource V2也將逐漸流行,目前還在不斷發(fā)展中,暫時就不在這里討論。我們知道Spark SQL其實底層是Spark RDD,而RDD執(zhí)行時,每個map task會處理RDD的一個Partition中的數(shù)據(jù)(注意這里的Partition是RDD的概念,要和表的Partition進行區(qū)分)。因此,Spark SQL作業(yè)的任務(wù)切分關(guān)鍵在于底層RDD的partition如何切分。
Spark SQL的DataSource表在最終執(zhí)行的RDD類為FileScanRDD
,由FileSourceScanExec
創(chuàng)建出來。在創(chuàng)建這種RDD的時候,具體的Partition直接作為參數(shù)傳給了構(gòu)造函數(shù),因此劃分輸入的方法也在DataSourceScanExec.scala
文件中。具體分兩步:首先把文件劃分為PartitionFile
,再將較小的PartitionFile
進行合并。
第一步部分代碼如下:
if (fsRelation.fileFormat.isSplitable( fsRelation.sparkSession, fsRelation.options, file.getPath)) { (0L until file.getLen by maxSplitBytes).map { offset =>val remaining = file.getLen - offsetval size = if (remaining > maxSplitBytes) maxSplitBytes else remainingval hosts = getBlockHosts(blockLocations, offset, size)PartitionedFile( partition.values, file.getPath.toUri.toString, offset, size, partitionDeleteDeltas, hosts) } } else {val hosts = getBlockHosts(blockLocations, 0, file.getLen)Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,0, file.getLen, partitionDeleteDeltas, hosts)) }
我們可以看出,Spark SQL首先根據(jù)文件類型判斷單個文件是否能夠切割,如果可以則按maxSplitBytes
進行切割。如果一個文件剩余部分無法填滿maxSplitBytes
,也單獨作為一個Partition。
第二部分代碼如下所示:
splitFiles.foreach { file =>if (currentSize + file.length > maxSplitBytes) { closePartition() }// Add the given file to the current partition.currentSize += file.length + openCostInBytes currentFiles += file }
這樣我們就可以依次遍歷第一步切好的塊,再按照maxSplitBytes
進行合并。注意合并文件時還需加上打開文件的預(yù)估代價openCostInBytes
。那么maxSplitBytes
和openCostInBytes
這兩個關(guān)鍵參數(shù)怎么來的呢?
val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
不難看出,主要是spark.sql.files.maxPartitionBytes
、spark.sql.files.openCostInBytes
、調(diào)度器默認并發(fā)度以及所有輸入文件實際大小所控制。
Spark SQL中的Hive表底層的RDD類為HadoopRDD
,由HadoopTableReader
類實現(xiàn)。不過這次,具體的Partition劃分還是依賴HadoopRDD
的getPartitions
方法,具體實現(xiàn)如下:
override def getPartitions: Array[Partition] = { ...try { val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { ... } }
不難看出,在處理Hive表的時候,Spark SQL把任務(wù)劃分又交給了Hadoop的InputFormat那一套。不過需要注意的是,并不是所有Hive表都歸為這一類,Spark SQL會默認對ORC和Parquet的表進行轉(zhuǎn)化,用自己的Data Source實現(xiàn)OrcFileFormat
和ParquetFileFormat
來把這兩種表作為Data Source表來處理。
看完上述內(nèi)容,你們掌握如何淺析Hive和Spark SQL讀文件時的輸入任務(wù)劃分的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。