溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Structured中怎么利用Streaming實(shí)現(xiàn)超低延遲

發(fā)布時(shí)間:2021-08-10 11:22:57 來源:億速云 閱讀:135 作者:Leah 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)Structured中怎么利用Streaming實(shí)現(xiàn)超低延遲,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

要在連續(xù)處理模式下運(yùn)行支持的查詢,您只需指定一個(gè)連續(xù)觸發(fā)器,并將所需的checkpoint間隔作為參數(shù)。 例如浪尖的demo如下:

object ContinuousProcessing {
 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client")
     .set("yarn.resourcemanager.hostname", "mt-mdh.local")
     .set("spark.executor.instances","2")
     .set("spark.default.parallelism","4")
     .set("spark.sql.shuffle.partitions","4")
     .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
       ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
       ,"/opt/jars/kafka-clients-0.10.2.2.jar"
       ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
       ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))


   val spark = SparkSession
     .builder
     .appName("StructuredKafkaWordCount")
     .config(sparkConf)
     .getOrCreate()

   spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
     .option("subscribe", "StructuredSource")
     .load()
     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
     .writeStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
     .option("topic", "StructuredSink")
     .option("checkpointLocation","/sql/checkpoint")
     .trigger(Trigger.Continuous("1 second"))  // only change in query
     .start()
     .awaitTermination()
 }

}

checkpoint 間隔為1秒意味著連續(xù)處理引擎將每秒記錄查詢的進(jìn)度。 生成的checkpoint采用與微批處理引擎兼容的格式,因此可以使用任何觸發(fā)器重新啟動(dòng)任何查詢。 例如,假如查詢支持微批處理和連續(xù)處理,那么實(shí)際上也可以用連續(xù)處理觸發(fā)器去啟動(dòng)微批處理觸發(fā)器,反之亦然。 

請(qǐng)注意,無論何時(shí)切換到連續(xù)模式,都將獲得至少一次的容錯(cuò)保證。

支持的查詢

從Spark 2.3開始,連續(xù)處理模式僅支持以下類型的查詢。

  • Operations:在連續(xù)模式下僅支持dataset/dataframe的類似于map的操作,即支持projection(select,map,flatMap,mapPartitions等)和selection(where,filter等)。

  • 除了聚合函數(shù)(因?yàn)樯胁恢С志酆希琧urrent_timestamp()和current_date()(使用時(shí)間的確定性計(jì)算具有挑戰(zhàn)性)之外,支持所有SQL函數(shù)。

Sources 

  • Kafka Source:支持所有操作。

  • Rate source:適合測試。只有連續(xù)模式支持的選項(xiàng)是numPartitions和rowsPerSecond。

Sinks

  • Kafka sink:支持所有選項(xiàng)。

  • Memory sink:適合調(diào)試。

  • Console sink:適合調(diào)試。支持所有操作。請(qǐng)注意,控制臺(tái)將打印你在連續(xù)觸發(fā)器中指定的每個(gè)checkpoint間隔。

更詳細(xì)的關(guān)于sink和source信息,請(qǐng)參閱輸入源和輸出接收器部分的官網(wǎng)。雖然控制臺(tái)接收器非常適合測試,但是使用Kafka作為源和接收器可以最好地觀察到端到端的低延遲處理。

注意事項(xiàng)

  • 連續(xù)處理引擎啟動(dòng)多個(gè)長時(shí)間運(yùn)行的任務(wù),這些任務(wù)不斷從源中讀取數(shù)據(jù),處理數(shù)據(jù)并連續(xù)寫入接收器。 查詢所需的任務(wù)數(shù)取決于查詢可以并行從源讀取的分區(qū)數(shù)。 因此,在開始連續(xù)處理查詢之前,必須確保群集中有足夠的核心并行執(zhí)行所有任務(wù)。 例如,如果您正在讀取具有10個(gè)分區(qū)的Kafka主題,則群集必須至少具有10個(gè)核心才能使查詢正常執(zhí)行。

  • 停止連續(xù)處理流可能會(huì)產(chǎn)生虛假的任務(wù)終止警告。 這些可以安全地忽略。

  • 目前沒有自動(dòng)重試失敗的任務(wù)。 任何失敗都將導(dǎo)致查詢停止,并且需要從檢查點(diǎn)手動(dòng)重新啟動(dòng)。

上述就是小編為大家分享的Structured中怎么利用Streaming實(shí)現(xiàn)超低延遲了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI