溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

mapPartitions的簡單介紹及使用方法

發(fā)布時間:2021-07-28 09:16:55 來源:億速云 閱讀:726 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“mapPartitions的簡單介紹及使用方法”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!


1. mappartition簡介

首先,說到mapPartitions大家肯定想到的是map和MapPartitions的對比。大家都知道m(xù)apPartition算子是使用一個函數(shù)針對分區(qū)計算的,函數(shù)參數(shù)是一個迭代器。而map只針對每條數(shù)據(jù)調(diào)用的,所以存在訪問外部數(shù)據(jù)庫等情況時mapParititons更加高效。  
mapPartitions函數(shù):
  /**   * Return a new RDD by applying a function to each partition of this RDD.   *   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.   */  def mapPartitions[U: ClassTag](      f: Iterator[T] => Iterator[U],      preservesPartitioning: Boolean = false): RDD[U] = withScope {    val cleanedF = sc.clean(f)    new MapPartitionsRDD(      this,      (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),      preservesPartitioning)  }
有代碼可知mapPartitions的函數(shù)參數(shù)是傳入一個迭代器,返回值是另一個迭代器。
map函數(shù):  
  /**   * Return a new RDD by applying a function to all elements of this RDD.   */  def map[U: ClassTag](f: T => U): RDD[U] = withScope {    val cleanF = sc.clean(f)    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))  }
map函數(shù)就是將rdd的元素由T類型轉(zhuǎn)化為U類型。
綜上可知,map和foreach這類的是針對一個元素調(diào)用一次我們的函數(shù),也即是我們的函數(shù)參數(shù)是單個元素,假如函數(shù)內(nèi)部存在數(shù)據(jù)庫鏈接、文件等的創(chuàng)建及關(guān)閉,那么會導(dǎo)致處理每個元素時創(chuàng)建一次鏈接或者句柄,導(dǎo)致性能底下,很多初學(xué)者犯過這種毛病。
而foreachpartition/mapPartitions是針對每個分區(qū)調(diào)用一次我們的函數(shù),也即是我們函數(shù)傳入的參數(shù)是整個分區(qū)數(shù)據(jù)的迭代器,這樣避免了創(chuàng)建過多的臨時鏈接等,提升了性能。
下面的例子都是1-20這20個數(shù)字,經(jīng)過map完成a*3的轉(zhuǎn)換:
val a = sc.parallelize(1 to 20, 2)
def mapTerFunc(a : Int) : Int = {a*3}
val mapResult = a.map(mapTerFunc)
println(mapResult.collect().mkString(","))
結(jié)果
  
    
  
  
  
    
      
    
    3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
           

           

3. mappartitions低效用法


大家通常的做法都是申請一個迭代器buffer,將處理后的數(shù)據(jù)加入迭代器buffer,然后返回迭代器。如下面的demo。
val a = sc.parallelize(1 to 20, 2)  def terFunc(iter: Iterator[Int]) : Iterator[Int] = {     var res = List[Int]()      while (iter.hasNext)   {           val cur = iter.next;     res.::= (cur*3) ;   }    res.iterator}
val result = a.mapPartitions(terFunc)println(result.collect().mkString(","))
結(jié)果亂序了,因為我的list是無序的,可以使用LinkList:
30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33

4. mappartitions高效用法

注意,3中的例子,會在mappartition執(zhí)行期間,在內(nèi)存中定義一個數(shù)組并且將緩存所有的數(shù)據(jù)。假如數(shù)據(jù)集比較大,內(nèi)存不足,會導(dǎo)致內(nèi)存溢出,任務(wù)失敗。對于這樣的案例,Spark的RDD不支持像mapreduce那些有上下文的寫方法。其實,浪尖有個方法是無需緩存數(shù)據(jù)的,那就是自定義一個迭代器類。如下例:  
  
    
  
  
  
    
      
    
    
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {                       def hasNext : Boolean = {                          iter.hasNext                     }                                               def next : Int= {                           val cur = iter.next                       cur*3                     }                   }                    
                  val result = a.mapPartitions(v => new CustomIterator(v))                   println(result.collect().mkString(","))              
           
結(jié)果:
   
     
   
   
   3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60

“mapPartitions的簡單介紹及使用方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI