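How do you set up a PySpark environment on Windows? Many people without prior experience are unsure where to start. This article walks through the setup steps and the problems you are likely to hit, and should help you end up with a working environment.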
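First download Hadoop and Spark, extract both archives, and then set the environment variables:
Hadoop download (Tsinghua University mirror)
Spark download (Tsinghua University mirror)
HADOOP_HOME => /path/hadoop
SPARK_HOME => /path/spark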
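Before going further, it can help to confirm that both variables are actually visible to Python. The snippet below is a minimal sketch, not part of PySpark: the helper name `missing_spark_env` is hypothetical, and it only checks that each variable is set and points to an existing directory. Run it from a freshly opened terminal, since already-open shells do not see newly set variables.

```python
import os


def missing_spark_env(environ=None):
    """Return the names of HADOOP_HOME / SPARK_HOME that are unset or not a directory.

    `environ` defaults to the real process environment; a plain dict can be
    passed instead, e.g. for testing.
    """
    if environ is None:
        environ = os.environ
    missing = []
    for name in ("HADOOP_HOME", "SPARK_HOME"):
        path = environ.get(name)
        # An unset variable and a path that does not exist both count as missing.
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


if __name__ == "__main__":
    problems = missing_spark_env()
    if problems:
        print("Not set, or not a directory:", ", ".join(problems))
    else:
        print("HADOOP_HOME and SPARK_HOME both point to existing directories")
```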
Install PySpark:
```
pip install pyspark
```
To check the installation, type pyspark in a shell terminal; it should start the interactive PySpark shell.
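Next, test that a SparkContext can be created. SparkContext is the entry point to all Spark functionality. Run the following in a plain Python interpreter (started with python). The pyspark shell already creates a SparkContext named sc for you, and because only one SparkContext can be active at a time, creating a second one there raises an error.
```python
>>> from pyspark import SparkContext
>>> sc = SparkContext("local", "First App")
```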
If this runs without errors, you are ready to start writing PySpark code.
I prefer to write code in an IDE, so first stop the SparkContext in the terminal:
```python
>>> sc.stop()
```
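The rest of this article uses PyCharm. If you changed any environment variables, restart PyCharm first so that it picks them up.
Run the following program in PyCharm. It starts Spark in local mode and counts how many lines of abc.txt contain the letter a and how many contain the letter b. Replace the file path with your own.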
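```python
from pyspark import SparkContext

sc = SparkContext("local", "First App")
logFile = "abc.txt"  # replace with the path to your own file
logData = sc.textFile(logFile).cache()  # cached because both counts below reuse it
numAs = logData.filter(lambda s: 'a' in s).count()  # lines containing 'a'
numBs = logData.filter(lambda s: 'b' in s).count()  # lines containing 'b'
print("Line with a:%i,line with b:%i" % (numAs, numBs))
sc.stop()
```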
Output:
```
20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1
```
The two WARN lines are not errors: Spark could not load native Hadoop libraries and fell back to its built-in Java classes, and port 4040 was already in use (for example by another running Spark application), so the web UI moved to port 4041.
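Note that this particular job could just as easily be done in plain Python. The point of Spark is to run such computations efficiently in a distributed way when the data is large.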
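For comparison, here is the same line count in plain Python, with no Spark involved. This is a sketch: the function name `count_lines_with` is illustrative, and it processes everything on a single machine, which is exactly the limitation Spark removes for large datasets.

```python
def count_lines_with(lines, needles=("a", "b")):
    """Count how many lines contain each substring in `needles`.

    Mirrors logData.filter(lambda s: 'a' in s).count(): a line is counted once
    per needle, no matter how many times the needle occurs in it.
    """
    counts = {needle: 0 for needle in needles}
    for line in lines:
        for needle in needles:
            if needle in line:
                counts[needle] += 1
    return counts


if __name__ == "__main__":
    sample = ["apple", "banana", "cherry"]
    counts = count_lines_with(sample)
    print("Line with a:%i,line with b:%i" % (counts["a"], counts["b"]))  # Line with a:2,line with b:1
```

To count a real file, pass the open file object, which yields one line at a time: `with open("abc.txt", encoding="utf-8") as f: counts = count_lines_with(f)`.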
PySpark tutorial
Spark tutorial
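RDD stands for Resilient Distributed Dataset: an immutable collection of elements that is split into partitions so that it can be processed in parallel across the nodes of a cluster. RDDs are the objects that Spark computations operate on.
Typically, you first create an RDD from some data and then apply operations to it.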
There are two kinds of RDD operations:
Transformation - applied to an RDD to produce a new RDD, for example filter, groupBy and map.
Action - instructs Spark to perform the computation and send the result back to the driver program, for example count and collect.
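A practical consequence of this split is that transformations are lazy: Spark only records them, and nothing is computed until an action asks for a result. The toy class below is a plain-Python sketch of that idea, not Spark's implementation; `LazyDataset` and its methods are hypothetical names chosen to echo the RDD API.

```python
class LazyDataset:
    """Toy model of RDD laziness: transformations are recorded, actions run them."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # recorded (kind, function) pairs

    # Transformations: return a new dataset with one more recorded step.
    def map(self, f):
        return LazyDataset(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    # Actions: replay every recorded step, then produce a result.
    def collect(self):
        items = self._data
        for kind, f in self._steps:
            if kind == "map":
                items = [f(x) for x in items]
            else:
                items = [x for x in items if f(x)]
        return items

    def count(self):
        return len(self.collect())


if __name__ == "__main__":
    calls = []

    def square(x):
        calls.append(x)  # record each time the function actually runs
        return x * x

    ds = LazyDataset([1, 2, 3, 4]).map(square).filter(lambda x: x > 4)
    print("calls after transformations:", len(calls))  # 0: nothing has run yet
    print("collect ->", ds.collect())                  # [9, 16]
    print("calls after the action:", len(calls))       # 4
```

Real RDDs additionally split the data into partitions and ship the recorded steps (the lineage) to executors; that lineage is also what lets Spark recompute a lost partition, which is where the "Resilient" in the name comes from.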
parallelize creates an RDD from a Python list. Here is an example:
```python
from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)
print(words)  # prints a description of the RDD, not its elements
sc.stop()
```
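What gets printed is the RDD object built from the list, not its contents; Spark can then run operations on it. The source line number at the end may differ between Spark versions:
```
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
```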
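parallelize also accepts an optional second argument, numSlices, that sets how many partitions the list is split into, and in PySpark you can inspect the actual partitions with `sc.parallelize(data, 3).glom().collect()`. As a rough mental model, the plain-Python sketch below cuts a list into contiguous, nearly equal slices using the same index arithmetic Spark applies when slicing a local collection. The function `split_into_slices` is hypothetical; PySpark actually slices serialized batches of elements, so real partition boundaries can differ slightly.

```python
def split_into_slices(items, num_slices):
    """Cut `items` into `num_slices` contiguous slices of nearly equal size.

    Slice i covers indices [i*n // num_slices, (i+1)*n // num_slices).
    Some slices may be empty when there are more slices than items.
    """
    if num_slices < 1:
        raise ValueError("num_slices must be positive")
    n = len(items)
    return [
        items[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]


if __name__ == "__main__":
    words = ["scala", "java", "hadoop", "spark", "akka",
             "spark vs hadoop", "pyspark", "pyspark and spark"]
    for i, part in enumerate(split_into_slices(words, 3)):
        print("partition", i, "->", part)
```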
count returns the number of elements in the RDD.
```python
from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)
counts = words.count()  # action: runs a job and returns an int
print("Number of elements in RDD -> %i" % counts)
sc.stop()
```
Output:
```
Number of elements in RDD -> 8
```
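collect returns all the elements of the RDD to the driver as a Python list, so it should only be used when the result is small enough to fit in the driver's memory.
```python
from pyspark import SparkContext

sc = SparkContext("local", "collect app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)
coll = words.collect()  # action: brings every element back to the driver
print("Elements in RDD -> %s" % coll)
sc.stop()
```
Output:
```
Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
```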
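foreach applies a function to every element of the RDD but returns nothing (None).
The program below defines an accumulator to collect values while foreach runs. An accumulator is needed because the function runs in worker processes: changes to an ordinary Python variable inside it would not be visible to the driver.
```python
from pyspark import SparkContext

sc = SparkContext("local", "ForEach app")
accum = sc.accumulator(0)  # shared counter; tasks may only add to it
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

def increment_counter(x):
    print(x)  # runs in a worker process, not in the driver
    accum.add(x)

s = rdd.foreach(increment_counter)
print(s)  # None: foreach returns nothing
print("Counter value:", accum.value)  # read the accumulated total on the driver
sc.stop()
```
Output:
```
None
Counter value: 15
```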
filter returns a new RDD containing only the elements that satisfy the given condition.
```python
from pyspark import SparkContext

sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)  # keep words containing "spark"
filtered = words_filter.collect()
print("Filtered RDD -> %s" % filtered)
sc.stop()
```
Output:
```
Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
```
The lambda can also be replaced by a named function that returns True or False:
```python
from pyspark import SparkContext

sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)

def contains_spark(x):
    # filter keeps an element when this function returns True
    return "spark" in x

words_filter = words.filter(contains_spark)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % filtered)
sc.stop()
```
map applies a function to every element of the RDD and returns a new RDD with the results. Here each word becomes a tuple of the word, the number 1 and the word prefixed with an underscore:
```python
from pyspark import SparkContext

sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1, "_{}".format(x)))  # one output tuple per input word
mapping = words_map.collect()
print("Key value pair -> %s" % mapping)
sc.stop()
```
Output:
```
Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]
```
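reduce combines the elements of the RDD using the given binary operation, which must be commutative and associative, and returns the single resulting value.
```python
from operator import add

from pyspark import SparkContext

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)  # sums all elements: 1 + 2 + 3 + 4 + 5
print("Adding all the elements -> %i" % adding)
sc.stop()
```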
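Here add comes from Python's standard operator module (it is not a built-in function). If you inspect it in an IDE, its pure-Python definition is:
```python
def add(a, b):
    "Same as a + b."
    return a + b
```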
reduce adds the first two elements, then adds each remaining element to the running total, and finally returns the total as a single value:
```
Adding all the elements -> 15
```
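Why must the operation be commutative and associative? Spark reduces each partition separately and then combines the partial results, so how the elements are grouped depends on how the data was partitioned. The plain-Python sketch below imitates that two-level reduction; `partitioned_reduce` is a hypothetical helper, not a Spark API, and its even slicing is only an approximation of Spark's partitioning.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(func, items, num_partitions):
    """Imitate RDD.reduce: reduce inside each partition, then reduce the partial results."""
    n = len(items)
    partitions = [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]
    partials = [reduce(func, part) for part in partitions if part]  # skip empty partitions
    return reduce(func, partials)


if __name__ == "__main__":
    nums = [1, 2, 3, 4, 5]
    for k in (1, 2, 3):
        print(k, "partition(s): add ->", partitioned_reduce(add, nums, k),
              "| sub ->", partitioned_reduce(sub, nums, k))
```

With these five numbers, add gives 15 for every partition count, while sub, which is neither commutative nor associative, gives -13, 5 and 3 for one, two and three partitions.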
join works on RDDs of (key, value) pairs. It returns an RDD containing only the keys present in both RDDs, each paired with a tuple of the matching values from the two sides.
```python
from pyspark import SparkContext

sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
print("x =>", x.collect())
print("y =>", y.collect())
joined = x.join(y)  # inner join on the key
final = joined.collect()
print("Join RDD -> %s" % final)
sc.stop()
```
Output:
```
x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]
```
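join is an inner join: a key that appears in only one RDD (here "python") is dropped, and a key that appears several times on both sides produces every combination of its values. The plain-Python sketch below reproduces that behavior for small lists; `inner_join` is a hypothetical helper, and unlike Spark it returns results in a deterministic order (Spark's output order depends on partitioning).

```python
from collections import defaultdict


def inner_join(left, right):
    """Inner-join two lists of (key, value) pairs, like RDD.join, on a single machine."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    joined = []
    for key, lvalue in left:
        # Keys missing from `right` produce no output (inner join semantics).
        for rvalue in right_by_key.get(key, []):
            joined.append((key, (lvalue, rvalue)))
    return joined


if __name__ == "__main__":
    x = [("spark", 1), ("hadoop", 4), ("python", 4)]
    y = [("spark", 2), ("hadoop", 5)]
    print(inner_join(x, y))
    # Duplicate keys on both sides yield every pairing of their values.
    print(inner_join([("a", 1), ("a", 2)], [("a", "x"), ("a", "y")]))
```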