您好,登錄后才能下訂單哦!
這篇文章主要講解了“Spark Streaming編程方法是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Spark Streaming編程方法是什么”吧!
上一篇文章中介紹了常見的無(wú)狀態(tài)的轉(zhuǎn)換操作,比如在WordCount的例子中,輸出的結(jié)果只與當(dāng)前batch interval的數(shù)據(jù)有關(guān),不會(huì)依賴于上一個(gè)batch interval的計(jì)算結(jié)果。spark Streaming也提供了有狀態(tài)的操作:updateStateByKey
,該算子會(huì)維護(hù)一個(gè)狀態(tài),同時(shí)進(jìn)行信息更新 。該操作會(huì)讀取上一個(gè)batch interval的計(jì)算結(jié)果,然后將其結(jié)果作用到當(dāng)前的batch interval數(shù)據(jù)統(tǒng)計(jì)中。其源碼如下:
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
該算子只能在key–value對(duì)的DStream上使用,需要接收一個(gè)狀態(tài)更新函數(shù) updateFunc作為參數(shù)。使用案例如下:
object StateWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(StateWordCount.getClass.getSimpleName)
val ssc = new StreamingContext(conf, Seconds(5))
// 必須開啟checkpoint,否則會(huì)報(bào)錯(cuò)
ssc.checkpoint("file:///e:/checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
// 狀態(tài)更新函數(shù)
def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
var oldvalue = stateValue.getOrElse(0) // 獲取狀態(tài)值
// 遍歷當(dāng)前數(shù)據(jù),并更新狀態(tài)
for (newValue <- newValues) {
oldvalue += newValue
}
// 返回最新的狀態(tài)
Option(oldvalue)
}
val count = lines.flatMap(_.split(" "))
.map(w => (w, 1))
.updateStateByKey(updateFunc)
count.print()
ssc.start()
ssc.awaitTermination()
}
}
尖叫提示:上面的代碼必須要開啟checkpoint,否則會(huì)報(bào)錯(cuò):
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()
運(yùn)行上面的代碼會(huì)發(fā)現(xiàn)一個(gè)現(xiàn)象:即便沒有數(shù)據(jù)源輸入,Spark也會(huì)為新的batch interval更新狀態(tài),即如果沒有數(shù)據(jù)源輸入,則會(huì)不斷地輸出之前的計(jì)算狀態(tài)結(jié)果。
updateStateByKey可以在指定的批次間隔內(nèi)返回之前的全部歷史數(shù)據(jù),包括新增的,改變的和沒有改變的。由于updateStateByKey在使用的時(shí)候一定要做checkpoint,當(dāng)數(shù)據(jù)量過(guò)大的時(shí)候,checkpoint會(huì)占據(jù)龐大的數(shù)據(jù)量,會(huì)影響性能,效率不高。
mapwithState是Spark提供的另外一個(gè)有狀態(tài)的算子,該操作克服了updateStateByKey的缺點(diǎn),從Spark 1.5開始引入。源碼如下:
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}
mapWithState只返回發(fā)生變化的key的值,對(duì)于沒有發(fā)生變化的Key,則不返回。這樣做可以只關(guān)心那些已經(jīng)發(fā)生的變化的key,對(duì)于沒有數(shù)據(jù)輸入,則不會(huì)返回那些沒有變化的key 的數(shù)據(jù)。這樣的話,即使數(shù)據(jù)量很大,checkpint也不會(huì)updateBykey那樣,占用太多的存儲(chǔ),效率比較高(生產(chǎn)環(huán)境中建議使用)。
object StatefulNetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("StatefulNetworkWordCount")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("file:///e:/checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
/**
* word:當(dāng)前key的值
* one:當(dāng)前key對(duì)應(yīng)的value值
* state:狀態(tài)值
*/
val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
println(s">>> batchTime = $batchTime")
println(s">>> word = $word")
println(s">>> one = $one")
println(s">>> state = $state")
val output = (word, sum)
state.update(sum) //更新當(dāng)前key的狀態(tài)值
Some(output) //返回結(jié)果
}
// 通過(guò)StateSpec.function構(gòu)建StateSpec
val spec = StateSpec.function(mappingFunc)
val stateDstream = wordDstream.mapWithState(spec)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
Spark Streaming提供了兩種類型的窗口操作,分別是滾動(dòng)窗口和滑動(dòng)窗口。具體分析如下:
滾動(dòng)窗口的示意圖如下:滾動(dòng)窗口只需要傳入一個(gè)固定的時(shí)間間隔,滾動(dòng)窗口是不存在重疊的。
源碼如下:
/**
* @param windowDuration:窗口的長(zhǎng)度; 必須是batch interval的整數(shù)倍.
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration
滑動(dòng)窗口的示意圖如下:滑動(dòng)窗口只需要傳入兩個(gè)參數(shù),一個(gè)為窗口的長(zhǎng)度,一個(gè)是滑動(dòng)時(shí)間間隔??梢钥闯觯夯瑒?dòng)窗口是存在重疊的。
源碼如下:
/**
* @param windowDuration 窗口長(zhǎng)度;必須是batching interval的整數(shù)倍
*
* @param slideDuration 滑動(dòng)間隔;必須是batching interval的整數(shù)倍
*/
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
new WindowedDStream(this, windowDuration, slideDuration)
}
window(windowLength, slideInterval)
解釋
基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù),計(jì)算得到一個(gè)新的Dstream
源碼
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
new WindowedDStream(this, windowDuration, slideDuration)
}
countByWindow(windowLength, slideInterval)
返回一個(gè)滑動(dòng)窗口的元素個(gè)數(shù)
源碼
/**
* @param windowDuration window長(zhǎng)度,必須是batch interval的倍數(shù)
* @param slideDuration 滑動(dòng)的時(shí)間間隔,必須是batch interval的倍數(shù)
* 底層調(diào)用的是reduceByWindow
*/
def countByWindow(
windowDuration: Duration,
slideDuration: Duration): DStream[Long] = ssc.withScope {
this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
reduceByWindow(func, windowLength, slideInterval)
返回一個(gè)單元素流。利用函數(shù)func聚集滑動(dòng)時(shí)間間隔的流的元素創(chuàng)建這個(gè)單元素流。函數(shù)func必須滿足結(jié)合律,從而可以支持并行計(jì)算
源碼
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = ssc.withScope {
this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
應(yīng)用到一個(gè)(K,V)鍵值對(duì)組成的DStream上時(shí),會(huì)返回一個(gè)由(K,V)鍵值對(duì)組成的新的DStream。每一個(gè)key的值均由給定的reduce函數(shù)(func函數(shù))進(jìn)行聚合計(jì)算。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。可以通過(guò)numTasks參數(shù)的設(shè)置來(lái)指定不同的任務(wù)數(shù)
源碼
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
更加高效的reduceByKeyAndWindow,每個(gè)窗口的reduce值,是基于先前窗口的reduce值進(jìn)行增量計(jì)算得到的;它會(huì)對(duì)進(jìn)入滑動(dòng)窗口的新數(shù)據(jù)進(jìn)行reduce操作,并對(duì)離開窗口的老數(shù)據(jù)進(jìn)行
逆向reduce
操作。但是,只能用于可逆reduce函數(shù)
,即那些reduce函數(shù)都有一個(gè)對(duì)應(yīng)的逆向reduce函數(shù)
(以InvFunc參數(shù)傳入)注意:必須開啟 checkpointing
源碼
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner,
filterFunc: ((K, V)) => Boolean
): DStream[(K, V)] = ssc.withScope {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
new ReducedWindowedDStream[K, V](
self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
windowDuration, slideDuration, partitioner
)
}
countByValueAndWindow(windowLength, slideInterval, [numTasks])
解釋
當(dāng)應(yīng)用到一個(gè)(K,V)鍵值對(duì)組成的DStream上,返回一個(gè)由(K,V)鍵值對(duì)組成的新的DStream。每個(gè)key的對(duì)應(yīng)的value值都是它們?cè)诨瑒?dòng)窗口中出現(xiàn)的頻率
源碼
def countByValueAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int = ssc.sc.defaultParallelism)
(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
this.map((_, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
windowDuration,
slideDuration,
numPartitions,
(x: (T, Long)) => x._2 != 0L
)
}
val lines = ssc.socketTextStream("localhost", 9999)
val count = lines.flatMap(_.split(" "))
.map(w => (w, 1))
.reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
.print()
//滾動(dòng)窗口
/* lines.window(Seconds(20))
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()*/
持久化是提升Spark應(yīng)用性能的一種方式,在第二篇|Spark core編程指南一文中講解了RDD持久化的使用方式。其實(shí),DStream也是支持持久化的,同樣是使用persist()與cache()方法,持久化通常在有狀態(tài)的算子中使用,比如窗口操作,默認(rèn)情況下,雖然沒有顯性地調(diào)用持久化方法,但是底層已經(jīng)幫用戶做了持久化操作,通過(guò)下面的源碼可以看出。
private[streaming]
class WindowedDStream[T: ClassTag](
parent: DStream[T],
_windowDuration: Duration,
_slideDuration: Duration)
extends DStream[T](parent.ssc) {
// 省略代碼...
// Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
}
注意:與RDD的持久化不同,DStream的默認(rèn)持久性級(jí)別將數(shù)據(jù)序列化在內(nèi)存中,通過(guò)下面的源碼可以看出:
/** 給定一個(gè)持計(jì)劃級(jí)別 */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
"Cannot change storage level of a DStream after streaming context has started")
}
this.storageLevel = level
this
}
/** 默認(rèn)的持久化級(jí)別為(MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
def cache(): DStream[T] = persist()
從上面的源碼可以看出persist()與cache()的主要區(qū)別是:
流應(yīng)用程序通常是24/7運(yùn)行的,因此必須對(duì)與應(yīng)用程序邏輯無(wú)關(guān)的故障(例如系統(tǒng)故障,JVM崩潰等)具有彈性的容錯(cuò)能力。為此,Spark Streaming需要將足夠的信息checkpoint
到容錯(cuò)存儲(chǔ)系統(tǒng)(比如HDFS),以便可以從故障中恢復(fù)。檢查點(diǎn)包括兩種類型:
元數(shù)據(jù)檢查點(diǎn)
元數(shù)據(jù)檢查點(diǎn)可以保證從Driver程序失敗中恢復(fù)。即如果運(yùn)行drive的節(jié)點(diǎn)失敗時(shí),可以查看最近的checkpoin數(shù)據(jù)獲取最新的狀態(tài)。典型的應(yīng)用程序元數(shù)據(jù)包括:
數(shù)據(jù)檢查點(diǎn)
將生成的RDD保存到可靠的存儲(chǔ)中。在某些有狀態(tài)轉(zhuǎn)換中,需要合并多個(gè)批次中的數(shù)據(jù),所以需要開啟檢查點(diǎn)。在此類轉(zhuǎn)換中,生成的RDD依賴于先前批次的RDD,這導(dǎo)致依賴鏈的長(zhǎng)度隨時(shí)間不斷增加。為了避免恢復(fù)時(shí)間無(wú)限制的增加(與依賴鏈成比例),有狀態(tài)轉(zhuǎn)換的中間RDD定期 checkpoint到可靠的存儲(chǔ)(例如HDFS),以切斷依賴鏈,功能類似于持久化,只需要從當(dāng)前的狀態(tài)恢復(fù),而不需要重新計(jì)算整個(gè)lineage。
總而言之,從Driver程序故障中恢復(fù)時(shí),主要需要元數(shù)據(jù)檢查點(diǎn)。而如果使用有狀態(tài)轉(zhuǎn)換,則需要數(shù)據(jù)或RDD檢查點(diǎn)。
必須為具有以下類型的應(yīng)用程序啟用檢查點(diǎn):
使用了有狀態(tài)轉(zhuǎn)換轉(zhuǎn)換操作
如果在應(yīng)用程序中使用updateStateByKey
或reduceByKeyAndWindow
,則必須提供檢查點(diǎn)目錄以允許定期進(jìn)行RDD檢查點(diǎn)。
從運(yùn)行應(yīng)用程序的Driver程序故障中恢復(fù)
元數(shù)據(jù)檢查點(diǎn)用于恢復(fù)進(jìn)度信息。
注意,沒有前述狀態(tài)轉(zhuǎn)換的簡(jiǎn)單流應(yīng)用程序可以在不啟用檢查點(diǎn)的情況下運(yùn)行。在這種情況下,從驅(qū)動(dòng)程序故障中恢復(fù)也將是部分的(某些丟失但未處理的數(shù)據(jù)可能會(huì)丟失)。這通常是可以接受的,并且許多都以這種方式運(yùn)行Spark Streaming應(yīng)用程序。預(yù)計(jì)將來(lái)會(huì)改善對(duì)非Hadoop環(huán)境的支持。
可以通過(guò)具有容錯(cuò)的、可靠的文件系統(tǒng)(例如HDFS,S3等)中設(shè)置目錄來(lái)啟用檢查點(diǎn),將檢查點(diǎn)信息保存到該目錄中。開啟檢查點(diǎn),需要開啟下面的兩個(gè)配置:
其中配置檢查點(diǎn)的時(shí)間間隔是可選的。如果不設(shè)置,會(huì)根據(jù)DStream的類型選擇一個(gè)默認(rèn)值。對(duì)于MapWithStateDStream,默認(rèn)的檢查點(diǎn)間隔是batch interval的10倍。對(duì)于其他的DStream,默認(rèn)的檢查點(diǎn)間隔是10S,或者是batch interval的間隔時(shí)間。需要注意的是:checkpoint的頻率必須是 batch interval的整數(shù)倍,否則會(huì)報(bào)錯(cuò)。
此外,如果要使應(yīng)用程序從Driver程序故障中恢復(fù),則需要使用下面的方式創(chuàng)建StreamingContext:
def createStreamingContext (conf: SparkConf,checkpointPath: String):
StreamingContext = {
val ssc = new StreamingContext( <ConfInfo> )
// .... other code ...
ssc.checkPoint(checkpointDirectory)
ssc
}
#創(chuàng)建一個(gè)新的StreamingContext或者從最近的checkpoint獲取
val context = StreamingContext.getOrCreate(checkpointDirectory,
createStreamingContext _)
#啟動(dòng)
context.start()
context.awaitTermination()
注意:
RDD的檢查點(diǎn)需要將數(shù)據(jù)保存到可靠存儲(chǔ)上,由此帶來(lái)一些成本開銷。這可能會(huì)導(dǎo)致RDD獲得檢查點(diǎn)的那些批次的處理時(shí)間增加。因此,需要設(shè)置一個(gè)合理的檢查點(diǎn)的間隔。在batch interval較小時(shí)(例如1秒),每個(gè)batch interval都進(jìn)行檢查點(diǎn)可能會(huì)大大降低吞吐量。相反,檢查點(diǎn)時(shí)間間隔太長(zhǎng)會(huì)導(dǎo)致 lineage和任務(wù)規(guī)模增加,這可能會(huì)產(chǎn)生不利影響。對(duì)于需要RDD檢查點(diǎn)的有狀態(tài)轉(zhuǎn)換,默認(rèn)間隔為batch interval的倍數(shù),至少應(yīng)為10秒??梢允褂?**dstream.checkpoint(checkpointInterval)**進(jìn)行配置。通常,DStream的5-10個(gè)batch interval的檢查點(diǎn)間隔是一個(gè)較好的選擇。
持久化
檢查點(diǎn)
在Spark Streaming應(yīng)用中,可以輕松地對(duì)流數(shù)據(jù)使用DataFrames和SQL操作。使用案例如下:
object SqlStreaming {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(SqlStreaming.getClass.getSimpleName)
.setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.foreachRDD { rdd =>
// 調(diào)用SparkSession單例方法,如果已經(jīng)創(chuàng)建了,則直接返回
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
val wordsDataFrame = rdd.toDF("word")
wordsDataFrame.show()
wordsDataFrame.createOrReplaceTempView("words")
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
ssc.start()
ssc.awaitTermination()
}
}
/** SparkSession單例 */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
感謝各位的閱讀,以上就是“Spark Streaming編程方法是什么”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Spark Streaming編程方法是什么這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。