This article analyzes structured processing through Spark Session, tracing how a DataFrame is created and how it is executed. I hope you find something useful in it.
There are three ways to create a DataFrame: sql(), which mainly accesses Hive tables; building a DataFrame from an RDD, which starts from ExistingRDD; and the read/format path, which creates one from data-source formats such as JSON, text, or CSV.
Let's look at the third path first.
1. read/format
def read: DataFrameReader = new DataFrameReader(self)
SparkSession.read() directly creates a DataFrameReader, and DataFrameReader's load() method then loads the external data source. The main logic of load() is:
def load(paths: String*): DataFrame = {
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
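To see the fluent reader pattern in isolation, here is a minimal self-contained sketch (toy stand-ins, not Spark's actual classes): read returns a DataFrameReader that merely accumulates a source name and options, and load() returns what would be handed off to a DataSource.

```scala
// Toy model of the SparkSession.read -> DataFrameReader -> load() flow.
// Names mirror Spark's API, but the bodies are simplified stand-ins.
object ReaderSketch {
  class DataFrameReader {
    private var source: String = "parquet" // Spark's default data source
    private val extraOptions = scala.collection.mutable.Map.empty[String, String]

    def format(src: String): DataFrameReader = { source = src; this }

    def option(key: String, value: String): DataFrameReader = {
      extraOptions += (key -> value)
      this
    }

    // In Spark, load() builds a DataSource and resolves it to a BaseRelation;
    // here we just return what would be passed to it.
    def load(path: String): (String, Map[String, String], String) =
      (source, extraOptions.toMap, path)
  }

  def read: DataFrameReader = new DataFrameReader
}
```

Chaining read.format("json").option("multiLine", "true").load(...) makes clear that all the real work is deferred until load() is called, as in the actual reader.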
This creates a DataSource for the corresponding source type and resolves it into a BaseRelation; SparkSession's baseRelationToDataFrame method then maps the BaseRelation to a DataFrame. Concretely, a LogicalRelation is created from the BaseRelation, and Dataset.ofRows is called to build the DataFrame from that LogicalRelation. A DataFrame is really just a Dataset:
type DataFrame = Dataset[Row]
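A tiny illustration of what this alias means in practice (the Row and Dataset types below are assumed toy versions, not Spark's): a DataFrame value is statically nothing more than a Dataset parameterized with Row.

```scala
// Toy sketch of the DataFrame = Dataset[Row] alias.
object AliasSketch {
  case class Row(values: Seq[Any])

  class Dataset[T](val data: Seq[T]) {
    def count(): Long = data.length.toLong
  }

  // Mirrors Spark's definition: DataFrame is only a name for Dataset[Row].
  type DataFrame = Dataset[Row]

  def demo(): Long = {
    val df: DataFrame = new Dataset(Seq(Row(Seq("a", 1)), Row(Seq("b", 2))))
    df.count() // every Dataset method is available on a DataFrame
  }
}
```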
The definition of baseRelationToDataFrame:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
  Dataset.ofRows(self, LogicalRelation(baseRelation))
}
Dataset.ofRows mainly converts the logical plan into a physical plan and then produces a new Dataset.
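The shape of that conversion can be sketched with simplified stand-ins (the real ofRows builds a QueryExecution from the session state and asserts that analysis succeeded; the types below are toy versions assumed for illustration):

```scala
// Toy model of Dataset.ofRows: logical plan -> query execution -> Dataset.
object OfRowsSketch {
  case class Row(values: Seq[Any])

  // Stand-ins for Spark's plan types.
  sealed trait LogicalPlan
  case class LogicalRelation(rows: Seq[Row]) extends LogicalPlan

  // Stand-in for QueryExecution: holds the plan to be executed.
  class QueryExecution(val logical: LogicalPlan) {
    def assertAnalyzed(): Unit = require(logical != null)
  }

  class Dataset[T](val qe: QueryExecution)
  type DataFrame = Dataset[Row]

  // Mirrors the real flow: wrap the plan, check it, return a Dataset[Row].
  def ofRows(plan: LogicalPlan): DataFrame = {
    val qe = new QueryExecution(plan)
    qe.assertAnalyzed()
    new Dataset[Row](qe)
  }
}
```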
2. Execution
The key to execution in SparkSession is how the physical plan is generated from the LogicalPlan. Let's trace this part of the logic.
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}
Dataset's count() action triggers execution of the physical plan: it calls the plan's executeCollect method, which in turn calls doExecute() to produce the result as an Array[InternalRow]. executeCollect is defined in SparkPlan.
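This call chain can be modeled with a toy physical plan (assumed stand-in types; the real executeCollect operates on an RDD[InternalRow] rather than a local Seq):

```scala
// Toy model of count(): a physical plan whose executeCollect runs doExecute().
object CountSketch {
  case class InternalRow(longs: Array[Long]) {
    def getLong(i: Int): Long = longs(i)
  }

  trait SparkPlan {
    // Subclasses produce the rows; a Seq stands in for RDD[InternalRow].
    protected def doExecute(): Seq[InternalRow]

    // Template method, as in Spark: collect by running doExecute().
    final def executeCollect(): Array[InternalRow] = doExecute().toArray
  }

  // A toy aggregate that emits a single row holding the input count.
  class CountPlan(input: Seq[Any]) extends SparkPlan {
    protected def doExecute(): Seq[InternalRow] =
      Seq(InternalRow(Array(input.size.toLong)))
  }

  // Mirrors Dataset.count(): run the plan, read the single aggregate value.
  def count(input: Seq[Any]): Long =
    new CountPlan(input).executeCollect().head.getLong(0)
}
```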
3. HadoopFsRelation
Next we trace how a physical plan (that is, a SparkPlan) is generated from HadoopFsRelation.
The resolution is done by FileSourceStrategy, which stacks operations such as Filter and Projection on top of a FileSourceScanExec. Here is the definition of FileSourceScanExec:
case class FileSourceScanExec(
    @transient relation: HadoopFsRelation,
    output: Seq[Attribute],
    requiredSchema: StructType,
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression],
    override val metastoreTableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec with ColumnarBatchScan {
  ...
}
The main logic of its doExecute() method is as follows:
protected override def doExecute(): RDD[InternalRow] = {
  if (supportsBatch) {
    // in the case of fallback, this batched scan should never fail because of:
    // 1) only primitive types are supported
    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    WholeStageCodegenExec(this).execute()
  } else {
    val unsafeRows = {
      val scan = inputRDD
      if (needsUnsafeRowConversion) {
        scan.mapPartitionsWithIndexInternal { (index, iter) =>
          val proj = UnsafeProjection.create(schema)
          proj.initialize(index)
          iter.map(proj)
        }
      } else {
        scan
      }
    }
    val numOutputRows = longMetric("numOutputRows")
    unsafeRows.map { r =>
      numOutputRows += 1
      r
    }
  }
}
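The branching above can be condensed into a small runnable model (toy stand-ins: an identity function replaces UnsafeProjection, a pass-through replaces WholeStageCodegenExec, and a mutable counter plays the role of the SQL metric):

```scala
// Toy model of FileSourceScanExec.doExecute(): batched path vs. row conversion.
object DoExecuteSketch {
  case class Row(values: Seq[Any])

  // Stand-in for Spark's SQLMetric accumulator.
  class Metric {
    var value: Long = 0L
    def +=(n: Long): Unit = value += n
  }

  // Mirrors the two branches: either a whole-batch path, or per-row
  // conversion; output rows are counted in both cases.
  def execute(input: Seq[Row],
              supportsBatch: Boolean,
              needsConversion: Boolean,
              numOutputRows: Metric): Seq[Row] = {
    if (supportsBatch) {
      // The real code wraps the scan in WholeStageCodegenExec; we pass through.
      numOutputRows += input.size
      input
    } else {
      val converted =
        if (needsConversion) input.map(r => Row(r.values)) // stand-in for UnsafeProjection
        else input
      converted.map { r => numOutputRows += 1; r }
    }
  }
}
```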
inputRDD is created in one of two ways: createBucketedReadRDD or createNonBucketedReadRDD. There is no essential difference between them; they differ only in the file-partitioning rules.
private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf =
        relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}

createNonBucketedReadRDD ultimately constructs a FileScanRDD:

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
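The difference between the two read-RDD paths is essentially a file-partitioning decision, which can be sketched as follows (toy logic; real Spark groups bucketed files by bucket id, and packs non-bucketed files by maxSplitBytes rather than this simplified greedy binning):

```scala
// Toy model of the bucketed vs. non-bucketed partitioning decision.
object PartitionSketch {
  case class FileSlice(path: String, sizeBytes: Long, bucketId: Option[Int])

  // Bucketed: one partition per bucket id, preserving the bucketing layout.
  def bucketedPartitions(files: Seq[FileSlice], numBuckets: Int): Seq[Seq[FileSlice]] =
    (0 until numBuckets).map(b => files.filter(_.bucketId.contains(b)))

  // Non-bucketed: greedy size-based binning, a simplified stand-in for the
  // byte-budget packing done in createNonBucketedReadRDD.
  def nonBucketedPartitions(files: Seq[FileSlice], maxBytes: Long): Seq[Seq[FileSlice]] = {
    val partitions =
      scala.collection.mutable.Buffer(scala.collection.mutable.Buffer.empty[FileSlice])
    var current = 0L
    for (f <- files.sortBy(-_.sizeBytes)) {
      // Open a new partition when the current one would exceed the budget.
      if (current + f.sizeBytes > maxBytes && partitions.last.nonEmpty) {
        partitions += scala.collection.mutable.Buffer.empty[FileSlice]
        current = 0L
      }
      partitions.last += f
      current += f.sizeBytes
    }
    partitions.map(_.toSeq).toSeq
  }
}
```

Either way, the resulting file partitions are fed to the same readFile function, which is why the article notes there is no essential difference between the two paths.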