溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RDD怎么向spark傳遞函數(shù)

發(fā)布時間:2021-12-16 17:01:42 來源:億速云 閱讀:137 作者:iii 欄目:云計算

本篇內(nèi)容介紹了“RDD怎么向spark傳遞函數(shù)”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

惰性求值

RDD的轉(zhuǎn)換操作都是惰性求值的。
惰性求值意味著我們對RDD調(diào)用轉(zhuǎn)化操做(例如map操作)并不會立即執(zhí)行,相反spark會在內(nèi)部記錄下所要求執(zhí)行的操作的相關(guān)信息。
把數(shù)據(jù)讀取到RDD的操作同樣也是惰性的,因此我們調(diào)用sc.textFile()時數(shù)據(jù)沒有立即讀取進來,而是有必要時才會讀取。和轉(zhuǎn)化操作一樣讀取數(shù)據(jù)操作也有可能被多次執(zhí)行。這在寫代碼時要特別注意。

關(guān)于惰性求值,對新手來說可能有與直覺相違背之處。有接觸過函數(shù)式語言類如haskell的應(yīng)該不會陌生。
在最初接觸spark時,我們也會有這樣的疑問。
也參與過這樣的討論:

 val sc = new SparkContext("local[2]", "test")
 val f:Int ? Int = (x:Int) ? x + 1
 val g:Int ? Int = (x:Int) ? x + 1
 val rdd = sc.parallelize(Seq(1,2,3,4),1)
 //1
 val res1 = rdd.map(x ? g(f(x))).collect
 //2
 val res2 = rdd.map(g).map(f).collect

第1和第2兩種操作均能得到我們想要的結(jié)果,但那種操作更好呢?
直觀上我們會覺得第1種操作更好,因為第一種操作可以僅僅需要一次迭代就能得到我們想要的結(jié)果。第二種操作需要兩次迭代操作才能完成。
是我們想象的這樣嗎?讓我們對函數(shù)f和g的調(diào)用加上打印。按照上面的假設(shè)。1和2的輸出分別是這樣的:

1:  f   g   f   g   f   g   f   g       
2:  g   g   g   g   f   f   f   f

代碼:

val sc = new SparkContext("local[2]", "test")
val f:Int ? Int = (x:Int) ? {
    print("f\t")
    x + 1
    }
val g:Int ? Int = (x:Int) ? {
  print("g\t")
  x + 1
}
val rdd = sc.parallelize(Seq(1,2,3,4), 1
//1
val res1 = rdd.map(x ? g(f(x))).collect()
//2
val res2 = rdd.map(f).map(g).collect()

將上面的代碼copy試著運行一下吧,我們在控制臺得到的結(jié)果是這樣的。

f   g   f   g   f   g   f   g
f   g   f   g   f   g   f   g

是不是大大出乎我們的意料?這說明什么?說明spark是懶性求值的! 我們在調(diào)用map(f)時并不會真正去計算, map(f)只是告訴spark數(shù)據(jù)是怎么計算出來的。map(f).map(g)其實就是在告訴spark數(shù)據(jù)先通過f在通過g計算出來的。然后在collect()時,spark在一次迭代中先后對數(shù)據(jù)調(diào)用f、g。

繼續(xù)回到我們最初的問題,既然兩種調(diào)用方式,在性能上毫無差異,那種調(diào)用方式更好呢?我們更推薦第二種調(diào)用方式,除了api更加清晰之外。在調(diào)用鏈很長的情況下,我們可以利用spark的檢查點機制,在中間添加檢查點,這樣數(shù)據(jù)恢復(fù)的代價更小。而第一種方式調(diào)用鏈一旦出錯,數(shù)據(jù)只能從頭計算。

那么spark到底施加了何種魔法,如此神奇?讓我們來撥開spark的層層面紗。最好的方式當然是看源碼了。以map為例:

RDD的map方法

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

和MapPartitionsRDD的compute方法

override def compute(split: Partition, context:
                TaskContext): Iterator[U] =
                f(context, split.index,
                firstParent[T].iterator(split, 
                context))

關(guān)鍵是這個 iter.map(cleanF)),我們調(diào)用一個map方法其實是在iter對象上調(diào)用一個map方法。iter對象是scala.collection.Iterator的一個實例。
在看一下Iterator的map方法

def map[B](f: A => B): Iterator[B]=  
    new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
}

聯(lián)想到我們剛才說的我們在RDD上調(diào)用一個map方法只是告訴spark數(shù)據(jù)是怎么計算出來的,并不會真正計算。是不是恍然大悟了。

向spark傳遞函數(shù)

我們可以把定義好的內(nèi)聯(lián)函數(shù)、方法的引用或靜態(tài)方法傳遞給spark。就像scala的其它函數(shù)式API一樣。我們還要考慮一些細節(jié),比如傳遞的函數(shù)及其引用的變量是可序列話的(實現(xiàn)了java的Serializable接口)。除此之外傳遞一個對象的方法或字段時,會包含對整個對象的引用。我們可以把該字段放到一個局部變量中,來避免傳遞包含該字段的整個對象。

scala中的函數(shù)傳遞

class SearchFunctions(val query:String){
    def isMatch(s:String) = s.contains(query)

    def getMatchFuncRef(rdd:RDD[String])
        :RDD[String]= {
        //isMatch 代表this.isMatch因此我們要傳遞整個this
        rdd.map(isMatch)
    }

    def getMatchFieldRef(rdd:RDD[String])={
    //query表示this.query因此我們要傳遞整個this
    rdd.map(x=>x.split(query))
    }

    def getMatchsNoRef(rdd:RDD[String])={
    //安全只要把我們需要的字段放到局部變量中
    val q = this.query
    rdd.map(x=>x.split(query))
    }
}

如果在scala中出現(xiàn)了NotSerializableException,通常問題就在我們傳遞了一個不可序列化類中的函數(shù)或字段。傳遞局部可序列變量或頂級對象中的函數(shù)始終是安全的。

持久化

如前所述,spark的RDD是惰性求值的,有時我們希望能過多次使用同一個RDD。如果只是簡單的對RDD調(diào)用行動操作,spark每次都會重算RDD和它的依賴。這在迭代算法中消耗巨大。 可以使用RDD.persist()讓spark把RDD緩存下來。

避免GroupByKey

讓我們來看看兩種workCount的方式,一種使用reduceByKey,另一種使用groupByKey。

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()

雖然兩種方式都能產(chǎn)生正確的結(jié)果,但reduceByKey在大數(shù)據(jù)集時工作的更好。這時因為spark會在shuffling數(shù)據(jù)之前,為每一個分區(qū)添加一個combine操作。這將大大減少shuffling前的數(shù)據(jù)。

看下圖來理解 reduceBykey的過程

RDD怎么向spark傳遞函數(shù)

而groupBykey會shuff所有的數(shù)據(jù),這大大加重了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量。另外如果一個key對應(yīng)很多value,這樣也可能引起out of memory。

如圖,groupby的過程

RDD怎么向spark傳遞函數(shù)

“RDD怎么向spark傳遞函數(shù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI