您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)spark如何通過classloader實(shí)現(xiàn)對(duì)于hive metastore的兼容性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
我們只是簡(jiǎn)單的提了一下按照官網(wǎng)的配置就能夠兼容不同的hive元數(shù)據(jù),這次我們從代碼級(jí)別來分析一下spark是怎么做到實(shí)現(xiàn)不同版本的元數(shù)據(jù)的訪問。 注意:正如官網(wǎng)所說的,該部分只是用于hive元數(shù)據(jù)的訪問,spark sql內(nèi)部編譯的其他版本的hive用于來進(jìn)行其他執(zhí)行,如序列化和反序列化,UDF和UDAF等等
這里提到這一點(diǎn)是為了釋疑一下在源碼中看到一些低版本不存在的類,因?yàn)檫@部分spark sql內(nèi)置了其他版本的hive用于除了hive元數(shù)據(jù)之外的其他交互,如:hive/hiveShim.scala中的SerializationUtilities 這個(gè)類在hive 1.2.1是不存在的,但是hive高版本2.3.7是存在的
我們以spark 3.1.1進(jìn)行分析
我們知道spark跟外部元數(shù)據(jù)的交互是類ExternalCatalog來進(jìn)行響應(yīng)的,對(duì)應(yīng)到hive元數(shù)據(jù)就是HiveExternalCatalog,轉(zhuǎn)到client代碼:
/** * A Hive client used to interact with the metastore. */ lazy val client: HiveClient = { HiveUtils.newClientForMetadata(conf, hadoopConf) }
該client在就是進(jìn)行元數(shù)據(jù)交互的最終執(zhí)行者,且這里直接調(diào)用了HiveUtils的newClientForMetadata方法,直接跳到最終調(diào)用的方法:
protected[hive] def newClientForMetadata( conf: SparkConf, hadoopConf: Configuration, configurations: Map[String, String]): HiveClient = { val sqlConf = new SQLConf sqlConf.setConf(SQLContext.getSQLProperties(conf)) val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) ... } else if (hiveMetastoreJars == "path") { // Convert to files and expand any directories. val jars = HiveUtils.hiveMetastoreJarsPath(sqlConf) .flatMap { case path if path.contains("\\") && Utils.isWindows => addLocalHiveJars(new File(path)) case path => DataSource.checkAndGlobPathIfNecessary( pathStrings = Seq(path), hadoopConf = hadoopConf, checkEmptyGlobPath = true, checkFilesExist = false, enableGlobbing = true ).map(_.toUri.toURL) } logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + s"using path: ${jars.mkString(";")}") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, hadoopConf = hadoopConf, execJars = jars.toSeq, config = configurations, isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) ...
val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) 這里直接獲取配置的元數(shù)據(jù)的版本,也就是spark.sql.hive.metastore.version配置項(xiàng)
val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) 這里配置hive元數(shù)據(jù)jar包的獲取方式,默認(rèn)是builtin內(nèi)置,推薦使用path方式,因?yàn)橐话憔€上環(huán)境是無網(wǎng)絡(luò)環(huán)境 val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) 這兩個(gè)跟classloader有關(guān),也就是說什么類用哪種classloader加載,用來隔離class
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) 映射成spark內(nèi)部的hive版本表示,用于進(jìn)行元數(shù)據(jù)class的精細(xì)化操作
這里會(huì)根據(jù)配置的獲取元數(shù)據(jù)jar包的方式而采用不同的初始化IsolatedClientLoader的方式。最終會(huì)調(diào)用isolatedLoader的createClient方法:
/** The isolated client interface to Hive. */ private[hive] def createClient(): HiveClient = synchronized { val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)) if (!isolationOn) { return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config, baseClassLoader, this) } // Pre-reflective instantiation setup. logDebug("Initializing the logger to avoid disaster...") val origLoader = Thread.currentThread().getContextClassLoader Thread.currentThread.setContextClassLoader(classLoader) try { classLoader .loadClass(classOf[HiveClientImpl].getName) .getConstructors.head .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this) .asInstanceOf[HiveClient] } catch { case e: InvocationTargetException => if (e.getCause().isInstanceOf[NoClassDefFoundError]) { val cnf = e.getCause().asInstanceOf[NoClassDefFoundError] throw new ClassNotFoundException( s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + "Please make sure that jars for your version of hive and hadoop are included in the " + s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e) } else { throw e } } finally { Thread.currentThread.setContextClassLoader(origLoader) } }
如果未開啟隔離性,則直接返回HiveClientImpl,該client所有終端用戶共享。如果開啟了(默認(rèn)值),則設(shè)置當(dāng)前的contextClassLoader為classLoader: 該classLoader是自定義的:
... new URLClassLoader(allJars, rootClassLoader) { override def loadClass(name: String, resolve: Boolean): Class[_] = { val loaded = findLoadedClass(name) if (loaded == null) doLoadClass(name, resolve) else loaded } def doLoadClass(name: String, resolve: Boolean): Class[_] = { val classFileName = name.replaceAll("\\.", "/") + ".class" if (isBarrierClass(name)) { // For barrier classes, we construct a new copy of the class. val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") defineClass(name, bytes, 0, bytes.length) } else if (!isSharedClass(name)) { logDebug(s"hive class: $name - ${getResource(classToPath(name))}") super.loadClass(name, resolve) } else { // For shared classes, we delegate to baseClassLoader, but fall back in case the // class is not found. logDebug(s"shared class: $name") try { baseClassLoader.loadClass(name) } catch { case _: ClassNotFoundException => super.loadClass(name, resolve) } } } } } ...
直接重點(diǎn),對(duì)于開啟了隔離(默認(rèn)值),則直接返回該classLoader,關(guān)于classloader的知識(shí),可以參考這里,要是還有真不明白的,可以參考classLoader類的源碼。
這里我們重點(diǎn)觀察一下該自定義classloader的loadClass方法,該方法是實(shí)現(xiàn)類隔離的關(guān)鍵,
如果是BarrierClass,比如HiveClientImpl/Shim/ShimLoader,或者包含了自定義的前綴.則從當(dāng)前的ContextClassLoader中復(fù)制一份class類,且生成對(duì)應(yīng)的class
如果不是共享類,也不是BarrierClass,則使用URLClassLoader的loadClass方法加載class
否則不是barrierClass,是共享類,則用當(dāng)前contextclassloader來加載當(dāng)前class
通過該classLoader加載的方式,對(duì)于跟hive元數(shù)據(jù)相關(guān)的class就是通過該自定義的classLoader加載的(注意子classloader能夠看見父加載器加載的類)
之后通過該classloader加載對(duì)應(yīng)的HiveClientImpl類,進(jìn)行反射實(shí)例化HiveClientImpl對(duì)象,從而實(shí)現(xiàn)了在運(yùn)行的時(shí)候,根據(jù)傳入的元數(shù)據(jù)jar包進(jìn)行動(dòng)態(tài)加載.
重置當(dāng)前線程的contextClassLoader。
重點(diǎn):hive元數(shù)據(jù)的jar包的動(dòng)態(tài)記載是通過自定義classloader實(shí)現(xiàn)的
至于真正的和hive元數(shù)據(jù)進(jìn)行交互就是HiveClientImpl,該類引入了shim的機(jī)制,也就是說,通過該shim機(jī)制,對(duì)于hive元數(shù)據(jù)版本的升級(jí)都是通過該shim來進(jìn)行控制,比如增加方法,就會(huì)在shim中增加對(duì)應(yīng)的方法,從而達(dá)到hive元數(shù)據(jù)的向后兼容性。 其實(shí)從shim這個(gè)英文單詞中我們也能看出一二,shim(墊片)是為了切合版本的升級(jí)而做的墊片。
看完上述內(nèi)容,你們對(duì)spark如何通過classloader實(shí)現(xiàn)對(duì)于hive metastore的兼容性有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。