您好,登錄后才能下訂單哦!
這篇文章主要介紹Python大數(shù)據(jù)分析神器Dask有什么用,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
1、什么是Dask?
Pandas
和Numpy
大家都不陌生了,代碼運行后數(shù)據(jù)都加載到RAM中,如果數(shù)據(jù)集特別大,我們就會看到內(nèi)存飆升。但有時要處理的數(shù)據(jù)并不適合RAM
,這時候Dask
來了。
Dask
是開源免費的。它是與其他社區(qū)項目(如Numpy,Pandas和Scikit-Learn)協(xié)調(diào)開發(fā)的。
官方:https://dask.org/
Dask
支持Pandas
的DataFrame
和NumpyArray
的數(shù)據(jù)結(jié)構(gòu),并且既可在本地計算機上運行,也可以擴展到在集群上運行。
基本上,只要編寫一次代碼,使用普通的Pythonic
語法,就可在本地運行或部署到多節(jié)點集群上。這本身就是一個很牛逼的功能了,但這還不是最牛逼的。
我覺得Dask
的最牛逼的功能是:它兼容大部分我們已經(jīng)在用的工具,并且只需改動少量的代碼,就可以利用自己筆記本電腦上已有的處理能力并行運行代碼。而并行處理數(shù)據(jù)就意味著更少的執(zhí)行時間,更少的等待時間和更多的分析時間。
下面這個就是Dask
進行數(shù)據(jù)處理的大致流程。
2、Dask支持哪些現(xiàn)有工具?
這一點也是我比較看中的,因為Dask
可以與Python
數(shù)據(jù)處理和建模的庫包兼容,沿用庫包的API,這對于Python使用者來說學(xué)習(xí)成本是極低的。而像Hadoop
、Spark
這種大數(shù)據(jù)處理是有很高的學(xué)習(xí)門檻和時間成本的。
目前,Dask
可支持pandas
、Numpy
、Sklearn
、XGBoost
、XArray
、RAPIDS
等等,光是這幾項我覺得就足夠用了,至少對于常用的數(shù)據(jù)處理、建模分析是完全覆蓋得掉的。
3、Dask安裝
可以使用 conda
或者 pip
,或從源代碼安裝dask
。
conda install dask
因為dask
有很多依賴,所以為了快速安裝也可用下面代碼,將安裝運行Dask
所需的最少依賴關(guān)系集。
conda install dask-core
再有就是通過源來安裝。
git clone https://github.com/dask/dask.git cd dask python -m pip install .
4、Dask如何使用?
Numpy、pandas
Dask
引入了3個并行集合,它們可以存儲大于RAM的數(shù)據(jù),這些集合有DataFrame
、Bags
、Arrays
。這些集合類型中的每一個都能夠使用在RAM和硬盤之間分區(qū)的數(shù)據(jù),以及分布在群集中多個節(jié)點上的數(shù)據(jù)。
Dask
的使用是非常清晰的,如果你使用NumPy
數(shù)組,就從Dask
數(shù)組開始,如果你使用Pandas DataFrame
,就從Dask DataFrame
開始,依此類推。
import dask.array as da x = da.random.uniform(low=0, high=10, size=(10000, 10000), # normal numpy code chunks=(1000, 1000)) # break into chunks of size 1000x1000 y = x + x.T - x.mean(axis=0) # Use normal syntax for high level algorithms # DataFrames import dask.dataframe as dd df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp', # normal Pandas code blocksize=64000000) # break text into 64MB chunks s = df.groupby('name').balance.mean() # Use normal syntax for high level algorithms # Bags / lists import dask.bag as db b = db.read_text('*.json').map(json.loads) total = (b.filter(lambda d: d['name'] == 'Alice') .map(lambda d: d['balance']) .sum())
這些高級接口在略微變化的情況下復(fù)制了標準接口。對于原始項目中的大部分API,這些接口會自動為我們并行處理較大的數(shù)據(jù)集,實現(xiàn)上不是很復(fù)雜,對照Dask
的doc文檔即可一步步完成。
Delayed
下面說一下Dask
的 Delay
功能,非常強大。
Dask.delayed
是一種并行化現(xiàn)有代碼的簡單而強大的方法。之所以被叫做delayed
是因為,它沒有立即計算出結(jié)果,而是將要作為任務(wù)計算的結(jié)果記錄在一個圖形中,稍后將在并行硬件上運行。
有時問題用已有的dask.array
或dask.dataframe
可能都不適合,在這些情況下,我們可以使用更簡單的dask.delayed
界面并行化自定義算法。例如下面這個例子。
def inc(x): return x + 1 def double(x): return x * 2 def add(x, y): return x + y data = [1, 2, 3, 4, 5] output = [] for x in data: a = inc(x) b = double(x) c = add(a, b) output.append(c) total = sum(output) 45
上面代碼在單個線程中按順序運行。但是,我們看到其中很多可以并行執(zhí)行。Dask delayed
函數(shù)可修飾inc
、double
這些函數(shù),以便它們可延遲運行,而不是立即執(zhí)行函數(shù),它將函數(shù)及其參數(shù)放入計算任務(wù)圖中。
我們簡單修改代碼,用delayed
函數(shù)包裝一下。
import dask output = [] for x in data: a = dask.delayed(inc)(x) b = dask.delayed(double)(x) c = dask.delayed(add)(a, b) output.append(c) total = dask.delayed(sum)(output)
代碼運行后inc
、double
、add
和sum
都還沒有發(fā)生,而是生成一個計算的任務(wù)圖交給了total
。然后我們用visualizatize
看下任務(wù)圖。
total.visualize()
上圖明顯看到了并行的可能性,所以毫不猶豫,使用compute
進行并行計算,這時才完成了計算。
>>> total.compute() 45
由于數(shù)據(jù)集較小無法比較時間,這里只介紹下使用方法,具體可自己動手實踐下。
Sklearn機器學(xué)習(xí)
關(guān)于機器學(xué)習(xí)的并行化執(zhí)行,由于內(nèi)容較多,東哥會在另一篇文章展開。這里簡單說下一下dask-learn
。
dask-learn
項目是與Sklearn
開發(fā)人員協(xié)作完成的?,F(xiàn)在可實現(xiàn)并行化有Scikit-learn
的Pipeline
、GridsearchCV
和RandomSearchCV
以及這些的變體,它們可以更好地處理嵌套的并行操作。
因此,如果你將sklearn
替換為dklearn
,那么速度將會提升很多。
# from sklearn.grid_search import GridSearchCV from dklearn.grid_search import GridSearchCV # from sklearn.pipeline import Pipeline from dklearn.pipeline import Pipeline 下面是一個使用Pipeline的示例,其中應(yīng)用了PCA和邏輯回歸。 from sklearn.datasets import make_classification X, y = make_classification(n_samples=10000, n_features=500, n_classes=2, n_redundant=250, random_state=42) from sklearn import linear_model, decomposition from sklearn.pipeline import Pipeline from dklearn.pipeline import Pipeline logistic = linear_model.LogisticRegression() pca = decomposition.PCA() pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)]) grid = dict(pca__n_components=[50, 100, 150, 250], logistic__C=[1e-4, 1.0, 10, 1e4], logistic__penalty=['l1', 'l2']) # from sklearn.grid_search import GridSearchCV from dklearn.grid_search import GridSearchCV estimator = GridSearchCV(pipe, grid) estimator.fit(X, y)
結(jié)果是:sklearn
會在40秒鐘左右執(zhí)行此計算,而dask-learn
替代品大約需要10秒鐘。
另外,如果添加以下代碼可以連接到集群,通過Client
可以展示整個計算過程的dashboard
,由Bokeh
實現(xiàn)。
from dask.distributed import Client c = Client('scheduler-address:8786')
以上是“Python大數(shù)據(jù)分析神器Dask有什么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。