溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spark如何通過classloader實(shí)現(xiàn)對(duì)于hive metastore的兼容性

發(fā)布時(shí)間:2021-12-17 10:36:02 來源:億速云 閱讀:384 作者:柒染 欄目:大數(shù)據(jù)

今天就跟大家聊聊有關(guān)spark如何通過classloader實(shí)現(xiàn)對(duì)于hive metastore的兼容性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

背景

我們只是簡(jiǎn)單的提了一下按照官網(wǎng)的配置就能夠兼容不同的hive元數(shù)據(jù),這次我們從代碼級(jí)別來分析一下spark是怎么做到實(shí)現(xiàn)不同版本的元數(shù)據(jù)的訪問。 注意:正如官網(wǎng)所說的,該部分只是用于hive元數(shù)據(jù)的訪問,spark sql內(nèi)部編譯的其他版本的hive用于來進(jìn)行其他執(zhí)行,如序列化和反序列化,UDF和UDAF等等
這里提到這一點(diǎn)是為了釋疑一下在源碼中看到一些低版本不存在的類,因?yàn)檫@部分spark sql內(nèi)置了其他版本的hive用于除了hive元數(shù)據(jù)之外的其他交互,如:hive/hiveShim.scala中的SerializationUtilities 這個(gè)類在hive 1.2.1是不存在的,但是hive高版本2.3.7是存在的
我們以spark 3.1.1進(jìn)行分析

分析

我們知道spark跟外部元數(shù)據(jù)的交互是類ExternalCatalog來進(jìn)行響應(yīng)的,對(duì)應(yīng)到hive元數(shù)據(jù)就是HiveExternalCatalog,轉(zhuǎn)到client代碼:

/**
   * A Hive client used to interact with the metastore.
   */
lazy val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }

該client在就是進(jìn)行元數(shù)據(jù)交互的最終執(zhí)行者,且這里直接調(diào)用了HiveUtils的newClientForMetadata方法,直接跳到最終調(diào)用的方法:

 protected[hive] def newClientForMetadata(
      conf: SparkConf,
      hadoopConf: Configuration,
      configurations: Map[String, String]): HiveClient = {
    val sqlConf = new SQLConf
    sqlConf.setConf(SQLContext.getSQLProperties(conf))
    val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
    val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
    val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
    val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
    val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

...
} else if (hiveMetastoreJars == "path") {
      // Convert to files and expand any directories.
      val jars =
        HiveUtils.hiveMetastoreJarsPath(sqlConf)
          .flatMap {
            case path if path.contains("\\") && Utils.isWindows =>
              addLocalHiveJars(new File(path))
            case path =>
              DataSource.checkAndGlobPathIfNecessary(
                pathStrings = Seq(path),
                hadoopConf = hadoopConf,
                checkEmptyGlobPath = true,
                checkFilesExist = false,
                enableGlobbing = true
              ).map(_.toUri.toURL)
          }

      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
          s"using path: ${jars.mkString(";")}")
      new IsolatedClientLoader(
        version = metaVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        execJars = jars.toSeq,
        config = configurations,
        isolationOn = true,
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)
...

val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) 這里直接獲取配置的元數(shù)據(jù)的版本,也就是spark.sql.hive.metastore.version配置項(xiàng)
val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) 這里配置hive元數(shù)據(jù)jar包的獲取方式,默認(rèn)是builtin內(nèi)置,推薦使用path方式,因?yàn)橐话憔€上環(huán)境是無網(wǎng)絡(luò)環(huán)境 val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) 這兩個(gè)跟classloader有關(guān),也就是說什么類用哪種classloader加載,用來隔離class
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) 映射成spark內(nèi)部的hive版本表示,用于進(jìn)行元數(shù)據(jù)class的精細(xì)化操作
這里會(huì)根據(jù)配置的獲取元數(shù)據(jù)jar包的方式而采用不同的初始化IsolatedClientLoader的方式。最終會(huì)調(diào)用isolatedLoader的createClient方法:

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
  val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
  if (!isolationOn) {
    return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
      baseClassLoader, this)
  }
  // Pre-reflective instantiation setup.
  logDebug("Initializing the logger to avoid disaster...")
  val origLoader = Thread.currentThread().getContextClassLoader
  Thread.currentThread.setContextClassLoader(classLoader)
  try {
    classLoader
      .loadClass(classOf[HiveClientImpl].getName)
      .getConstructors.head
      .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
      .asInstanceOf[HiveClient]
  } catch {
    case e: InvocationTargetException =>
      if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
        val cnf = e.getCause().asInstanceOf[NoClassDefFoundError]
        throw new ClassNotFoundException(
          s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
          "Please make sure that jars for your version of hive and hadoop are included in the " +
          s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e)
      } else {
        throw e
      }
  } finally {
    Thread.currentThread.setContextClassLoader(origLoader)
  }
}

