您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)怎么在Spark中自定義一個(gè)累加器,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對相關(guān)知識(shí)有一定的了解。
累加器(accumulator)是Spark中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個(gè)常見用途是在調(diào)試時(shí)對作業(yè)執(zhí)行過程中的事件進(jìn)行計(jì)數(shù)。
累加器簡單使用
Spark內(nèi)置的提供了Long和Double類型的累加器。下面是一個(gè)簡單的使用示例,在這個(gè)例子中我們在過濾掉RDD中奇數(shù)的同時(shí)進(jìn)行計(jì)數(shù),最后計(jì)算剩下整數(shù)的和。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //統(tǒng)計(jì)奇數(shù)的個(gè)數(shù) val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{ if(n%2!=0) accum.add(1L) n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop()
結(jié)果為:
sum: 20
accum: 5
這是結(jié)果正常的情況,但是在使用累加器的過程中如果對于spark的執(zhí)行過程理解的不夠深入就會(huì)遇到兩類典型的錯(cuò)誤:少加(或者沒加)、多加。
自定義累加器
自定義累加器類型的功能在1.X版本中就已經(jīng)提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進(jìn),而且官方還提供了一個(gè)新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實(shí)現(xiàn)方式。官方同時(shí)給出了一個(gè)實(shí)現(xiàn)的示例:CollectionAccumulator類,這個(gè)類允許以集合的形式收集spark應(yīng)用執(zhí)行過程中的一些信息。例如,我們可以用這個(gè)類收集Spark處理數(shù)據(jù)時(shí)的一些細(xì)節(jié),當(dāng)然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規(guī)模要加以控制,不宜過大。
繼承AccumulatorV2類,并復(fù)寫它的所有方法
package spark import constant.Constant import org.apache.spark.util.AccumulatorV2 import util.getFieldFromConcatString import util.setFieldFromConcatString open class SessionAccmulator : AccumulatorV2<String, String>() { private var result = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" override fun value(): String { return this.result } /** * 合并數(shù)據(jù) */ override fun merge(other: AccumulatorV2<String, String>?) { if (other == null) return else { if (other is SessionAccmulator) { var newResult = "" val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s, Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m, Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m, Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9, Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60) resultArray.forEach { val oldValue = other.result.getFieldFromConcatString("|", it) if (oldValue.isNotEmpty()) { val newValue = oldValue.toInt() + 1 //找到原因,一直在循環(huán)賦予值,debug30分鐘 很煩 if (newResult.isEmpty()){ newResult = result.setFieldFromConcatString("|", it, newValue.toString()) } //問題就在于這里,自定義沒有寫錯(cuò),合并錯(cuò)了 newResult = newResult.setFieldFromConcatString("|", it, newValue.toString()) } } result = newResult } } } override fun copy(): AccumulatorV2<String, String> { val sessionAccmulator = SessionAccmulator() sessionAccmulator.result = this.result return sessionAccmulator } override fun add(p0: String?) { val v1 = this.result val v2 = p0 if (v2.isNullOrEmpty()){ return }else{ var newResult = "" val oldValue = v1.getFieldFromConcatString("|", v2!!) if (oldValue.isNotEmpty()){ val newValue = oldValue.toInt() + 1 newResult = result.setFieldFromConcatString("|", v2, newValue.toString()) } result = newResult } } override fun reset() { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" result = newResult } override fun isZero(): Boolean { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" return this.result == newResult } }
方法介紹
value方法:獲取累加器中的值
merge方法:該方法特別重要,一定要寫對,這個(gè)方法是各個(gè)task的累加器進(jìn)行合并的方法(下面介紹執(zhí)行流程中將要用到)
iszero方法:判斷是否為初始值
reset方法:重置累加器中的值
copy方法:拷貝累加器
spark中累加器的執(zhí)行流程:
首先有幾個(gè)task,spark engine就調(diào)用copy方法拷貝幾個(gè)累加器(不注冊的),然后在各個(gè)task中進(jìn)行累加(注意在此過程中,被最初注冊的累加器的值是不變的),執(zhí)行最后將調(diào)用merge方法和各個(gè)task的結(jié)果累計(jì)器進(jìn)行合并(此時(shí)被注冊的累加器是初始值)
。
關(guān)于怎么在Spark中自定義一個(gè)累加器就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。