溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

spark delta寫操作ACID事務(wù)中基礎(chǔ)類FileFormat/FileCommitProtocol的示例分析

發(fā)布時間:2021-12-16 16:16:25 來源:億速云 閱讀:164 作者:小新 欄目:大數(shù)據(jù)

小編給大家分享一下spark delta寫操作ACID事務(wù)中基礎(chǔ)類FileFormat/FileCommitProtocol的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

分析

直接進(jìn)入主題FileFormatWriter.write,這個是spark寫入文件的入口:

 def write(
      sparkSession: SparkSession,
      plan: SparkPlan,
      fileFormat: FileFormat,
      committer: FileCommitProtocol,
      outputSpec: OutputSpec,
      hadoopConf: Configuration,
      partitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      statsTrackers: Seq[WriteJobStatsTracker],
      options: Map[String, String])
    : Set[String] = {

因?yàn)閐elta是基于parquet實(shí)現(xiàn)的, 所以我們fileformat選擇分析ParquetFileFormat, 而對于FileCommitProtocol,我們分析SQLHadoopMapReduceCommitProtocol

  1. 該write方法實(shí)現(xiàn)比較長,我們講重點(diǎn) :

committer.setupJob(job)

這個做一些job提交前的準(zhǔn)備工作,比如設(shè)置jobId,taskId,設(shè)置OutputCommitter,OutputCommitter是用來。。

override def setupJob(jobContext: JobContext): Unit = {
    // Setup IDs
    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
    val taskId = new TaskID(jobId, TaskType.MAP, 0)
    val taskAttemptId = new TaskAttemptID(taskId, 0)

    // Set up the configuration object
    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

    val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
    committer = setupCommitter(taskAttemptContext)
    committer.setupJob(jobContext)
  }

ParquetFileFormat對應(yīng)的OutputCommitter是ParquetOutputCommitter,我們看一下方法:format.getOutputCommitter(context),ParquetOutputCommitter為:

@Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new ParquetOutputCommitter(output, context);
    }
    return committer;
  }

而最終調(diào)用了父類的構(gòu)造方法:

public FileOutputCommitter(Path outputPath, 
                             TaskAttemptContext context) throws IOException {
    this(outputPath, (JobContext)context);
    if (outputPath != null) {
      workPath = getTaskAttemptPath(context, outputPath);
    }
  }

注意這里的workPath(全局變量)賦值為$outputPath/_temporary,在以下newTaskTempFile方法中會用到

接著進(jìn)行setupJob操作:

public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");

而getJobAttemptPath中引用到$path/_temporary目錄(其中path是文件輸出目錄),且建立該目錄

接下來是進(jìn)行任務(wù)的提交:

sparkSession.sparkContext.runJob(
        rddWithNonEmptyPartitions,
        (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
          executeTask(
            description = description,
            jobIdInstant = jobIdInstant,
            sparkStageId = taskContext.stageId(),
            sparkPartitionId = taskContext.partitionId(),
            sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
            committer,
            iterator = iter)
        },
        rddWithNonEmptyPartitions.partitions.indices,
        (index, res: WriteTaskResult) => {
          committer.onTaskCommit(res.commitMsg)
          ret(index) = res
        })

其中重點(diǎn)看看executeTask方法:

committer.setupTask(taskAttemptContext)

    val dataWriter =
      if (sparkPartitionId != 0 && !iterator.hasNext) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
      } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
      } else {
        new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
      }
  • 對于SQLHadoopMapReduceCommitProtocol:setupTask實(shí)現(xiàn)如下:

committer = setupCommitter(taskContext)
    committer.setupTask(taskContext)
    addedAbsPathFiles = mutable.Map[String, String]()
    partitionPaths = mutable.Set[String]()

而committer.setupTask(taskContext),對應(yīng)到ParquetOutputCommitter為空實(shí)現(xiàn),

  • 之后看數(shù)據(jù)寫入的最終執(zhí)行者dataWriter, 如果是沒有分區(qū),則是SingleDirectoryDataWriter:

class SingleDirectoryDataWriter(
    description: WriteJobDescription,
    taskAttemptContext: TaskAttemptContext,
    committer: FileCommitProtocol)
  extends FileFormatDataWriter(description, taskAttemptContext, committer) {
  private var fileCounter: Int = _
  private var recordsInFile: Long = _
  // Initialize currentWriter and statsTrackers
  newOutputWriter()

  private def newOutputWriter(): Unit = {
    recordsInFile = 0
    releaseResources()

    val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

    currentWriter = description.outputWriterFactory.newInstance(
      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)

    statsTrackers.foreach(_.newFile(currentPath))
  }

  override def write(record: InternalRow): Unit = {
    if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
      fileCounter += 1
      assert(fileCounter < MAX_FILE_COUNTER,
        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

      newOutputWriter()
    }

    currentWriter.write(record)
    statsTrackers.foreach(_.newRow(record))
    recordsInFile += 1
  }
}

這里寫文件是哪里呢?

    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

對應(yīng)到HadoopMapReduceCommitProtocol到newTaskTempFile方法為:

override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFilename(taskContext, ext)

    val stagingDir: Path = committer match {
      case _ if dynamicPartitionOverwrite =>
        assert(dir.isDefined,
          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
        partitionPaths += dir.get
        this.stagingDir
      // For FileOutputCommitter it has its own staging path called "work path".
      case f: FileOutputCommitter =>
        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
      case _ => new Path(path)
    }

    dir.map { d =>
      new Path(new Path(stagingDir, d), filename).toString
    }.getOrElse {
      new Path(stagingDir, filename).toString
    }
  }

如果開啟partitionOverwriteMode,則設(shè)置為new Path(path, ".spark-staging-" + jobId) 如果沒有開啟partitionOverwriteMode,且FileOutputCommitter的子類,如果workpath存在則設(shè)置為workPath,否則為path,注意我們之前FileOutputCommitter構(gòu)造方法中已經(jīng)設(shè)置了workPath,所以最終的輸出目錄為$path/_temporary

所以job向該目錄寫入數(shù)據(jù)。 DynamicPartitionDataWriter的分析,讀者可以進(jìn)行類似的分析,只不過目錄則加了分區(qū)信息,只寫入自己的分區(qū)目錄中

  • 如果寫入成功的話執(zhí)行如下:

try {
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        // Execute the task to write rows out and commit the task.
        while (iterator.hasNext) {
          dataWriter.write(iterator.next())
        }
        dataWriter.commit()
      })(catchBlock = {
        // If there is an error, abort the task
        dataWriter.abort()
        logError(s"Job $jobId aborted.")
      }, finallyBlock = {
        dataWriter.close()
      })

dataWriter.commit()如下:

override def commit(): WriteTaskResult = {
    releaseResources()
    val summary = ExecutedWriteSummary(
      updatedPartitions = updatedPartitions.toSet,
      stats = statsTrackers.map(_.getFinalStats()))
    WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
  }

首先會釋放資源,也就是關(guān)閉writer 之后調(diào)用FileCommitProtocol.commitTask();

 override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    val attemptId = taskContext.getTaskAttemptID
    logTrace(s"Commit task ${attemptId}")
    SparkHadoopMapRedUtil.commitTask(
      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
  }

而SparkHadoopMapRedUtil.commitTask最終調(diào)用FileOutputCommitter的commitTask方法把$PATH/_temporary下文件mv到$PATH下
之后返回統(tǒng)計(jì)的數(shù)值,數(shù)據(jù)格式如下:

case class BasicWriteTaskStats(
    numPartitions: Int,
    numFiles: Int,
    numBytes: Long,
    numRows: Long)
  extends WriteTaskStats
  1. 之后會committer.onTaskCommit(res.commitMsg)操作,
    對于SQLHadoopMapReduceCommitProtocol的實(shí)現(xiàn)為: logDebug(s"onTaskCommit($taskCommit)")

  2. 下一步committer.commitJob(job, commitMsgs):

...
 committer.commitJob(jobContext)
 ...
  for ((src, dst) <- filesToMove) {
        fs.rename(new Path(src), new Path(dst))
      }
 ...
 fs.delete(stagingDir, true)

這里主要涉及清理job,以及把task所產(chǎn)生的文件(writer輸出的臨時文件)移動到path目錄下,且清理臨時目錄,至此文件真正的寫入到了path目錄下

  1. 指標(biāo)記錄

private[datasources] def processStats(
      statsTrackers: Seq[WriteJobStatsTracker],
      statsPerTask: Seq[Seq[WriteTaskStats]])
  : Unit = {

    val numStatsTrackers = statsTrackers.length
    assert(statsPerTask.forall(_.length == numStatsTrackers),
      s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
         |There are $numStatsTrackers statsTrackers, but some task returned
         |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
       """.stripMargin)

    val statsPerTracker = if (statsPerTask.nonEmpty) {
      statsPerTask.transpose
    } else {
      statsTrackers.map(_ => Seq.empty)
    }

    statsTrackers.zip(statsPerTracker).foreach {
      case (statsTracker, stats) => statsTracker.processStats(stats)
    }
  }

主要是把剛才job的指標(biāo)通過statsTrackers傳給driver,而目前的statsTracker實(shí)現(xiàn)類為BasicWriteJobStatsTracker,也就是說最終會通過listenerbus以事件的形式傳播, 如下代碼:

class BasicWriteJobStatsTracker(
    serializableHadoopConf: SerializableConfiguration,
    @transient val metrics: Map[String, SQLMetric])
  extends WriteJobStatsTracker {
   ...

  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
    ...

    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)

    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
  }
}

至此整個spark parquet write文件的數(shù)據(jù)流程我們就已經(jīng)全部過了一遍了,部分細(xì)節(jié)沒有展示。 最終的數(shù)據(jù)流如下:

實(shí)例化Job對象
      |
      v
FileCommitProtocol.setupJob -> OutputCommitter.setupJob  進(jìn)行作業(yè)運(yùn)行前的準(zhǔn)備,如建立臨時目錄_temporary等
      |
      v
executeTask()-> FileCommitProtocol.setupTask -> OutputCommitter.setupTask 目前為空實(shí)現(xiàn) 
                |
                v
         FileCommitProtocol.newTaskTempFile/newTaskTempFileAbsPath 建立寫任務(wù)的臨時目錄
                |
                v
         dataWriter.write() 
                |
                v
         dataWriter.commit() 釋放資源以及返回寫入文件的指標(biāo)信息 -> HadoopMapReduceCommitProtocol.commitTask
                          |
                          v
                 SparkHadoopMapRedUtil.commitTask 完成mv $PATH/_temporary文件 到$PATH目錄,以及做outputCommitCoordination
                           |
                           v
                 返回需要額外臨時目錄的信息  
      |
      v
FileCommitProtocol.onTaskCommit
      |
      v
FileCommitProtocol.commitJob -> OutputCommitter.commitJob 清理$PATH/_temporary目錄且把寫額外臨時目錄下的文件mv到最終path目錄下
      |
      v
processStats,處理寫入的文件指標(biāo)

那對應(yīng)到delta中,spark寫入delta數(shù)據(jù)是怎么寫入的呢?其實(shí)流程和以上的流程一模一樣,唯一不同的是FileCommitProtocol類的實(shí)現(xiàn),直接到TransactionalWrite.writeFiles:

def writeFiles(
      data: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean): Seq[AddFile] = {
    hasWritten = true

    ...
    val committer = getCommitter(outputPath)

    ...
      FileFormatWriter.write(
        sparkSession = spark,
        plan = physicalPlan,
        fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats.
        committer = committer,
        outputSpec = outputSpec,
        hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = statsTrackers,
        options = Map.empty)
    }

    committer.addedStatuses
  }

而這里的commiter為DelayedCommitProtocol,如下:

    new DelayedCommitProtocol("delta", outputPath.toString, None)

我們來看一下DelayedCommitProtocol方法:

override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFileName(taskContext, ext)
    val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
    val relativePath = randomPrefixLength.map { prefixLength =>
      getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
    }.orElse {
      dir // or else write into the partition directory if it is partitioned
    }.map { subDir =>
      new Path(subDir, filename)
    }.getOrElse(new Path(filename)) // or directly write out to the output path

    addedFiles.append((partitionValues, relativePath.toUri.toString))
    new Path(path, relativePath).toString
  }

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    if (addedFiles.nonEmpty) {
      val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
      val statuses: Seq[AddFile] = addedFiles.map { f =>
        val filePath = new Path(path, new Path(new URI(f._2)))
        val stat = fs.getFileStatus(filePath)
        AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
      }

      new TaskCommitMessage(statuses)
    } else {
      new TaskCommitMessage(Nil)
    }
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
    addedStatuses ++= fileStatuses
  }
  • 其中newTaskTempFile生成的文件中多了一個UUID.randomUUID.toString,這能減少文件的沖突

  • newTaskTempFile目前直接是返回了輸出目錄,而不是_temporary目錄

  • commitTask只是記錄增加的文件

  • commitJob并沒有真正的提交job,只是把AddFile保存到了內(nèi)存中

后續(xù)我們會分析delta怎么處理AddFile,從而做到事務(wù)性

注意task輸出的文件目錄為:
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
如:/data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

以上是“spark delta寫操作ACID事務(wù)中基礎(chǔ)類FileFormat/FileCommitProtocol的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI