Preface
Over the past two years of working on streamingpro, I have inevitably had to enhance Spark in many places. As I have complained before, Spark instantiates objects directly with new all over the place, which leaves almost no way to swap out its internal implementations.

For example, SparkEnv has a field called closureSerializer, which is dedicated to serializing and deserializing tasks, and of course also handles serializing and deserializing function closures. Let's look at how it is implemented internally:
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

val closureSerializer = new JavaSerializer(conf)

val envInstance = new SparkEnv(
  .....
  closureSerializer,
  ....
A JavaSerializer is new-ed directly here and cannot be configured. Without touching the source code, there is no way to replace this implementation. Likewise, if I wanted to replace the Executor implementation, that is basically impossible as well.

This year there were two big areas that involved "hacking" Spark, that is, enhancing Spark by adding new code on top of the stock distribution packages rather than modifying the source.
Support for a second RPC layer
As we know, in Spark you can only touch an Executor through a Task. With the existing APIs there is no way to operate directly on all Executors, or on a chosen subset of them. For instance, if I want every Executor to load a resource file, there is currently no way to do it. To operate on Executors directly, we need to build a new communication layer. So how do we do that?
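To make the limitation concrete, here is a minimal sketch of the usual Task-based workaround (ResourceLoader is a hypothetical helper, not something from Spark or this project): you push work to Executors by running a throwaway job, but you cannot target specific Executors and nothing guarantees that every Executor receives a partition.

// Hedged sketch of the Task-based workaround. ResourceLoader is hypothetical.
// Over-partitioning makes it likely, but not certain, that every executor runs at least one task;
// an idle executor simply never loads the file.
val slices = sc.defaultParallelism * 4
sc.parallelize(0 until slices, slices).foreachPartition { _ =>
  ResourceLoader.load("/path/to/resource")
}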
First, set up a Backend on the Driver side. This part is fairly simple:
class PSDriverBackend(sc: SparkContext) extends Logging {

  val conf = sc.conf
  var psDriverRpcEndpointRef: RpcEndpointRef = null

  def createRpcEnv = {
    val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER
    val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS)
    var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt
    val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(sc.conf))
    } else {
      None
    }
    logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
    var createSucess = false
    var count = 0
    val env = new AtomicReference[RpcEnv]()
    while (!createSucess && count < 10) {
      try {
        env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf,
          sc.env.securityManager, clientMode = !isDriver))
        createSucess = true
      } catch {
        case e: Exception =>
          logInfo("fail to create rpcenv", e)
          count += 1
          port += 1
      }
    }
    if (env.get() == null) {
      logError(s"fail to create rpcenv finally with attemp ${count} ")
    }
    env.get()
  }

  def start() = {
    val env = createRpcEnv
    val pSDriverBackend = new PSDriverEndpoint(sc, env)
    psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend)
  }
}
You can think of this as starting an RPC server on the Driver side. Running this code is also very straightforward; just call it from the main program:
// parameter server should be enabled by default
if (!params.containsKey("streaming.ps.enable") || params.get("streaming.ps.enable").toString.toBoolean) {
  logger.info("ps enabled...")
  if (ss.sparkContext.isLocal) {
    localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext)
    localSchedulerBackend.start()
  } else {
    logger.info("start PSDriverBackend")
    psDriverBackend = new PSDriverBackend(ss.sparkContext)
    psDriverBackend.start()
  }
}
Here we need to support both local mode and cluster mode.

The Driver now has an RPC server running, but how do we start one on the Executor side? At first glance there seems to be no hook on the Executor that lets us start an RPC server. Actually there is one, it is just rather tricky: Spark allows custom metrics and will invoke the methods of a user-supplied metric Sink, so all we need to do is implement a metric Sink, start the RPC server inside it, and fool Spark into doing the rest. The concrete implementation looks like this:
class PSServiceSink(val property: Properties, val registry: MetricRegistry,
                    securityMgr: SecurityManager) extends Sink with Logging {

  def env = SparkEnv.get

  var psDriverUrl: String = null
  var psExecutorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  val psDriverPort = 7777
  var psDriverHost: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  def parseArgs = {
    //val runtimeMxBean = ManagementFactory.getRuntimeMXBean();
    //var argv = runtimeMxBean.getInputArguments.toList
    var argv = System.getProperty("sun.java.command").split("\\s+").toList
    .....
    psDriverHost = host
    psDriverUrl = "spark://ps-driver-endpoint@" + psDriverHost + ":" + psDriverPort
  }

  parseArgs

  def createRpcEnv = {
    val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER
    val bindAddress = hostname
    val advertiseAddress = ""
    val port = env.conf.getOption("spark.ps.executor.port").getOrElse("0").toInt
    val ioEncryptionKey = if (env.conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(env.conf))
    } else {
      None
    }
    //logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
    RpcEnv.create("PSExecutorBackend", bindAddress, port, env.conf,
      env.securityManager, clientMode = !isDriver)
  }

  override def start(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        logInfo(s"delay PSExecutorBackend 3s")
        Thread.sleep(3000)
        logInfo(s"start PSExecutor;env:${env}")
        if (env.executorId != SparkContext.DRIVER_IDENTIFIER) {
          val rpcEnv = createRpcEnv
          val pSExecutorBackend = new PSExecutorBackend(env, rpcEnv, psDriverUrl,
            psExecutorId, hostname, cores)
          PSExecutorBackend.executorBackend = Some(pSExecutorBackend)
          rpcEnv.setupEndpoint("ps-executor-endpoint", pSExecutorBackend)
        }
      }
    }).start()
  }
  ...
}
At this point we can successfully start an RPC server on the Executor and have it connect to the RPC server inside the Driver. Now, without modifying any Spark source code, you can write as much communication code as you like and gain much finer control over the Executors.
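One detail worth spelling out: Spark still has to be told to load this Sink. Below is a minimal registration sketch; the fully qualified class name and the sink name "pservice" are assumptions (the class does need to live under an org.apache.spark package, since the Sink trait is private[spark]). Spark's metrics system reads any property prefixed with spark.metrics.conf., so no separate metrics.properties file is required.

// Register the custom Sink for executor instances via SparkConf; set this before the app starts.
// The class name below is an assumption; point it at wherever PSServiceSink actually lives.
val conf = new SparkConf()
  .set("spark.metrics.conf.executor.sink.pservice.class",
    "org.apache.spark.ps.cluster.PSServiceSink")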
For example, implement something like the following in PSExecutorBackend:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case Message.TensorFlowModelClean(modelPath) => {
    logInfo("clean tensorflow model")
    TFModelLoader.close(modelPath)
    context.reply(true)
  }
  case Message.CopyModelToLocal(modelPath, destPath) => {
    logInfo(s"copying model: ${modelPath} -> ${destPath}")
    HDFSOperator.copyToLocalFile(destPath, modelPath, true)
    context.reply(true)
  }
}
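The Message definitions themselves are not shown here; a minimal sketch, assuming they are plain case classes (case classes are serializable by default, so the RPC layer can ship them between Driver and Executor), might look like this:

object Message {
  // Hypothetical shapes, inferred from the pattern matches above.
  case class TensorFlowModelClean(modelPath: String)
  case class CopyModelToLocal(modelPath: String, destPath: String)
}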
Then, from inside Spark, you can invoke it like this:
val psDriverBackend = runtime.asInstanceOf[SparkRuntime].psDriverBackend
psDriverBackend.psDriverRpcEndpointRef.send(Message.TensorFlowModelClean("/tmp/ok"))
Pretty cool, isn't it?
Changing how closures are serialized
Spark's task scheduling overhead is very high. For one complex job, the business logic itself took roughly 3-7 ms to execute, while the end-to-end Spark overhead was about 1.3 s.

After digging into the details, I found that when SparkContext transforms an RDD it runs a clean operation on the function, and by default that clean operation checks whether the function is serializable (it simply serializes it once, and if no exception is thrown the function is deemed serializable). That serialization is quite expensive (the JavaSerializer is used by default for function and task serialization, and this cannot be changed): a single serialization takes around 200 ms. Optimizing this in local mode shaved roughly 600 ms off the request time.
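If you want to get a feel for this cost on your own machine, here is a rough sketch that times a single closure serialization the same way ClosureCleaner does (assumptions: a local SparkContext is already running so SparkEnv.get is set; the numbers vary a lot with environment and closure size, so treat it only as an illustration):

// Rough micro-benchmark sketch; run inside an application with an active local SparkContext.
val f = (x: Int) => x * 2
val ser = SparkEnv.get.closureSerializer.newInstance()
val start = System.nanoTime()
ser.serialize(f)
println(s"closure serialization took ${(System.nanoTime() - start) / 1e6} ms")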
To be clear, this change targets local mode. So how exactly is it done?

Let's first see how Spark invokes the serializer. In SparkContext, the clean function looks like this:
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  ClosureCleaner.clean(f, checkSerializable)
  f
}
It delegates to ClosureCleaner.clean, which invokes the serialization like this:
try {
  if (SparkEnv.get != null) {
    SparkEnv.get.closureSerializer.newInstance().serialize(func)
  }
} catch {
  case ex: Exception => throw new SparkException("Task not serializable", ex)
}
SparkEnv is created when SparkContext is initialized, and it holds the closureSerializer, which is created via new JavaSerializer. Since the serialization is too slow, and since we are actually running in local mode where serialization is not needed at all, we want to find a way to swap out the closureSerializer implementation. As complained earlier, it is hard-coded in the Spark source and no customization hook is exposed, so we have to hack it again.

First, create a subclass of SparkEnv:
class WowSparkEnv( ....) extends SparkEnv(
Then implement a custom Serializer:
class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance {

  private def isClosure(cls: Class[_]): Boolean = {
    cls.getName.contains("$anonfun$")
  }

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    if (isClosure(t.getClass)) {
      val uuid = UUID.randomUUID().toString
      LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef])
      ByteBuffer.wrap(uuid.getBytes())
    } else {
      javaD.serialize(t)
    }
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val s = StandardCharsets.UTF_8.decode(bytes).toString()
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      bytes.flip()
      javaD.deserialize(bytes)
    }
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    val s = StandardCharsets.UTF_8.decode(bytes).toString()
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      bytes.flip()
      javaD.deserialize(bytes, loader)
    }
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    javaD.serializeStream(s)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    javaD.deserializeStream(s)
  }
}
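Note that LocalNonOpSerializerInstance.maps, the stash the code above puts closures into, is not shown here. A minimal sketch, assuming a plain concurrent map keyed by the UUID that serialize writes into the ByteBuffer:

// Minimal sketch of the stash referenced above; the original definition is not shown.
object LocalNonOpSerializerInstance {
  import java.util.concurrent.ConcurrentHashMap
  val maps = new ConcurrentHashMap[String, AnyRef]()
}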
Next we wrap it in a LocalNonOpSerializer:
class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable {

  val javaS = new JavaSerializer(conf)

  override def newInstance(): SerializerInstance = {
    new LocalNonOpSerializerInstance(javaS.newInstance())
  }

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    javaS.writeExternal(out)
  }

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    javaS.readExternal(in)
  }
}
Now everything is in place except the last piece: how do we get Spark to actually run this code? The approach is rather magical: implement an enhance method:
def enhanceSparkEnvForAPIService(session: SparkSession) = {
  val env = SparkEnv.get
  // Create a new WowSparkEnv and replace its Serializer with our own LocalNonOpSerializer
  val wowEnv = new WowSparkEnv(
    .....
    new LocalNonOpSerializer(env.conf): Serializer,
    ....)
  // Swap the instance held by the SparkEnv object for our WowSparkEnv
  SparkEnv.set(wowEnv)
  // Many components grabbed the previously created SparkEnv when SparkContext started,
  // so some extra adjustments are needed.
  // First stop the scheduler inside the already-running LocalSchedulerBackend
  val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend]
  val scheduler = ReflectHelper.field(localScheduler, "scheduler")
  val totalCores = localScheduler.totalCores
  localScheduler.stop()
  // Create a new LocalSchedulerBackend
  val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(
    session.sparkContext.getConf,
    scheduler.asInstanceOf[TaskSchedulerImpl],
    totalCores)
  wowLocalSchedulerBackend.start()
  // Replace SparkContext's _schedulerBackend with our implementation
  ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend)
}
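Finally, a hedged usage sketch (the app name and master below are placeholders): call the enhancement right after the local-mode SparkSession is created and before any job runs, so that later RDD operations pick up the swapped-in SparkEnv. The cast to LocalSchedulerBackend above means this only makes sense in local mode.

// Usage sketch; master and appName are placeholders.
val session = SparkSession.builder()
  .master("local[*]")
  .appName("api-service")
  .getOrCreate()
enhanceSparkEnvForAPIService(session)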