您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關(guān)Spark結(jié)構(gòu)化流處理機(jī)制之容錯(cuò)機(jī)制的示例分析的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。
端到端的有且僅有一次保證,是結(jié)構(gòu)化流設(shè)計(jì)的關(guān)鍵目標(biāo)之一.
結(jié)構(gòu)化流設(shè)計(jì)了 Structured Streaming sources,sinks等等,來(lái)跟蹤確切的處理進(jìn)度,并讓其重啟或重運(yùn)行來(lái)處理任何故障
streaming source是類(lèi)似kafka的偏移量(offsets)來(lái)跟蹤流的讀取位置.執(zhí)行引擎使用檢查點(diǎn)(checkpoint)和預(yù)寫(xiě)日志(write ahead logs)來(lái)記錄每個(gè)執(zhí)行其的偏移范圍值
streaming sinks 是設(shè)計(jì)用來(lái)保證處理的冪等性
這樣,依靠可回放的數(shù)據(jù)源(streaming source)和處理冪等(streaming sinks),結(jié)構(gòu)流來(lái)做到任何故障下的端到端的有且僅有一次保證
val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) // Generate running word count val wordCounts = words.groupBy("value").count()
其中,spark是SparkSession,lines是DataFrame,DataFrame就是Dataset[Row]。
DataSet
看看Dataset的觸發(fā)因子的代碼實(shí)現(xiàn),比如foreach操作:
def foreach(f: T => Unit): Unit = withNewRDDExecutionId { rdd.foreach(f) } private def withNewRDDExecutionId[U](body: => U): U = { SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) { rddQueryExecution.executedPlan.foreach { plan => plan.resetMetrics() } body } }
接著看:
def withNewExecutionId[T]( sparkSession: SparkSession, queryExecution: QueryExecution, name: Option[String] = None)(body: => T): T = { val sc = sparkSession.sparkContext val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY) val executionId = SQLExecution.nextExecutionId sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString) executionIdToQueryExecution.put(executionId, queryExecution) try { withSQLConfPropagated(sparkSession) { try { body } catch { } finally { } } } finally { executionIdToQueryExecution.remove(executionId) sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId) } }
執(zhí)行的真正代碼就是 queryExecution: QueryExecution。
@transient private lazy val rddQueryExecution: QueryExecution = { val deserialized = CatalystSerde.deserialize[T](logicalPlan) sparkSession.sessionState.executePlan(deserialized) }
看到了看到了,是sessionState.executePlan執(zhí)行l(wèi)ogicalPlan而得到了QueryExecution
這里的sessionState.executePlan其實(shí)就是創(chuàng)建了一個(gè)QueryExecution對(duì)象。然后執(zhí)行QueryExecution的executedPlan方法得到SparkPlan這個(gè)物理計(jì)劃。怎么生成的呢?
lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) { SparkSession.setActiveSession(sparkSession) planner.plan(ReturnAnswer(optimizedPlan.clone())).next() }
通過(guò)planner.plan方法生成。
planner是SparkPlanner。在BaseSessionStateBuilder類(lèi)中定義。
protected def planner: SparkPlanner = { new SparkPlanner(session.sparkContext, conf, experimentalMethods) { override def extraPlanningStrategies: Seq[Strategy] = super.extraPlanningStrategies ++ customPlanningStrategies } }
SparkPlanner類(lèi)
SparkPlanner對(duì)LogicalPlan執(zhí)行各種策略,返回對(duì)應(yīng)的SparkPlan。比如對(duì)于流應(yīng)用來(lái)說(shuō),有這樣的策略:DataSourceV2Strategy。
典型的幾個(gè)邏輯計(jì)劃到物理計(jì)劃的映射關(guān)系如下:
StreamingDataSourceV2Relation-》ContinuousScanExec
StreamingDataSourceV2Relation-》MicroBatchScanExec
前一種對(duì)應(yīng)與Offset沒(méi)有endOffset的情況,后一種對(duì)應(yīng)于有endOffset的情況。前一種是沒(méi)有結(jié)束的連續(xù)流,后一種是有區(qū)間的微批處理流。
前一種的時(shí)延可以達(dá)到1ms,后一種的時(shí)延只能達(dá)到100ms。
【代碼】:
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined => val microBatchStream = r.stream.asInstanceOf[MicroBatchStream] val scanExec = MicroBatchScanExec( r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get) val withProjection = if (scanExec.supportsColumnar) { scanExec } else { // Add a Project here to make sure we produce unsafe rows. ProjectExec(r.output, scanExec) } withProjection :: Nil case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty => val continuousStream = r.stream.asInstanceOf[ContinuousStream] val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get) val withProjection = if (scanExec.supportsColumnar) { scanExec } else { // Add a Project here to make sure we produce unsafe rows. ProjectExec(r.output, scanExec) } withProjection :: Nil
感謝各位的閱讀!關(guān)于“Spark結(jié)構(gòu)化流處理機(jī)制之容錯(cuò)機(jī)制的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。