如果未開啟隔離性,則直接返回HiveClientImpl,該client所有終端用戶共享。如果開啟了(默認(rèn)值),則設(shè)置當(dāng)前的contextClassLoader為classLoader: 該classLoader是自定義的:

...
new URLClassLoader(allJars, rootClassLoader) {
            override def loadClass(name: String, resolve: Boolean): Class[_] = {
              val loaded = findLoadedClass(name)
              if (loaded == null) doLoadClass(name, resolve) else loaded
            }
            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
              val classFileName = name.replaceAll("\\.", "/") + ".class"
              if (isBarrierClass(name)) {
                // For barrier classes, we construct a new copy of the class.
                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
                defineClass(name, bytes, 0, bytes.length)
              } else if (!isSharedClass(name)) {
                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
                super.loadClass(name, resolve)
              } else {
                // For shared classes, we delegate to baseClassLoader, but fall back in case the
                // class is not found.
                logDebug(s"shared class: $name")
                try {
                  baseClassLoader.loadClass(name)
                } catch {
                  case _: ClassNotFoundException =>
                    super.loadClass(name, resolve)
                }
              }
            }
          }
        }
...

直接重點(diǎn),對(duì)于開啟了隔離(默認(rèn)值),則直接返回該classLoader,關(guān)于classloader的知識(shí),可以參考這里,要是還有真不明白的,可以參考classLoader類的源碼。
這里我們重點(diǎn)觀察一下該自定義classloader的loadClass方法,該方法是實(shí)現(xiàn)類隔離的關(guān)鍵,

  • 如果是BarrierClass,比如HiveClientImpl/Shim/ShimLoader,或者包含了自定義的前綴.則從當(dāng)前的ContextClassLoader中復(fù)制一份class類,且生成對(duì)應(yīng)的class

  • 如果不是共享類,也不是BarrierClass,則使用URLClassLoader的loadClass方法加載class

  • 否則不是barrierClass,是共享類,則用當(dāng)前contextclassloader來加載當(dāng)前class

通過該classLoader加載的方式,對(duì)于跟hive元數(shù)據(jù)相關(guān)的class就是通過該自定義的classLoader加載的(注意子classloader能夠看見父加載器加載的類)
之后通過該classloader加載對(duì)應(yīng)的HiveClientImpl類,進(jìn)行反射實(shí)例化HiveClientImpl對(duì)象,從而實(shí)現(xiàn)了在運(yùn)行的時(shí)候,根據(jù)傳入的元數(shù)據(jù)jar包進(jìn)行動(dòng)態(tài)加載.
重置當(dāng)前線程的contextClassLoader。

重點(diǎn):hive元數(shù)據(jù)的jar包的動(dòng)態(tài)記載是通過自定義classloader實(shí)現(xiàn)的

至于真正的和hive元數(shù)據(jù)進(jìn)行交互就是HiveClientImpl,該類引入了shim的機(jī)制,也就是說,通過該shim機(jī)制,對(duì)于hive元數(shù)據(jù)版本的升級(jí)都是通過該shim來進(jìn)行控制,比如增加方法,就會(huì)在shim中增加對(duì)應(yīng)的方法,從而達(dá)到hive元數(shù)據(jù)的向后兼容性。 其實(shí)從shim這個(gè)英文單詞中我們也能看出一二,shim(墊片)是為了切合版本的升級(jí)而做的墊片。

看完上述內(nèi)容,你們對(duì)spark如何通過classloader實(shí)現(xiàn)對(duì)于hive metastore的兼容性有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI