您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“Spark 3.0中pandas支持及其與DataFrame相互轉(zhuǎn)換的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Spark 3.0中pandas支持及其與DataFrame相互轉(zhuǎn)換的示例分析”這篇文章吧。
pandas是python用戶廣泛使用的數(shù)據(jù)分析庫(kù),Spark 3.0已經(jīng)能較好滴支持pandas接口,從而彌補(bǔ)pandas不能跨機(jī)進(jìn)行大數(shù)據(jù)處理的不足。pandas還能夠與Spark原來(lái)的DataFrame相互轉(zhuǎn)換,方便Spark和Python的庫(kù)相互調(diào)用。
Koalas(https://koalas.readthedocs.io/en/latest/)項(xiàng)目使數(shù)據(jù)科學(xué)家在處理大數(shù)據(jù)時(shí)能夠更有效率,通過(guò)在Spark的上層實(shí)現(xiàn)一套pandas DataFrame API。pandas 是python數(shù)據(jù)處理事實(shí)上的標(biāo)準(zhǔn),而Spark是大數(shù)據(jù)處理的事實(shí)上的標(biāo)準(zhǔn)。通過(guò)Koalas,可以:
通過(guò) Spark 立即提升大數(shù)據(jù)處理生產(chǎn)力,如果熟悉pandas不用學(xué)習(xí)任何新的知識(shí)。
在pandas (tests, smaller datasets) 和 Spark (distributed datasets)只需要一套數(shù)據(jù)分析代碼,方便從研究環(huán)境擴(kuò)展到生產(chǎn)環(huán)節(jié)。
Koalas要求PySpark,需要首先安裝PySpark。
Koalas安裝的多種方式包括:
Conda
PyPI
Installation from source
安裝PySpark,可以使用:
Installation with the official release channel
Conda
PyPI
Installation from source
建議Python 3.5 及以上版本。
通過(guò) Conda 安裝
首先需要安裝 Conda ,然后創(chuàng)建一個(gè)conda環(huán)境。如下:
conda create --name koalas-dev-env
將創(chuàng)建一個(gè)只有 Python的最小環(huán)境,激活當(dāng)前環(huán)境:
conda activate koalas-dev-env
安裝 Koalas:
conda install -c conda-forge koalas
安裝Koalas的特定版本:
conda install -c conda-forge koalas=0.19.0
從 PyPI 安裝
Koalas 可以使用 pip 從 PyPI 安裝:
pip install koalas
從源碼安裝
查看 Contribution Guide 獲得更多指南。
采用官方頻道安裝:
安裝PySpark,從 the official release channel 下載。下載后,解包:
tar xzvf spark-2.4.4-bin-hadoop2.7.tgz
設(shè)置 SPARK_HOME
環(huán)境變量:
cd spark-2.4.4-bin-hadoop2.7 export SPARK_HOME=`pwd`
確保 PYTHONPATH
可以被 PySpark 和 Py4J找到,在 $SPARK_HOME/python/lib
:
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
從Conda安裝:
PySpark 也可以從 Conda 安裝:
conda install -c conda-forge pyspark
從PyPI安裝:
PySpark 可以從 PyPI 安裝:
pip install pyspark
首先,import Koalas 如下:
import pandas as pdimport numpy as npimport databricks.koalas as ksfrom pyspark.sql import SparkSession
創(chuàng)建 Koalas Series,創(chuàng)建一個(gè)整數(shù)序列值:
s = ks.Series([1, 3, 5, np.nan, 6, 8])
s
0 1.0 1 3.0 2 5.0 3 NaN 4 6.0 5 8.0 dtype: float64
創(chuàng)建 Koalas DataFrame,導(dǎo)入詞典對(duì)象,轉(zhuǎn)為一個(gè)序列:
kdf = ks.DataFrame({'a': [1, 2, 3, 4, 5, 6], 'b': [100, 200, 300, 400, 500, 600], 'c': ["one", "two", "three", "four", "five", "six"]},index=[10, 20, 30, 40, 50, 60])
kdf
a | b | c | |
---|---|---|---|
10 | 1 | 100 | one |
20 | 2 | 200 | two |
30 | 3 | 300 | three |
40 | 4 | 400 | four |
50 | 5 | 500 | five |
60 | 6 | 600 | six |
創(chuàng)建 pandas DataFrame,導(dǎo)入 numpy array,帶datetime index 和 labeled columns:
dates = pd.date_range('20130101', periods=6)
dates
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', '2013-01-05', '2013-01-06'], dtype='datetime64[ns]', freq='D')
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
A | B | C | D | |
---|---|---|---|---|
2013-01-01 | -0.407291 | 0.066551 | -0.073149 | 0.648219 |
2013-01-02 | -0.848735 | 0.437277 | 0.632657 | 0.312861 |
2013-01-03 | -0.415537 | -1.787072 | 0.242221 | 0.125543 |
2013-01-04 | -1.637271 | 1.134810 | 0.282532 | 0.133995 |
2013-01-05 | -1.230477 | -1.925734 | 0.736288 | -0.547677 |
2013-01-06 | 1.092894 | -1.071281 | 0.318752 | -0.477591 |
現(xiàn)在,把pandas DataFrame 轉(zhuǎn)為 Koalas DataFrame:
kdf = ks.from_pandas(pdf)
type(kdf)
databricks.koalas.frame.DataFrame
看起來(lái)與 pandas DataFrame幾乎一樣。
更多例程:https://koalas.readthedocs.io/en/latest/getting_started/10min.html
pandas與dataframe、koalas都可以相互轉(zhuǎn)換。注意pandas與dataframe的轉(zhuǎn)換效率較低,而且pandas原生接口是單機(jī)的,建議使用Koalas。
from pyspark.sql import SparkSession# 初始化spark會(huì)話spark = SparkSession \ .builder \ .getOrCreate() spark_df = spark.createDataFrame(pandas_df)
import pandas as pdpandas_df = spark_df.toPandas()
由于pandas
的方式是單機(jī)版的,即toPandas()
的方式是單機(jī)版的,所以參考breeze_lsw改成分布式版本:
import pandas as pddef _map_to_pandas(rdds):return [pd.DataFrame(list(rdds))] def topas(df, n_partitions=None):if n_partitions is not None: df = df.repartition(n_partitions) df_pand = df.rdd.mapPartitions(_map_to_pandas).collect() df_pand = pd.concat(df_pand) df_pand.columns = df.columnsreturn df_pand pandas_df = topas(spark_df)
以上是“Spark 3.0中pandas支持及其與DataFrame相互轉(zhuǎn)換的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。