溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

基本的 RDD 操作——PySpark

發(fā)布時(shí)間:2020-07-08 20:48:02 來(lái)源:網(wǎng)絡(luò) 閱讀:1381 作者:賓果go 欄目:大數(shù)據(jù)

基本的 RDD 轉(zhuǎn)化操作

  1. map()

? ? 語(yǔ)法:RDD.map(<function>,preservesPartitoning=False)

? ? 轉(zhuǎn)化操作 map() 是所有轉(zhuǎn)化操作中最基本的。它將一個(gè)具名函數(shù)或匿名函數(shù)對(duì)數(shù)據(jù)集內(nèi)的所有元素進(jìn)行求值。map() 函數(shù)可以異步執(zhí)行,也不會(huì)嘗試與別的 map() 操作通信或同步。也就是說(shuō),這是無(wú)共享的操作。

? ? 參數(shù) preserversPatitioning 是可選的,為 Boolean 類(lèi)型的參數(shù),用于定義了區(qū)分規(guī)則的 RDD,它們有定義好的鍵,并按照鍵的哈希值或范圍進(jìn)行了分組。如果這個(gè)參數(shù)被設(shè)為 True,這些分區(qū)會(huì)保存完整。這個(gè)參數(shù)可以被 Spark 調(diào)度器用于優(yōu)化后續(xù)操作,比如,基于分區(qū)的鍵進(jìn)行的連接操作。

? ? 轉(zhuǎn)化操作 map() 對(duì)輸入的每條記錄計(jì)算同一個(gè)函數(shù),并生成轉(zhuǎn)化后的輸出記錄。

# map()

map_rdd=sc.textFile('file:///usr/local/spark/README.md')

print(map_rdd.take(5))

map_rdd_new=map_rdd.map(lambda x:x.split(' '))

print(map_rdd_new.take(5))

# 輸出

['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']

[['#', 'Apache', 'Spark'], [''], ['Spark', 'is', 'a', 'fast', 'and', 'general', 'cluster', 'computing', 'system', 'for', 'Big', 'Data.', 'It', 'provides'], ['high-level', 'APIs', 'in', 'Scala,', 'Java,', 'Python,', 'and', 'R,', 'and', 'an', 'optimized', 'engine', 'that'], ['supports', 'general', 'computation', 'graphs', 'for', 'data', 'analysis.', 'It', 'also', 'supports', 'a']]

? ? 在這個(gè)例子中,split 函數(shù)接收一個(gè)字符串,生成一個(gè)列表,輸入數(shù)據(jù)中的每個(gè)字符串元素都被映射為輸出數(shù)據(jù)中的一個(gè)列表元素。產(chǎn)生的結(jié)果為一個(gè)列表的列表。

? ? 2.flatMap()

? ? 語(yǔ)法::RDD.flatMap(<function>,preservesPartitioning=False)

? ? 轉(zhuǎn)化操作 flatMap() 和轉(zhuǎn)化操作 map() 類(lèi)似,都將函數(shù)作用于輸入數(shù)據(jù)集的每條記錄。但是,flatMap() 還會(huì)“拍平”輸出數(shù)據(jù),這表示它會(huì)去掉一層嵌套。比如,給定一個(gè)包含字符串列表的列表,“拍平”操作會(huì)產(chǎn)生一個(gè)由字符串組成的列表,也就是“拍平”了所有嵌套的列表。

# flatMap()

flat_map_rdd=sc.textFile('file:///usr/local/spark/README.md')

print(flat_map_rdd.take(5))

map_rdd_new=flat_map_rdd.flatMap(lambda x:x.split(' '))

print(map_rdd_new.take(5))

# 輸出

['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']

['#', 'Apache', 'Spark', '', 'Spark']

? ? 在這個(gè)例子中,flatMap() 使用的匿名函數(shù)和 map() 操作所使用的相同。注意,每個(gè)字符串并沒(méi)有產(chǎn)生一個(gè)對(duì)應(yīng)的列表對(duì)象,所有的元素拍平到一個(gè)列表中。換句話說(shuō),這個(gè)例子里的 flatMap() 產(chǎn)生了一個(gè)組合的列表作為輸出,而不是 map() 中那個(gè)列表的列表。

? ? 3.filter()

? ? 語(yǔ)法:RDD.filter(<function>)

? ? 轉(zhuǎn)化操作 filter 講一個(gè) Boolean 類(lèi)型的表達(dá)式對(duì)數(shù)據(jù)集里的每個(gè)元素進(jìn)行求值,這個(gè)表達(dá)式通常用匿名函數(shù)來(lái)表示。返回的布爾值決定了該記錄是否被包含在產(chǎn)生的輸出 RDD 里。這是一種常用的轉(zhuǎn)化操作,用于從 RDD 中移除不需要的記錄作為中間結(jié)果,或者移除不需要放在最終輸出里的記錄。

# filter()

licenses = sc.textFile('file:///usr/local/spark/README.md')

words = licenses.flatMap(lambda x:x.spilt(' '))

print(words.take(5))

lowercase = words.map(lambda x:x.lower())

print(lowercase.take(5))

longwords = lowercase.filter(lambda x:len(x) > 12)

print(longwords.take(5))

# 輸出

['#', 'Apache', 'Spark', '', 'Spark']

['#', 'apache', 'spark', '', 'spark']

['<http://spark.apache.org/>', 'documentation', 'documentation,', 'page](http://spark.apache.org/documentation.html).', 'instructions.']

? ? 4.distinct()

? ? 語(yǔ)法:RDD.distinct(numPartitions=None)

? ? 轉(zhuǎn)化操作 distinct() 返回一個(gè)新的 RDD,其中僅包含輸入 RDD 中去重后的元素。它可以用來(lái)去除重復(fù)的值。參數(shù) numPartitions 可以把數(shù)據(jù)重新分區(qū)為給定的分區(qū)數(shù)量。如果沒(méi)有提供這個(gè)參數(shù)或是使用了默認(rèn)值,那么轉(zhuǎn)化操作 distinct() 返回的分區(qū)數(shù)和輸入的 RDD 的 分區(qū)數(shù)保持一致。

# distinct()

licenses = sc.textFile('file:///usr/local/spark/README.md')

words = licenses.flatMap(lambda x : x.split(' '))

lowercase = words.map(lambda x : x.lower())

allwords = lowercase.count()

diswords = lowercase.distinct().count()

print ("Total words : {} ,Distinct words: {}".format(allwords,diswords))

# 輸出

Total words : 579 ,Distinct words: 276

? ? 5.groupBy()

? ? 語(yǔ)法:RDD.groupBy(<function>,numPartitons=None)

? ? 轉(zhuǎn)化操作 groupBy() 返回一個(gè)按指定函數(shù)對(duì)元素進(jìn)行分組的 RDD。參數(shù) <function> 可以是具名函數(shù),也可以是匿名函數(shù),用來(lái)確定對(duì)所有元素進(jìn)行分組的鍵,或者指定用于對(duì)元素進(jìn)行求值以確定其所屬分組的表達(dá)式。參數(shù) numPartitions,通過(guò)計(jì)算分組函數(shù)輸出的鍵空間的哈希值,以自動(dòng)創(chuàng)建指定數(shù)量的分區(qū)。要注意的是,groupBy() 返回的是一個(gè)可迭代對(duì)象。

# groupBy()

licenses = sc.textFile('file:///usr/local/spark/README.md')

words = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x:len(x) > 0)

groupbyfirstletter = words.groupBy(lambda??x: x[0].lower)

print(groupbyfirstletter.take(1))

# 輸出

[(<built-in method lower of str object at 0x7fbf2ef1b228>, <pyspark.resultiterable.ResultIterable object at 0x7fbf22238ef0>)]

? ? 6.sortBy()

? ? 語(yǔ)法:RDD.sortBy(<keyfunc>,ascending=True,numPartitions=None)

? ? 轉(zhuǎn)化操作 sortBy() 將 RDD 按照 <keyfunc> 參數(shù)選出的指定數(shù)據(jù)集的鍵進(jìn)行排序。它根據(jù)鍵對(duì)象的類(lèi)型的順序進(jìn)行排序。參數(shù) ascending 是布爾類(lèi)型的參數(shù),默認(rèn)為 True,指定所使用的排序順序。如果要使用降序,需要設(shè)置 ascending=False。

# sortBy()

readme = sc.textFile('file:///usr/local/spark/README.md')

words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)

sortbyfirstletter = words.sortBy(lambda x:x[0].lower(),ascending=False)

print(sortbyfirstletter.take(5))

# 輸出

['You', 'you', 'You', 'you', 'you']

基本的 RDD 行動(dòng)操作

? ? Spark 中的行動(dòng)操作要么返回值,比如 count();要么返回?cái)?shù)據(jù),比如 collect();要么保存數(shù)據(jù)到外部,比如 saveAsTextFile()。在所有情況中,行動(dòng)操作都會(huì)對(duì) RDD 及其所有父 RDD 強(qiáng)制進(jìn)行計(jì)算。一些行動(dòng)操作返回計(jì)數(shù),或是數(shù)據(jù)的聚合值,或是 RDD 中全部或部分?jǐn)?shù)據(jù)。與這些不同的是,行動(dòng)操作 foreach() 會(huì)對(duì) RDD 中的每個(gè)元素執(zhí)行一個(gè)函數(shù)。

? ? 1.count()

? ? 語(yǔ)法:RDD.count()

? ? 行動(dòng)操作 count() 不接收參數(shù),返回一個(gè) long 類(lèi)型的值,代表 RDD 中元素的個(gè)數(shù)。

# count()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.count())

# 輸出

22997

?? ?注意,對(duì)于不接收參數(shù)的行動(dòng)操作,需要在行動(dòng)操作名帶上空的括號(hào) ()。

? ? 2.collect()

? ? 語(yǔ)法:RDD.collect()

? ? 行動(dòng)操作 collect() 向 Spark 驅(qū)動(dòng)器進(jìn)程返回一個(gè)由 RDD 中所有元素組成的列表。collect() 沒(méi)有限制輸出,可能導(dǎo)致輸出量相當(dāng)大。一般只用在小規(guī)模 RDD 或開(kāi)發(fā)中。

# collect()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.collect())

# 輸出

['', '<!DOCTYPE', 'HTML', 'PUBLIC', '"-//W3C//DTD', 'HTML', '4.01',......]

? ? 3.take()

? ? 語(yǔ)法:RDD.take(n)

? ? 行動(dòng)操作 take() 返回 RDD 的前 n 個(gè)元素。選取的元素沒(méi)有特定的順序。事實(shí)上,行動(dòng)操作 take() 返回的元素是不確定的,這意味著再次運(yùn)行同一個(gè)行動(dòng)操作時(shí),返回的元素可能會(huì)不同,尤其是在完全分布式的環(huán)境中。

? ? 對(duì)于橫跨超過(guò)一個(gè)分區(qū)的 RDD,take() 會(huì)掃描一個(gè)分區(qū),并使用該分區(qū)的結(jié)果來(lái)預(yù)估還需掃描多少分區(qū)才能滿足獲取所要求數(shù)量的全部的值。

# take()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.take(5))

# 輸出

['', '<!DOCTYPE', 'HTML', 'PUBLIC', '"-//W3C//DTD']

? ? 4.top()

? ? 語(yǔ)法:RDD.top(n,key=None)

? ? 行動(dòng)操作 top() 返回一個(gè) RDD 中的前 n 個(gè)元素,但是和 take() 不同的是,如果使用 top(),元素會(huì)排序并按照降序輸出。參數(shù) key 指定了按照什么對(duì)結(jié)果進(jìn)行排序以返回前 n 個(gè)元素。如果沒(méi)有提供,會(huì)使用根據(jù) RDD 的元素所推斷出來(lái)的鍵。

# top()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.distinct().top(5))

# 輸出

['·', '?', '}', 'your', 'you.</strong>']

? ? 5.first()

? ? 語(yǔ)法:RDD.first()

? ? 行動(dòng)操作 first() 返回 RDD 的第一個(gè)元素。first() 不考慮元素的順序,是一個(gè)非確定性的操作,尤其是在完全分布式的環(huán)境中。

# first()

readme = sc.textFile('file:///usr/local/spark/README.md')

words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)

print(words.distinct().first())

print(words.distinct().take(1))

# 輸出

#

['#']

? ? first() 和 take(1) 最主要的區(qū)別在于 first() 返回一個(gè)原子的數(shù)據(jù)元素,而 take() (即使 n=1)返回的是由數(shù)據(jù)元素組成的列表。

? ? 6.reduce() 和 fold()

? ? 行動(dòng)操作 reduce() 和 fold() 是執(zhí)行聚合的行動(dòng)操作,它們都執(zhí)行滿足交換律或結(jié)合律的操作,比如對(duì) RDD 里的一系列值求和。這里的交換律和結(jié)合律表示操作與執(zhí)行的順序無(wú)關(guān)。這是分布式處理所要求的,因?yàn)樵诜植际教幚碇?,順序無(wú)法保證。

? ? 語(yǔ)法:RDD.reduce(<function>)

? ? ? ? ? ? ? RDD.fold(zeroValue,<function>)

? ? 行動(dòng)操作 reduce() 使用指定的滿足交換律或結(jié)合律的運(yùn)算符來(lái)歸約 RDD 中的所有元素。參數(shù) <function> 指定接收兩個(gè)輸入的匿名函數(shù)(lambda? x,y:....),它表示來(lái)自指定 RDD 的序列中的值。

? ? 行動(dòng)操作 fold() 使用給定的 function 和 zeroValue 把 RDD 中每個(gè)分區(qū)的元素聚合,然后把每個(gè)分區(qū)的聚合結(jié)果再聚合。盡管 reduce() 和 fold() 的功能相似,但還是有區(qū)別的,fold() 不滿足交換律,因此需要給定第一個(gè)值和最后一個(gè)值(zeroValue)。

# reduce(),fold()

numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])

print(numbers.reduce(lambda x,y: x+y)) #輸出:45

print(numbers.fold(0,lambda x,y: x+y)) #輸出:45

empty = sc.parallelize([])

# print(empty.reduce(lambda x,y: x+y)) #輸出:ValueError: Can not reduce() empty RDD

print(empty.getNumPartitions()) #查看rdd分區(qū)數(shù),輸出為 2

print(empty.fold(1,lambda x,y: x+y)) #輸出:3

?? ?fold中zeroValue除了在每個(gè)分區(qū)計(jì)算中作為初始值使用之后,在最后的reduce操作仍然需要使用一次,所以fold()在zeroValue不為0是計(jì)算結(jié)果為reduce()+(分區(qū)數(shù)+1)*zeroValue(以加法為例),參考鏈接:Pyspark學(xué)習(xí)筆記,fold()官網(wǎng)源碼

? ??

?? ?7.foreach()

? ? 語(yǔ)法:RDD.foreach(<function>)

? ? 行動(dòng)操作 foreach() 把 <> 參數(shù)指定的具名或匿名函數(shù)應(yīng)用到 RDD 中的所有元素上。因?yàn)?foreach() 是行動(dòng)操作而不是轉(zhuǎn)化操作,可以使用在轉(zhuǎn)化操作中無(wú)法使用或不該使用的函數(shù)。

# foreach()

def printfunc(x):

????print(x)

licenses = sc.textFile('file:///usr/local/spark/licenses')

longwords = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x: len(x) > 12)

longwords.foreach(lambda x: printfunc(x))

# 輸出

...

</dt><dd>means

href="#exhibit-a">Exhibit

id="section-1.10.1">1.10.1.

...

鍵值對(duì) RDD 的轉(zhuǎn)化操作

? ? 鍵值對(duì) RDD,也就是 PairRDD,它的記錄由鍵和值組成。鍵可以是整型或者字符串對(duì)象,也可以是元組這樣的復(fù)雜對(duì)象。而值可以是標(biāo)量值,也可以是列表、元組、字典或集合等數(shù)據(jù)結(jié)構(gòu)。這是在讀時(shí)系統(tǒng)和 NoSQL 系統(tǒng)上進(jìn)行各種結(jié)構(gòu)化數(shù)據(jù)分析時(shí)常用的數(shù)據(jù)表示形式。PairRDD 及其成員函數(shù)大致被分為如下四類(lèi):

  • 字典函數(shù)

  • 函數(shù)式轉(zhuǎn)化操作

  • 分組操作、聚合操作與排序操作

  • 連接操作

? ? 字典函數(shù)返回鍵值對(duì) RDD 的鍵的集合或值的集合,比如 keys() 和 values()。

? ? 1.keys()

? ? 語(yǔ)法:RDD.keys()

? ? keys() 函數(shù)返回鍵值對(duì) RDD 中所有鍵組成的 RDD,或者說(shuō)是由鍵值對(duì) RDD 中每個(gè)二元組的第一個(gè)元素組成的 RDD。

# keys()

kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])

print(kvpairs.keys().collect())

# 輸出

['city', 'state', 'zip', 'country']

? ? 2.values()

? ? 語(yǔ)法:RDD.values()

? ? values() 函數(shù)返回鍵值對(duì) RDD 中所有值組成的 RDD,或者說(shuō)是由鍵值對(duì) RDD 中每個(gè)二元組的第二個(gè)元素組成的 RDD。

# values()

kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])

print(kvpairs.values().collect())

# 輸出

['Beijing', 'SHIGEZHUANG', '000000', 'China']

? ? 3.keyBy()

? ? 語(yǔ)法:RDD.keyBy(<function>)

? ? 轉(zhuǎn)化操作 keyBy() 創(chuàng)建出由從 RDD 中的元素里提取的鍵與值組成的元組,其中 <function> 參數(shù)給定的函數(shù)將原元素轉(zhuǎn)為輸出元素的鍵,而原來(lái)的整個(gè)元組是輸出的值

# keyBy()

locations = sc.parallelize([('city','Beijing',1),('state','SHIGEZHUANG',2),('zip','000000',3),('country','China',4)])

bylocno = locations.keyBy(lambda x: x[2])

print(bylocno.collect())

# 輸出

[(1, ('city', 'Beijing', 1)), (2, ('state', 'SHIGEZHUANG', 2)), (3, ('zip', '000000', 3)), (4, ('country', 'China', 4))]

? ? 4.mapValues()

? ? 語(yǔ)法:RDD.mapValues(<function>)

? ? 轉(zhuǎn)化操作 mapValues() 把鍵值對(duì) RDD 的每個(gè)值都傳給一個(gè)函數(shù)(通過(guò) <function> 參數(shù)指定的具名函數(shù)或匿名函數(shù)),而鍵保持不變。mapValues() 對(duì)于每個(gè)輸入元素輸出一個(gè)元素。原 RDD 的分區(qū)保持不變。

? ? 5.flatMapValues()

? ? 語(yǔ)法:RDD.flatMapValues(<function>)

? ? 轉(zhuǎn)化操作?flatMapValues() 把鍵值對(duì) RDD 的每個(gè)值都傳給一個(gè)函數(shù)處理,而鍵保持不變,并生成拍平的列表。對(duì)于每個(gè)輸入元素,返回 0 個(gè)乃至多個(gè)輸出元素。使用 flatMapValues() 是會(huì)保留原 RDD 的分區(qū)情況。

# mapValues(),flatMapValues()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

print('kvpairs: ',kvpairs.take(4))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

print('locwtemplist: ',locwtemplist.take(3))

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

print('locwtemps: ',locwtemps.take(4))

# 輸出

kvpairs:??[['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]

locwtemplist:??[('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]

locwtemps:??[('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72)]

? ? 6.groupByKey()

? ? 語(yǔ)法:RDD.groupByKey(numPartitions=None,partitionFunc=<hash_fn>)

? ? 轉(zhuǎn)化操作 groupByKey() 將鍵值對(duì)RDD 按各個(gè)鍵對(duì)值進(jìn)行分組,把同組的值整合成一個(gè)序列。參數(shù) numPartitions 指定要?jiǎng)?chuàng)建多少個(gè)分區(qū)(也就是多少個(gè)分組)。分區(qū)使用 partitionFunc 參數(shù)的值創(chuàng)建,默認(rèn)值為 Spark 內(nèi)置的哈希分區(qū)函數(shù)。如果 numPartitions 為默認(rèn)值 None,就使用系統(tǒng)默認(rèn)的分區(qū)數(shù)( spark.default.parallelism )。

# groupByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

print('kvpairs: ',kvpairs.collect())

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

print('locwtemplist: ',locwtemplist.collect())

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

print('locwtemps: ',locwtemps.collect())

grouped = locwtemps.groupByKey()

print('grouped: ',grouped.collect())

avgtemps = grouped.mapValues(lambda x: sum(x)/len(x))

print('avgtemps: ',avgtemps.collect())

# 輸出

kvpairs:??[['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]

locwtemplist:??[('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]

locwtemps:??[('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]

grouped:??[('Beijing', <pyspark.resultiterable.ResultIterable object at 0x7f0c3353c2b0>), ('Shanghai', <pyspark.resultiterable.ResultIterable object at 0x7f0c3353c320>), ('Tianjin', <pyspark.resultiterable.ResultIterable object at 0x7f0c3353c2e8>)]

avgtemps:??[('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]

? ? 注意 groupByKey() 返回的分組后的值是一個(gè)?resultiterable 對(duì)象。Python 中的 iterable 對(duì)象是可以循環(huán)遍歷的序列對(duì)象。Python 中的許多函數(shù)接受可迭代對(duì)象作為輸入,比如 sum() 和 len() 函數(shù)。

? ? 7.reduceByKey()

? ? 語(yǔ)法:RDD.reduceByKey(<function>,numPartitions=None,partitionFunc=<hash_fn>)

? ? 轉(zhuǎn)化操作 reduceByKey() 使用滿足結(jié)合律的函數(shù)合并鍵對(duì)應(yīng)的值。調(diào)用鍵值對(duì)數(shù)據(jù)集的 reduceByKey() 方法,返回的是鍵值對(duì)的數(shù)據(jù)集,其數(shù)據(jù)按照鍵聚合了對(duì)應(yīng)的值。參數(shù) numPartitions 和? partitionFunc 與使用 groupByKey() 函數(shù)時(shí)的用法一模一樣。numPartitions 的值還影響 saveAsTextFile() 或是其他產(chǎn)生文件的行動(dòng)操作所產(chǎn)生的文件數(shù)量。例如,numPartitions = 2 會(huì)把 RDD 保存在硬盤(pán)時(shí)共生成兩個(gè)輸出文件。

# reduceByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

temptups = locwtemps.mapValues(lambda x: (x,1))

print('temptups: ',temptups.collect())

inputstoavg = temptups.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))

print('inputstoavg: ',inputstoavg.collect())

averages = inputstoavg.map(lambda x:(x[0],x[1][0]/x[1][1]))

print('averages: ',averages.collect())

# 輸出

temptups:??[('Beijing', (71, 1)), ('Beijing', (72, 1)), ('Beijing', (73, 1)), ('Beijing', (72, 1)), ('Beijing', (70, 1)), ('Shanghai', (46, 1)), ('Shanghai', (42, 1)), ('Shanghai', (40, 1)), ('Shanghai', (37, 1)), ('Shanghai', (39, 1)), ('Tianjin', (50, 1)), ('Tianjin', (48, 1)), ('Tianjin', (51, 1)), ('Tianjin', (43, 1)), ('Tianjin', (44, 1))]

inputstoavg:??[('Beijing', (358, 5)), ('Shanghai', (204, 5)), ('Tianjin', (236, 5))]

averages:??[('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]

? ? 求平均值不是滿足結(jié)合律的操作,可以通過(guò)創(chuàng)建元組來(lái)繞過(guò)去,元組中包含每個(gè)鍵對(duì)應(yīng)的值和與每個(gè)鍵對(duì)應(yīng)的計(jì)數(shù),這兩個(gè)都滿足交換律和結(jié)合律,然后在最后一步計(jì)算平均值。

? ? 注意 reduceByKey() 比較高效,是因?yàn)樗诿總€(gè)執(zhí)行器本地對(duì)值進(jìn)行了先行組合,然后把組合后的列表發(fā)送到遠(yuǎn)程的執(zhí)行器來(lái)執(zhí)行最后的階段。這是一個(gè)會(huì)產(chǎn)生數(shù)據(jù)混洗的操作。

? ? 以求和函數(shù)為例,可以當(dāng)作是累加一個(gè)由和組成的列表,而不是對(duì)單個(gè)值組成的更大的列表進(jìn)行求和。因?yàn)樵跀?shù)據(jù)混洗時(shí)發(fā)送的數(shù)據(jù)更少,使用 reduceByKey() 進(jìn)行求和一般要比使用 groupByKey() 并指定 sum() 函數(shù)的性能更好。

? ? 8.foldByKey()

? ? 語(yǔ)法:RDD.foldByKey(zeroValue,<function>,numPartitions=None,partitionFunc=<hash_fn>)

? ? 轉(zhuǎn)化操作 foldByKey() 在功能上和行動(dòng)操作 fold() 類(lèi)似,但是 foldByKey() 是轉(zhuǎn)化操作,操作預(yù)先定義的鍵值對(duì)元素。foldByKey() 和 fold() 都提供了相同數(shù)據(jù)類(lèi)型的 zeroValue 參數(shù)供 RDD 為空時(shí)使用。參數(shù) numPartitions 和? partitionFunc 與轉(zhuǎn)化操作 groupByKey() 和 reduceByKey() 中的作用一樣。

# foldByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

maxbycity = locwtemps.foldByKey(0,lambda x,y: x if x > y else y)

print('maxbycity: ',maxbycity.collect())

# 輸出

maxbycity:??[('Beijing', 73), ('Shanghai', 46), ('Tianjin', 51)]

?? ? 9.sortByKey()

? ? 語(yǔ)法:RDD.sortByKey(ascending=True,numPartitions=None,keyfunc=<function>)

? ? 轉(zhuǎn)化操作 sortByKey() 把鍵值對(duì) RDD 根據(jù)鍵進(jìn)行排序。排序依據(jù)取決于鍵對(duì)象的類(lèi)型。該操作與 sort() 的區(qū)別之處在于 sort() 要求指定排序依據(jù)的鍵,而 sortByKey() 的鍵是鍵值對(duì) RDD 里定義的。

? ? 鍵按照 ascending 參數(shù)提供的順序進(jìn)行排序,該參數(shù)默認(rèn)值為 True,表示升序。參數(shù) numPartitions 指定了輸出多少分區(qū),分區(qū)函數(shù)為范圍分區(qū)函數(shù)。參數(shù) keyfunc 是一個(gè)可選參數(shù),可以通過(guò)對(duì)原鍵使用另一個(gè)函數(shù)而修改原鍵。

# sortByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

sortedbykey = locwtemps.sortByKey()

print('sortedbykey: ',sortedbykey.collect())

sortedbyval = locwtemps.map(lambda x: (x[1],x[0])).sortByKey(ascending=False)

print('sortedbyval: ',sortedbyval.collect())

# 輸出

sortedbykey:??[('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]

sortedbyval:??[(73, 'Beijing'), (72, 'Beijing'), (72, 'Beijing'), (71, 'Beijing'), (70, 'Beijing'), (51, 'Tianjin'), (50, 'Tianjin'), (48, 'Tianjin'), (46, 'Shanghai'), (44, 'Tianjin'), (43, 'Tianjin'), (42, 'Shanghai'), (40, 'Shanghai'), (39, 'Shanghai'), (37, 'Shanghai')]

連接操作

? ? 1.join()

? ? 語(yǔ)法:RDD.join(<otherRDD>,numPartitions=None)

? ? 轉(zhuǎn)化操作 join() 是內(nèi)連接的一個(gè)實(shí)現(xiàn),根據(jù)鍵來(lái)匹配兩個(gè)鍵值對(duì) RDD。可選參數(shù) numPartitions 決定生成的數(shù)據(jù)集要?jiǎng)?chuàng)建多少分區(qū)。如果不指明這個(gè)參數(shù),缺省值為 spark.default.parallelism 配置參數(shù)對(duì)應(yīng)的值。返回的 RDD 是一個(gè)列表,其結(jié)構(gòu)包含匹配鍵,以及一個(gè)二元組。這個(gè)二元組包含來(lái)自?xún)蓚€(gè) RDD 的一組匹配記錄。

# 連接操作

stores = sc.parallelize([(100,'Beijing'),(101,'Shanghai'),(102,'Tianjin'),(103,'Taiyuan')])

salespeople = sc.parallelize([(1,'Tom',100),(2,'Karen',100),(3,'Paul',101),(4,'Jimmy',102),(5,'Jack',None)])

# join()

print(salespeople.keyBy(lambda x: x[2]).join(stores).collect())

# 輸出

[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]

? ? 2.leftOuterJoin()

? ? 語(yǔ)法:RDD.leftOuterJoin(<otherRDD>,numPartitions=None)

? ? 轉(zhuǎn)化操作 leftOuterJoin() 返回第一個(gè) RDD 中包含的所有記錄或元素。如果第一個(gè) RDD(左 RDD)中的鍵在右 RDD 中存在,那么右 RDD? 中匹配的記錄會(huì)和左 RDD 的記錄一起返回。否則,右 RDD 的記錄為空。

# leftOuterJoin()

leftjoin = salespeople.keyBy(lambda x: x[2]).leftOuterJoin(stores)

print("leftjoin: ",leftjoin.collect())

print(leftjoin.filter(lambda x: x[1][1] is None).map(lambda x: "salesperson " + x[1][0][1] + " has no store").collect())

# 輸出

leftjoin:??[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]

['salesperson Jack has no store']

? ? 3.rightOuterJoin()

? ? 語(yǔ)法:RDD.rightOuterJoin(<otherRDD>,numPartitions=None)

? ? 轉(zhuǎn)化操作 rightOuterJoin() 返回第二個(gè) RDD 中包含的所有元素或記錄。如果第二個(gè) RDD(右 RDD)中的鍵在左 RDD 中存在,則左 RDD 中匹配的記錄會(huì)和右 RDD 中的記錄一起返回。否則,左 RDD 的記錄為 None(空)。

# rightOuterJoin()

print(

salespeople.keyBy(lambda x: x[2])\

????.rightOuterJoin(stores)\

????.filter(lambda x: x[1][0] is None)\

????.map(lambda x: x[1][1] + " store has no salespeople")\

????.collect()

)

# 輸出

['Taiyuan store has no salespeople']

? ? 4.fullOuterJoin()

? ? 語(yǔ)法:RDD.fullOuterJoin(<otherRDD>,numPartitions=None)

? ? fullOuterJoin() 無(wú)論是否有匹配的鍵,都會(huì)返回兩個(gè) RDD 中的所有元素。左數(shù)據(jù)集或右數(shù)據(jù)集中沒(méi)有匹配的元素都用 None(空)來(lái)表示。

# fullOuterJoin

print(

????salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).filter(lambda x: x[1][0] is None or x[1][1] is None).collect()

)

# 輸出

[(None, ((5, 'Jack', None), None)), (103, (None, 'Taiyuan'))]

print(

????salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).collect()

)

# 輸出

[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin')), (103, (None, 'Taiyuan'))]

? ? 5.cogroup()

? ? 語(yǔ)法:RDD.cogroup(<otherRDD>,numPartitions=None)

? ? 轉(zhuǎn)化操作 cogroup() 將多個(gè)鍵值對(duì)數(shù)據(jù)集按鍵進(jìn)行分組。在概念上和 fullOuterJoin() 有些類(lèi)似,但在實(shí)現(xiàn)上有以下關(guān)鍵區(qū)別:

  • 轉(zhuǎn)化操作 cogroup() 返回可迭代對(duì)象,類(lèi)似 groupByKey() 函數(shù)。

  • 轉(zhuǎn)化操作 cogroup() 將兩個(gè) RDD 中的多個(gè)元素進(jìn)行分組,而 fullOuterJoin() 則對(duì)同一個(gè)鍵創(chuàng)建出多個(gè)分開(kāi)的輸出元素。

  • 轉(zhuǎn)化操作 cogroup() 可以通過(guò) Scala API 或者函數(shù)別名 groupWith() 對(duì)三個(gè)以上的 RDD 進(jìn)行分組。

? ? 對(duì) A、B 兩個(gè) RDD 按照鍵 K 進(jìn)行 cogroup() 操作生成的 RDD 輸出具有下面的結(jié)構(gòu):

[K, Iterable(K,VA,...), Iterable(K,VB,...) ]

? ? 如果一個(gè) RDD 中沒(méi)有另一個(gè) RDD 中包含的給定鍵的值,相應(yīng)的可迭代對(duì)象則為空。

# cogroup()

print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).take(1))

print('----------------')

print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).mapValues(lambda x: [item for sublist in x for item in sublist]).collect())

# 輸出

[(100, (<pyspark.resultiterable.ResultIterable object at 0x7fc1005319b0>, <pyspark.resultiterable.ResultIterable object at 0x7fc100531ba8>))]

----------------

[(100, [(1, 'Tom', 100), (2, 'Karen', 100), 'Beijing']), (None, [(5, 'Jack', None)]), (101, [(3, 'Paul', 101), 'Shanghai']), (102, [(4, 'Jimmy', 102), 'Tianjin']), (103, ['Taiyuan'])]

? ? 6.cartesian()

? ? 語(yǔ)法:RDD.cartesian(<otherRDD>)

? ? 轉(zhuǎn)化操作?cartesian() 即笛卡爾集,有時(shí)也被稱(chēng)為交叉連接,會(huì)根據(jù)兩個(gè) RDD 的記錄生成所有可能的組合。該操作生成的記錄條數(shù)等于第一個(gè) RDD 的記錄條數(shù)乘以第二個(gè) RDD 的記錄條數(shù)。

# cartesian()

print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).collect())

print('----------------')

print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).count())

# 輸出

[((100, (1, 'Tom', 100)), (100, 'Beijing')), ((100, (1, 'Tom', 100)), (101, 'Shanghai')), ((100, (2, 'Karen', 100)), (100, 'Beijing')), ((100, (2, 'Karen', 100)), (101, 'Shanghai')), ((100, (1, 'Tom', 100)), (102, 'Tianjin')), ((100, (1, 'Tom', 100)), (103, 'Taiyuan')), ((100, (2, 'Karen', 100)), (102, 'Tianjin')), ((100, (2, 'Karen', 100)), (103, 'Taiyuan')), ((101, (3, 'Paul', 101)), (100, 'Beijing')), ((101, (3, 'Paul', 101)), (101, 'Shanghai')), ((102, (4, 'Jimmy', 102)), (100, 'Beijing')), ((102, (4, 'Jimmy', 102)), (101, 'Shanghai')), ((None, (5, 'Jack', None)), (100, 'Beijing')), ((None, (5, 'Jack', None)), (101, 'Shanghai')), ((101, (3, 'Paul', 101)), (102, 'Tianjin')), ((101, (3, 'Paul', 101)), (103, 'Taiyuan')), ((102, (4, 'Jimmy', 102)), (102, 'Tianjin')), ((102, (4, 'Jimmy', 102)), (103, 'Taiyuan')), ((None, (5, 'Jack', None)), (102, 'Tianjin')), ((None, (5, 'Jack', None)), (103, 'Taiyuan'))]

----------------

20

集合操作

? ? 1.union()

? ? 語(yǔ)法:RDD.union(<otherRDD>)

? ? 轉(zhuǎn)化操作 union() 將另一個(gè) RDD 追加到 RDD 的后面,組合成一個(gè)輸出 RDD。兩個(gè) RDD 不一定要有相同的結(jié)構(gòu)。如果兩個(gè)輸入 RDD 有相同的記錄,轉(zhuǎn)化操作 union() 不會(huì)從輸出 RDD 中過(guò)濾這些重復(fù)的數(shù)據(jù)。

>>> fibonacci = sc.parallelize([0,1,2,3,5,8])

>>> odds = sc.parallelize([1,3,5,7,9])

>>> odds.union(fibonacci).collect()

[1, 3, 5, 7, 9, 0, 1, 2, 3, 5, 8]

? ? 2.intersection()

? ? 語(yǔ)法:RDD.intersection(<otherRDD>)

? ? 轉(zhuǎn)化操作?intersection() 返回兩個(gè) RDD 中共有的元素。也就是該操作會(huì)返回兩個(gè)集合中共有的元素。返回的元素或者記錄必須在兩個(gè)集合中是一模一樣的,需要記錄的數(shù)據(jù)結(jié)構(gòu)和每個(gè)字段都對(duì)的上。

>>> fibonacci = sc.parallelize([0,1,2,3,5,8])

>>> odds = sc.parallelize([1,3,5,7,9])

>>> odds.intersection(fibonacci).collect()

[1, 3, 5]

? ? 3.subtract()

? ? 語(yǔ)法:RDD.subtract(<otherRDD>,numPartitions=None)

? ? 轉(zhuǎn)化操作 subtract() 會(huì)返回第一個(gè) RDD 中所有沒(méi)有出現(xiàn)在第二個(gè) RDD 中的元素。這是數(shù)學(xué)上的集合減法的一個(gè)實(shí)現(xiàn)。

>>> fibonacci = sc.parallelize([0,1,2,3,5,8])

>>> odds = sc.parallelize([1,3,5,7,9])

>>> odds.subtract(fibonacci).collect()

[7, 9]

? ? 4.subtractByKey()

? ? 語(yǔ)法:RDD.subtractByKey(<otherRDD>,numPartitions=None)

? ? 轉(zhuǎn)化操作 subtractByKey() 是一個(gè)和 subtract 類(lèi)似的集合操作。subtractByKey() 操作返回一個(gè)鍵值對(duì) RDD 中所有在另一個(gè)鍵值對(duì) RDD 中沒(méi)有對(duì)應(yīng)鍵的元素。參數(shù) numPartitions 可以指定生成的結(jié)果 RDD 包含多少個(gè)分區(qū),缺省值為配置項(xiàng) spark.default.parallelism 的值。

>>> cities1 = sc.parallelize([('Hayward',(37.668819,-122.080795)),

... ('Baumholder',(49.6489,7.3975)),

... ('Alexandria',(38.820450,-77.050552)),

... ('Melbourne',(37.663712,144.844788))])

>>> cities2 = sc.parallelize([('Boulder Creek',(64.0708333,-148.2236111)),

... ('Hayward',(37.668819,-122.080795)),

... ('Alexandria',(38.820450,-77.050552)),

... ('Arlington',(38.878337,-77.100703))])

>>> cities1.subtractByKey(cities2).collect()

[('Melbourne', (37.663712, 144.844788)), ('Baumholder', (49.6489, 7.3975))]

>>> cities2.subtractByKey(cities1).collect()

[('Boulder Creek', (64.0708333, -148.2236111)), ('Arlington', (38.878337, -77.100703))]

數(shù)值型 RDD 的操作

?? ?數(shù)值型 RDD 僅由數(shù)值組成,常用于統(tǒng)計(jì)分析。

>>> numbers = sc.parallelize([0,1,0,1,2,3,4,5,6,7,8,9])

>>> numbers.min()? #最小值

0

>>> numbers.max()? #最大值

9

>>> numbers.mean() #算術(shù)平均數(shù)

3.8333333333333335

>>> numbers.sum()? #求和

46

>>> numbers.stdev() #標(biāo)準(zhǔn)差

3.0230595245361753

>>> numbers.variance() #方差

9.138888888888888

>>> numbers.stats() #返回 StatCounter 對(duì)象,一次調(diào)用獲得一個(gè)包括 count()、mean()、stdev()、max()、min() 的結(jié)構(gòu)

(count: 12, mean: 3.83333333333, stdev: 3.02305952454, max: 9.0, min: 0.0)


向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI