您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)如何進(jìn)行spark python編程,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
spark應(yīng)用程序結(jié)構(gòu)
Spark應(yīng)用程序可分兩部分:driver部分和executor部分初始化SparkContext和主體程序
A:driver部分
driver部分主要是對SparkContext進(jìn)行配置、初始化以及關(guān)閉。初始化SparkContext是為了構(gòu)建Spark應(yīng)用程序的運行環(huán)境,在初始化SparkContext,要先導(dǎo)入一些Spark的類和隱式轉(zhuǎn)換;在executor部分運行完畢后,需要將SparkContext關(guān)閉。
B:executor部分
Spark應(yīng)用程序的executor部分是對數(shù)據(jù)的處理,數(shù)據(jù)分三種:
原生數(shù)據(jù),包含輸入的數(shù)據(jù)和輸出的數(shù)據(jù)
生成Scala標(biāo)量數(shù)據(jù),如count(返回RDD中元素的個數(shù))、reduce、fold/aggregate;返回幾個標(biāo)量,如take(返回前幾個元素)。
生成Scala集合數(shù)據(jù)集,如collect(把RDD中的所有元素倒入 Scala集合類型)、lookup(查找對應(yīng)key的所有值)。
生成hadoop數(shù)據(jù)集,如saveAsTextFile、saveAsSequenceFile
scala集合數(shù)據(jù)集,如Array(1,2,3,4,5),Spark使用parallelize方法轉(zhuǎn)換成RDD。
hadoop數(shù)據(jù)集,Spark支持存儲在hadoop上的文件和hadoop支持的其他文件系統(tǒng),如本地文件、HBase、SequenceFile和Hadoop的輸入格式。例如Spark使用txtFile方法可以將本地文件或HDFS文件轉(zhuǎn)換成RDD。
對于輸入原生數(shù)據(jù),Spark目前提供了兩種:
對于輸出數(shù)據(jù),Spark除了支持以上兩種數(shù)據(jù),還支持scala標(biāo)量
RDD,Spark進(jìn)行并行運算的基本單位,其細(xì)節(jié)參見RDD 細(xì)解。RDD提供了四種算子:
窄依賴算子
寬依賴算子,寬依賴會涉及shuffle類,在DAG圖解析時以此為邊界產(chǎn)生Stage,如圖所示。
輸入輸出一對一的算子,且結(jié)果RDD的分區(qū)結(jié)構(gòu)不變,主要是map、flatMap;
輸入輸出一對一,但結(jié)果RDD的分區(qū)結(jié)構(gòu)發(fā)生了變化,如union、coalesce;
從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample。
對單個RDD基于key進(jìn)行重組和reduce,如groupByKey、reduceByKey;
對兩個RDD基于key進(jìn)行join和重組,如join、cogroup。
輸入算子,將原生數(shù)據(jù)轉(zhuǎn)換成RDD,如parallelize、txtFile等
轉(zhuǎn)換算子,最主要的算子,是Spark生成DAG圖的對象,轉(zhuǎn)換算子并不立即執(zhí)行,在觸發(fā)行動算子后再提交給driver處理,生成DAG圖 --> Stage --> Task --> Worker執(zhí)行。按轉(zhuǎn)化算子在DAG圖中作用,可以分成兩種:
緩存算子,對于要多次使用的RDD,可以緩沖加快運行速度,對重要數(shù)據(jù)可以采用多備份緩存。
行動算子,將運算結(jié)果RDD轉(zhuǎn)換成原生數(shù)據(jù),如count、reduce、collect、saveAsTextFile等。
共享變量,在Spark運行時,一個函數(shù)傳遞給RDD內(nèi)的patition操作時,該函數(shù)所用到的變量在每個運算節(jié)點上都復(fù)制并維護(hù)了一份,并且各個節(jié)點之間不會相互影響。但是在Spark Application中,可能需要共享一些變量,提供Task或驅(qū)動程序使用。Spark提供了兩種共享變量:
廣播變量,可以緩存到各個節(jié)點的共享變量,通常為只讀,使用方法:
>>> from pyspark.context import SparkContext >>> sc = SparkContext('local', 'test') >>> b = sc.broadcast([1, 2, 3, 4, 5]) >>> b.value[1, 2, 3, 4, 5] >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
累計器,只支持加法操作的變量,可以實現(xiàn)計數(shù)器和變量求和。用戶可以調(diào)用SparkContext.accumulator(v)創(chuàng)建一個初始值為v的累加器,而運行在集群上的Task可以使用“+=”操作,但這些任務(wù)卻不能讀取;只有驅(qū)動程序才能獲取累加器的值。使用方法:
python編程
實驗項目
sogou日志數(shù)據(jù)分析
實驗數(shù)據(jù)來源:sogou精簡版數(shù)據(jù)下載地址
數(shù)據(jù)格式說明:
訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結(jié)果中的排名\t用戶點擊的順序號\t用戶點擊的URL
其中,用戶ID是根據(jù)用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不同查詢對應(yīng)同一個用戶ID。
以上數(shù)據(jù)格式是官方說明,實際上該數(shù)據(jù)集中排名和順序號之間不是\t分割,而是空格分割。
一個session內(nèi)查詢次數(shù)最多的用戶的session與相應(yīng)的查詢次數(shù)
import sys from pyspark import SparkContext if __name__ == "__main__": if len(sys.argv) != 2: print >> sys.stderr, "Usage: SogouC <file>" exit(-1) sc = SparkContext(appName="SogouC") sgRDD = sc.textFile(sys.argv[1]) print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10) sc.stop()
虛擬集群中任意節(jié)點運行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt
運行結(jié)果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]
以上就是如何進(jìn)行spark python編程,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。