溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark2.x中共享變量的累加器是怎樣的

發(fā)布時(shí)間:2021-12-16 20:30:04 來(lái)源:億速云 閱讀:111 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)Spark2.x中共享變量的累加器是怎樣的,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

為什么要定義累加器?

    在 Spark 應(yīng)用程序中,我們經(jīng)常會(huì)有這樣的需求,如要需要統(tǒng)計(jì)符合某種特性數(shù)據(jù)的總數(shù),這種需求都需要用到計(jì)數(shù)器。如果一個(gè)變量不被聲明為一個(gè)累加器,那么它將在被改變時(shí)不會(huì)在 driver 端進(jìn)行全局匯總,即在分布式運(yùn)行時(shí)每個(gè) task 運(yùn)行的只是原始變量的 一個(gè)副本,并不能改變?cè)甲兞康闹?,但是?dāng)這個(gè)變量被聲明為累加器后,該變量就會(huì)有分布式計(jì)數(shù)的功能。

  定義了一個(gè)累加器sum,而不是普通變量,實(shí)例實(shí)例代碼如下:

package com.hadoop.ljs.spark220.studyimport org.apache.spark.{SparkConf, SparkContext}/**  * @author: Created By lujisen  * @company ChinaUnicom Software JiNan  * @date: 2020-02-20 19:36  * @version: v1.0  * @description: com.hadoop.ljs.spark220.study  */object AccumlatorTest {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setMaster("local[*]").setAppName("AccumlatorTest")    val sc=new SparkContext(sparkConf)    /*定義一個(gè)共享變量:累加器*/    val sum=sc.accumulator(0)    /*輸入數(shù)據(jù)*/    val rdd1=sc.parallelize(List(1,2,3,4,5))    /*求和 ,然后各個(gè)元素加1*/    val rdd2=rdd1.map(x=>{      sum+=x      x    })    /*這里是個(gè)action操作 沒(méi)有這個(gè)操作,程序不會(huì)執(zhí)行*/    rdd2.collect()    println("求和:"+sum)    sc.stop()  }}

運(yùn)行結(jié)果如下,sum=15,符合我們的期望值:

Spark2.x中共享變量的累加器是怎樣的

結(jié)合上面的代碼說(shuō)一下累加器的執(zhí)行過(guò)程:

 1).Accumulator需要在Driver進(jìn)行定義和并初始化,并進(jìn)行注冊(cè),同時(shí)Accumulator首先需要在Driver進(jìn)行序列化,然后發(fā)送到Executor端;另外,Driver接收到Task任務(wù)完成的狀態(tài)更新后,會(huì)去更新Value的值,然后在Action操作執(zhí)行后就可以獲取到Accumulator的值了。

  2).Executor接收到Task之后會(huì)進(jìn)行反序列化操作,反序列化得到RDD和function,同時(shí)在反序列化的同時(shí)也去反序列化Accumulator,同時(shí)也會(huì)向TaskContext完成注冊(cè),完成任務(wù)計(jì)算之后,隨著Task結(jié)果一起返回給Driver端進(jìn)行處理。

    這里有執(zhí)行過(guò)程圖可以參考下:

Spark2.x中共享變量的累加器是怎樣的

累加器特性:

    1.累加器也是也具有懶加載屬性,只有在action操作執(zhí)行時(shí),才會(huì)強(qiáng)制觸發(fā)計(jì)算求值;

    2.累加器的值只可以在Driver端定義初始化,在Executor端更新,不能在Executor端進(jìn)行定義初始化,不能在Executor端通過(guò)[.value]獲取值,任何工作節(jié)點(diǎn)上的Task都不能訪問(wèn)累加器的值;

    3.閉包里的執(zhí)行器代碼可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。


特別提醒:

    累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最后的值,在Excutor端更新。

上述就是小編為大家分享的Spark2.x中共享變量的累加器是怎樣的了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI