溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

在Apache Spark中執(zhí)行聚合的五種方法分別是什么

發(fā)布時(shí)間:2021-12-17 11:16:17 來源:億速云 閱讀:114 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)在Apache Spark中執(zhí)行聚合的五種方法分別是什么,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

聚合是數(shù)據(jù)分析任務(wù)中廣泛使用的運(yùn)算符,Spark為此提供了堅(jiān)實(shí)的框架。 以下是使用Spark可以針對(duì)大數(shù)據(jù)進(jìn)行聚合的五種不同方式。

RDD上的GroupByKey或ReduceByKey轉(zhuǎn)換:RDD是Spark中分布式數(shù)據(jù)收集的最早表示,其中數(shù)據(jù)通過" T"類型的任意Java對(duì)象表示。  RDD上的聚合與map-reduce框架中的reduce概念相似,在reduce中,reduce函數(shù)(作用于兩個(gè)輸入記錄以生成聚合記錄)是聚合的關(guān)鍵。  使用RDD時(shí),聚合可以通過GroupByKey或ReduceByKey轉(zhuǎn)換來執(zhí)行,但是,這些轉(zhuǎn)換僅限于Pair RDD(元組對(duì)象的集合,每個(gè)元組都由類型為"  K"的鍵對(duì)象和類型為" V"的值對(duì)象組成) 。

在通過GroupByKey進(jìn)行聚合的情況下,轉(zhuǎn)換會(huì)導(dǎo)致元組對(duì)象具有鍵對(duì)象和針對(duì)該鍵對(duì)象的所有值對(duì)象的集合。  因此,之后需要應(yīng)用一個(gè)映射器(通過map,maptoPair或mapPartitions進(jìn)行映射轉(zhuǎn)換),以便將每個(gè)Tuple對(duì)象的值對(duì)象的集合減少為一個(gè)聚合的值對(duì)象。

在Apache Spark中執(zhí)行聚合的五種方法分別是什么
> Aggregation on a Pair RDD (with 2 partitions) via GroupByKey followed  via either of map, maptopair

映射程序(例如map,maptoPair和mappartitions轉(zhuǎn)換)包含聚合函數(shù),以將類型為" V"的值對(duì)象的集合減少為類型為" U"的聚合對(duì)象。  聚合函數(shù)可以是任意函數(shù),不需要遵循關(guān)聯(lián)性或交換性狀。 GroupByKey轉(zhuǎn)換具有三種風(fēng)格,它們因應(yīng)用GroupByKey轉(zhuǎn)換而在RDD的分區(qū)規(guī)范上有所不同。  GroupByKey可以總結(jié)為:

GroupByKey (PairRDD) => PairRDD> Map (PairRDD>) => PairRDD

如果通過ReduceByKey進(jìn)行聚合,則轉(zhuǎn)換將直接導(dǎo)致具有鍵對(duì)象和針對(duì)該鍵對(duì)象的聚合對(duì)象的元組對(duì)象。  與GroupByKey一樣,在ReduceByKey之后不需要映射器。  ReduceByKey轉(zhuǎn)換采用關(guān)聯(lián)和可交換的聚合函數(shù),以便在跨分區(qū)聚合記錄之前,可以在本地聚合位于同一分區(qū)的記錄。 同樣,聚合函數(shù)接受兩個(gè)說類型為"  V"的值對(duì)象,并返回一個(gè)類型為" V"的對(duì)象。  與GroupByKey相似,ReduceByKey轉(zhuǎn)換也具有三種風(fēng)格,它們的區(qū)別在于通過應(yīng)用ReduceByKey轉(zhuǎn)換而導(dǎo)致的RDD分區(qū)規(guī)范。  ReduceByKey可以總結(jié)為:

ReduceByKey(PairRDD, Function) => PairRDD

在GroupByKey和ReduceByKey中,前者更通用,可以與任何聚合函數(shù)一起使用,而后者則更有效,但僅適用于前面所述的一類聚合函數(shù)。

RDD或數(shù)據(jù)集上的Mappartitions:如先前博客中所述,Mappartitions是功能強(qiáng)大的窄轉(zhuǎn)換之一,可在RDD和Dataset(Spark中的數(shù)據(jù)表示)上使用,以明智地執(zhí)行各種操作。  這樣的操作之一也包括聚合。 但是,唯一需要滿足的條件是,屬于相同分組關(guān)鍵字的記錄應(yīng)位于單個(gè)分區(qū)中。  在涉及分組密鑰的混排操作中實(shí)現(xiàn)的RDD或數(shù)據(jù)集(要聚合)中可以隱式滿足此條件。  同樣,可以通過首先基于分組密鑰對(duì)RDD或數(shù)據(jù)集進(jìn)行重新分區(qū)來明確實(shí)現(xiàn)該條件。

在用于典型聚合流的mappartitions內(nèi),必須首先實(shí)例化一個(gè)Hashmap,將Hashmap與相應(yīng)的分組鍵相對(duì)應(yīng)地存儲(chǔ)聚合的Value  Objects。 然后,在迭代基礎(chǔ)分區(qū)的數(shù)據(jù)收集時(shí),將重復(fù)更新此Hashmap。 最后,返回包含在映射中的聚合值/對(duì)象(可選以及關(guān)聯(lián)的分組鍵)的迭代器。

由于基于Mappartitions的聚合涉及將Hashmap保留在內(nèi)存中以保存鍵和聚合的Value對(duì)象,因此,如果大量唯一分組鍵駐留在基礎(chǔ)分區(qū)中,則Hashmap將需要大量堆內(nèi)存,因此可能導(dǎo)致  相應(yīng)執(zhí)行程序的內(nèi)存不足終止的風(fēng)險(xiǎn)。 從此以后,不應(yīng)該歪曲跨分區(qū)的分組密鑰分配,否則會(huì)由于過度提供執(zhí)行程序內(nèi)存來處理偏斜而導(dǎo)致執(zhí)行程序內(nèi)存浪費(fèi)。  此外,由于需要基于堆內(nèi)存的聚合哈希圖,因此與Spark中的專用聚合運(yùn)算符相比,對(duì)內(nèi)存的相對(duì)內(nèi)存分配更多,但是如果內(nèi)存不是約束,則基于Mappartitions的聚合可以提供良好的性能提升。

用于數(shù)據(jù)幀或數(shù)據(jù)集的UDAF:與上述方法不同,UDAF基于聚合緩沖區(qū)的概念以及在此緩沖區(qū)上運(yùn)行的一組方法來實(shí)現(xiàn)聚合。

在Apache Spark中執(zhí)行聚合的五種方法分別是什么
> Aggregation buffer based aggregation flow in Spark (for Datasets and  Dataframe)

到目前為止,UDAF是為Spark中的分布式數(shù)據(jù)收集的Dataframe或Dataset表示編寫聚合邏輯的最常用方法。  UDAF在數(shù)據(jù)收集的無類型視圖上工作,在該視圖中,數(shù)據(jù)記錄被視為(表的)一行,其架構(gòu)定義了該行中每一列的類型和可空性。 通過擴(kuò)展包"  org.apache.spark.sql.expressions"中存在的"  UserDefinedAggregationFunction"類并覆蓋基類中以下方法的實(shí)現(xiàn),可以在Spark中創(chuàng)建UDAF:

/*Return schema for input column(s) to the UDAF, schema being built using StructType*/ => public StructType inputSchema() /*Return schema of aggregation buffer, schema being built using StructType */ => public StructType bufferSchema() /*DataType of final aggregation result*/ => public DataType dataType() /*Initialize aggregation buffer*/ => public void initialize(MutableAggregationBuffer buffer) /*Update aggregation buffer for each of the untyped view (Row) of an input object*/ => public void update(MutableAggregationBuffer buffer, Row row) /*Update current aggregation buffer with a partially aggregated buffer*/ => public void merge(MutableAggregationBuffer buffer, Row buffer) /*Evaluate final aggregation buffer and return the evaluated value of DataType declared earlier */ => public Object evaluate(Row buffer)

除了覆蓋上述方法外,還可以始終聲明其他字段(在UDAF構(gòu)造函數(shù)中使用可選的初始化)和自定義UDAF類中的其他方法,以便在覆蓋方法中使用它們以實(shí)現(xiàn)聚合目標(biāo)。

在使用UDAF之前,必須先在Spark框架中注冊(cè)相同的實(shí)例:

spark.udf.register('sampleUDAF, new SampleUDAF());

注冊(cè)后,可以在Spark SQL查詢中使用UDAF來聚合整個(gè)數(shù)據(jù)集/數(shù)據(jù)框或數(shù)據(jù)集/數(shù)據(jù)框中的記錄組(通過一列或多列分組)。 除了直接在Spark  SQL查詢中使用外,還可以通過數(shù)據(jù)框/數(shù)據(jù)集聚合API(例如" agg")使用UDAF。

UDAF雖然是定義自定義聚合的一種流行方法,但是當(dāng)在聚合緩沖區(qū)中使用復(fù)雜的數(shù)據(jù)類型(數(shù)組或映射)時(shí),會(huì)遇到性能問題。  這是由于以下事實(shí):在UDAF中的每次更新操作期間,對(duì)于復(fù)雜的數(shù)據(jù)類型,將scala數(shù)據(jù)類型(用戶特定)轉(zhuǎn)換為相應(yīng)的催化劑數(shù)據(jù)類型(催化劑內(nèi)部數(shù)據(jù)類型)(反之亦然)變得非常昂貴。  從內(nèi)存和計(jì)算的角度來看,此成本都更高。

數(shù)據(jù)集的聚合器:聚合器是對(duì)數(shù)據(jù)集執(zhí)行聚合的最新方法,類似于UDAF,它也基于聚合緩沖區(qū)的概念以及在該緩沖區(qū)上運(yùn)行的一組方法。  但是,聚合器進(jìn)行聚合的方式稱為類型化聚合,因?yàn)樗婕皩?duì)各種類型的對(duì)象進(jìn)行操作/使用各種類型的對(duì)象進(jìn)行操作。  聚合器的輸入,聚合緩沖區(qū)和最終的聚合輸出(從緩沖區(qū)派生)都是具有相應(yīng)Spark編碼器的某些類型的對(duì)象。  用戶可以通過使用為IN定義的類型(輸入記錄類型)擴(kuò)展抽象的通用'Aggregator  '類(在包'org.apache.spark.sql.expressions中提供)來定義自己的自定義Aggregator。  ,為BUF(聚合緩沖區(qū))定義的類型和為OUT(輸出記錄類型)定義的類型,以及在基類中重寫以下方法的實(shí)現(xiàn):

/* return Encoder for aggregation buffer of type BUF. This is required for buffer ser/deser during shuffling or disk spilling */ => public Encoder<BUF> bufferEncoder() /* return Encoder for output object of type OUT after aggregation is performed */ => public Encoder<OUT> outputEncoder() /* return updated aggregation buffer object of type BUF after aggregating the existing buffer object of type BUF with the input object of type IN*/ => public BUF reduce(BUF buffer, IN input) () /* return updated aggregation buffer of type BUF after merging two partially aggregated buffer objects of type BUF */ => public BUF merge(BUF buffer1, BUF buffer2) /* return output object of type OUT from evaluation of  aggregation buffer of type BUF */ => public OUT finish(BUF arg0) /* return buffer object of type BUF after initializing the same */ => public BUF zero()

由于Aggregator本機(jī)支持將聚合緩沖區(qū)作為對(duì)象,因此它是高效的,并且不需要與從Scala類型轉(zhuǎn)換為催化劑類型(反之亦然)相關(guān)的不必要的開銷(與UDAF一樣)。  同樣,聚合器的聚合方式在編寫聚合邏輯時(shí)提供了更多的靈活性和編程的美感。 聚合器也已集成到無類型聚合流中,以支持SQL,例如即將發(fā)布的版本中的查詢。

預(yù)定義的聚合功能:Spark提供了各種預(yù)構(gòu)建的聚合功能,可用于分布式數(shù)據(jù)收集的數(shù)據(jù)框或數(shù)據(jù)集表示形式。 這些預(yù)先構(gòu)建的函數(shù)可以在SPARK  SQL查詢表達(dá)式中使用,也可以與為Dataframe或Dataset定義的聚合API一起使用。  在org.apache.spark.sql包中,所有預(yù)先構(gòu)建的聚合函數(shù)都定義為"函數(shù)"類的靜態(tài)方法。 帶下劃線的鏈接可以列出所有這些功能的列表。

預(yù)定義的聚合函數(shù)經(jīng)過高度優(yōu)化,在大多數(shù)情況下可以直接與Spark tungusten格式一起使用。 因此,如果"  functions"類中存在預(yù)先構(gòu)建的聚合函數(shù),則Spark程序員應(yīng)始終偏向于使用它們。  萬一那里沒有所需的聚合函數(shù),那么只有一個(gè)可以訴諸于編寫自定義聚合函數(shù)。

以上就是在Apache Spark中執(zhí)行聚合的五種方法分別是什么,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI