..."/>
溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

生產(chǎn)常用Spark累加器剖析之四

發(fā)布時間:2020-06-01 11:48:50 來源:網(wǎng)絡 閱讀:191 作者:Stitch_x 欄目:大數(shù)據(jù)

生產(chǎn)常用Spark累加器剖析之四

現(xiàn)象描述

val acc = sc.accumulator(0, “Error Accumulator”)
val data = sc.parallelize(1 to 10)
val newData = data.map(x => {
  if (x % 2 == 0) {
 accum += 1
}
})
newData.count
acc.value
newData.foreach(println)
acc.value

上述現(xiàn)象,會造成acc.value的最終值變?yōu)?0

原因分析

Spark中的一系列transform操作都會構(gòu)造成一長串的任務鏈,此時就需要通過一個action操作來觸發(fā)(lazy的特性),accumulator也是如此。

  • 因此在一個action操作之后,調(diào)用value方法查看,是沒有任何變化
  • 第一次action操作之后,調(diào)用value方法查看,變成了5
  • 第二次action操作之后,調(diào)用value方法查看,變成了10

原因就在于第二次action操作的時候,又執(zhí)行了一次累加器的操作,同個累加器,在原有的基礎(chǔ)上又加了5,從而變成了10

解決方案

通過上述的現(xiàn)象描述,我們可以很快知道解決的方法:只進行一次action操作?;诖?,我們只要切斷任務之間的依賴關(guān)系就可以了,即使用cache、persist。這樣操作之后,那么后續(xù)的累加器操作就不會受前面的transform操作影響了

相關(guān)案例

  • 需求

    使用Accumulators統(tǒng)計emp表中NULL出現(xiàn)的次數(shù)以及正常數(shù)據(jù)的條數(shù) & 打印正常數(shù)據(jù)的信息

  • 數(shù)據(jù)

    7369  SMITH   CLERK   7902    1980-12-17  800.00      20
    7499  ALLEN   SALESMAN    7698    1981-2-20   1600.00 300.00  30
    7521  WARD    SALESMAN    7698    1981-2-22   1250.00 500.00  30
    7566  JONES   MANAGER 7839    1981-4-2    2975.00     20
    7654  MARTIN  SALESMAN    7698    1981-9-28   1250.00 1400.00 30
    7698  BLAKE   MANAGER 7839    1981-5-1    2850.00     30
    7782  CLARK   MANAGER 7839    1981-6-9    2450.00     10
    7788  SCOTT   ANALYST 7566    1987-4-19   3000.00     20
    7839  KING    PRESIDENT       1981-11-17  5000.00     10
    7844  TURNER  SALESMAN    7698    1981-9-8    1500.00 0.00    30
    7876  ADAMS   CLERK   7788    1987-5-23   1100.00     20
    7900  JAMES   CLERK   7698    1981-12-3   950.00      30
    7902  FORD    ANALYST 7566    1981-12-3   3000.00     20
    7934  MILLER  CLERK   7782    1982-1-23   1300.00     10
  • 遇到的坑 & 解決方法

    現(xiàn)象描述 & 原因分析:

    我們都知道,spark中的一系列transform操作會構(gòu)成一串長的任務鏈,此時就需要通過一個action操作來觸發(fā); accumulator也是一樣的,只有當action操作執(zhí)行時,才會觸發(fā)accumulator的執(zhí)行; 因此在一個action操作之前,我們調(diào)用accumulator的value方法是無法查看其數(shù)值的,肯定是沒有任何變化的; 所以在對normalData進行foreach操作之后,即action操作之后,我們會發(fā)現(xiàn)累加器的數(shù)值就變成了11; 之后,我們對normalData再進行一次count操作之后,即又一次的action操作之后,其實這時候,又去執(zhí)行了一次前面的transform操作; 因此累加器的值又增加了11,變成了22

    解決辦法:

    經(jīng)過上面的分析,我們可以知道,使用累加器的時候,我們只有使用一次action操作才能夠保證結(jié)果的準確性 因此,我們面對這種情況,是有辦法的,做法就是切斷它們相互之間的依賴關(guān)系即可 因此對normalData使用cache方法,當RDD第一次被計算出來時,就會被直接緩存起來 再調(diào)用時,相同的計算操作就不會再重新計算一遍

    import org.apache.spark.{SparkConf, SparkContext}
    /**
    * 使用Spark Accumulators完成Job的數(shù)據(jù)量處理
    * 統(tǒng)計emp表中NULL出現(xiàn)的次數(shù)以及正常數(shù)據(jù)的條數(shù) & 打印正常數(shù)據(jù)的信息
    */
    object AccumulatorsApp {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("AccumulatorsApp")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/emp.txt")
    // long類型的累加器值
    val nullNum = sc.longAccumulator("NullNumber")
    val normalData = lines.filter(line => {
      var flag = true
      val splitLines = line.split("\t")
      for (splitLine <- splitLines){
        if ("".equals(splitLine)){
          flag = false
          nullNum.add(1)
        }
      }
      flag
    })
    // 使用cache方法,將RDD的第一次計算結(jié)果進行緩存;防止后面RDD進行重復計算,導致累加器的值不準確
    normalData.cache()
    // 打印每一條正常數(shù)據(jù)
    normalData.foreach(println)
    // 打印正常數(shù)據(jù)的條數(shù)
    println("NORMAL DATA NUMBER: " + normalData.count())
    // 打印emp表中NULL出現(xiàn)的次數(shù)
    println("NULL: " + nullNum.value)
    sc.stop()
    }
    }
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI