溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark-submit執(zhí)行流程是怎么樣的

發(fā)布時間:2021-12-17 09:54:33 來源:億速云 閱讀:122 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)Spark-submit執(zhí)行流程是怎么樣的,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

我們在進行Spark任務(wù)提交時,會使用“spark-submit -class .....”樣式的命令來提交任務(wù),該命令為Spark目錄下的shell腳本。它的作用是查詢spark-home,調(diào)用spark-class命令。

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

隨后會執(zhí)行spark-class命令,以SparkSubmit類為參數(shù)進行任務(wù)向Spark程序的提交,而Spark-class的shell腳本主要是執(zhí)行以下幾個步驟:

(1)加載spark環(huán)境參數(shù),從conf中獲取

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# 尋找javahome
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

(2)載入java,jar包等

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

(3)調(diào)用org.apache.spark.launcher中的Main進行參數(shù)注入

build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

(4)shell腳本監(jiān)測任務(wù)執(zhí)行狀態(tài),是否完成或者退出任務(wù),通過執(zhí)行返回值,判斷是否結(jié)束

if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

2.任務(wù)檢測及提交任務(wù)到Spark

檢測執(zhí)行模式(class or submit)構(gòu)建cmd,在submit中進行參數(shù)的檢查(SparkSubmitOptionParser),構(gòu)建命令行并且打印回spark-class中,最后調(diào)用exec執(zhí)行spark命令行提交任務(wù)。通過組裝而成cmd內(nèi)容如下所示:

/usr/local/java/jdk1.8.0_91/bin/java-cp
/data/spark-1.6.0-bin-hadoop2.6/conf/:/data/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/hadoop-2.6.5/etc/hadoop/
-Xms1g-Xmx1g -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1234
org.apache.spark.deploy.SparkSubmit
--classorg.apache.spark.repl.Main
--nameSpark shell
--masterspark://localhost:7077
--verbose/tool/jarDir/maven_scala-1.0-SNAPSHOT.jar

3.SparkSubmit函數(shù)的執(zhí)行

(1)Spark任務(wù)在提交之后會執(zhí)行SparkSubmit中的main方法

  def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit()
    submit.doSubmit(args)
  }

(2)doSubmit()對log進行初始化,添加spark任務(wù)參數(shù),通過參數(shù)類型執(zhí)行任務(wù):

  def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

SUBMIT:使用提供的參數(shù)提交application

KILL(Standalone and Mesos cluster mode only):通過REST協(xié)議終止任務(wù)

REQUEST_STATUS(Standalone and Mesos cluster mode only):通過REST協(xié)議請求已經(jīng)提交任務(wù)的狀態(tài)

PRINT_VERSION:對log輸出版本信息

(3)調(diào)用submit函數(shù):

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(args, uninitLog)
      }
    }

doRunMain為集群調(diào)用子main class準(zhǔn)備參數(shù),然后調(diào)用runMain()執(zhí)行任務(wù)invoke main

Spark在作業(yè)提交中會采用多種不同的參數(shù)及模式,都會根據(jù)不同的參數(shù)選擇不同的分支執(zhí)行,因此在最后提交的runMain中會將所需要的參數(shù)傳遞給執(zhí)行函數(shù)。

以上就是Spark-submit執(zhí)行流程是怎么樣的,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI