您好,登錄后才能下訂單哦!
這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)使用pandas怎么對大文件進(jìn)行計數(shù)處理,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
Pandas讀取大文件
要處理的是由探測器讀出的脈沖信號,一組數(shù)據(jù)為兩列,一列為時間,一列為脈沖能量,數(shù)據(jù)量在千萬級,為了有一個直接的認(rèn)識,先使用Pandas讀取一些
import pandas as pd data = pd.read_table('filename.txt', iterator=True) chunk = data.get_chunk(5)
而輸出是這樣的:
Out[4]: 332.977889999979 -0.0164794921875 0 332.97790 -0.022278 1 332.97791 -0.026855 2 332.97792 -0.030518 3 332.97793 -0.045776 4 332.97794 -0.032654
DataFram基本用法
這里,data只是個容器,pandas.io.parsers.TextFileReader。
使用astype可以實現(xiàn)dataframe字段類型轉(zhuǎn)換
輸出數(shù)據(jù)中,每組數(shù)據(jù)會多處一行,因為get_chunk返回的是pandas.core.frame.DataFrame格式, 而data在讀取過程中并沒有指定DataFrame的columns,因此在get_chunk過程中,默認(rèn)將第一組數(shù)據(jù)作為columns。因此需要在讀取過程中指定names即DataFrame的columns。
import pandas as pd data = pd.read_table('filename.txt', iterator=True, names=['time', 'energe']) chunk = data.get_chunk(5) data['energe'] = df['energe'].astype('int')
輸出為
Out[6]:
index | time | energe |
---|---|---|
0 | 332.97789 | -0.016479 |
1 | 332.97790 | -0.022278 |
2 | 332.97791 | -0.026855 |
3 | 332.97792 | -0.030518 |
4 | 332.97793 | -0.045776 |
DataFram存儲和索引
這里講一下DataFrame這個格式,與一般二維數(shù)據(jù)不同(二維列表等),DataFrame既有行索引又有列索引,因此在建立一個DataFrame數(shù)據(jù)是
DataFrame(data, columns=[‘year', ‘month', ‘day'], index=[‘one', ‘two', ‘three'])
year | month | day | |
---|---|---|---|
0 | 2010 | 4 | 1 |
1 | 2011 | 5 | 2 |
2 | 2012 | 6 | 3 |
3 | 2013 | 7 | 5 |
4 | 2014 | 8 | 9 |
而pd.read_table中的names就是指定DataFrame的columns,而index自動設(shè)置。 而DataFrame的索引格式有很多
類型 | 說明 | 例子 |
---|---|---|
obj[val] | 選取單列或者一組列 | |
obj.ix[val] | 選取單個行或者一組行 | |
obj.ix[:,val] | 選取單個列或列子集 | |
obj.ix[val1, val2] | 同時選取行和列 | |
reindex方法 | 將一個或多個軸匹配到新索引 | |
xs方法 | 根據(jù)標(biāo)簽選取單行或單列,返回一個Series | |
icol,lrow方法 | 根據(jù)整數(shù)位置選取單列或單行,返回一個Series | |
get_value,set_value | 根據(jù)行標(biāo)簽列標(biāo)簽選取單個值 |
exp: In[1]:data[:2]
Out[2]:
year | month | day | |
---|---|---|---|
0 | 2010 | 4 | 1 |
1 | 2011 | 5 | 2 |
In[2]:data[data[‘month']>5]
Out[2]:
year | month | day | |
---|---|---|---|
2 | 2012 | 6 | 3 |
4 | 2014 | 8 | 9 |
如果我們直接把data拿來比較的話,相當(dāng)于data中所有的標(biāo)量元素
In[3]:data[data<6]=0
Out[3]:
year | month | day | |
---|---|---|---|
0 | 2010 | 0 | 0 |
1 | 2011 | 0 | 0 |
2 | 2012 | 6 | 0 |
3 | 2013 | 7 | 0 |
4 | 2014 | 8 | 9 |
Pandas運(yùn)算
series = data.ix[0] data - series
Out:
year | month | day | |
---|---|---|---|
0 | 0 | 0 | 0 |
1 | 1 | 1 | 1 |
2 | 2 | 2 | 2 |
3 | 3 | 3 | 4 |
4 | 4 | 4 | 8 |
DataFrame與Series之間運(yùn)算會將Series索引匹配到DataFrame的列,然后沿行一直向下廣播
如果令series1 = data[‘year']
data.sub(series1,axis=0)
則每一列都減去該series1,axis為希望匹配的軸,=0行索引,即匹配列,=1列索引,則按行匹配。
DataFrame的一些函數(shù)方法
這個就有很多了,比如排序和排名;求和、平均數(shù)以及方差、協(xié)方差等數(shù)學(xué)方法;還有就是唯一值(類似于集合)、值計數(shù)和成員資格等方法。
當(dāng)然還有一些更高級的屬性,用的時候再看吧
數(shù)據(jù)處理
在得到數(shù)據(jù)樣式后我們先一次性讀取數(shù)據(jù)
start = time.time() data = pd.read_table('Eu155_Na22_K40_MR_0CM_3Min.csv', names=['time', 'energe']) end = time.time() data.index print("The time is %f s" % (end - start)) plus = data['energe'] plus[plus < 0] = 0
The time is 29.403917 s RangeIndex(start=0, stop=68319232, step=1)
對于一個2G大小,千萬級的數(shù)據(jù),這個讀取速度還是挺快的。之前使用matlab load用時160多s,但是不知道這個是否把數(shù)據(jù)完全讀取了。然后只抽取脈沖信號,將負(fù)值歸0,因為會出現(xiàn)一定的電子噪聲從而產(chǎn)生一定負(fù)值。
然后就需要定位脈沖信號中的能峰了,也就是findpeaks
這里用到了scipy.signal中的find_peaks_cwt,具體用法可以參見官方文檔
peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)),它返回找到的peaks的位置,輸入第一個為數(shù)據(jù),第二個為窗函數(shù),也就是在這個寬度的能窗內(nèi)尋找峰,我是這樣理解的。剛開始以為是數(shù)據(jù)的另一維坐標(biāo),結(jié)果找了半天沒結(jié)果。不過事實上這個找的確定也挺慢的。
50w條的數(shù)據(jù),找了足足7分鐘,我這一個數(shù)據(jù)3000w條不得找半個多小時,而各種數(shù)據(jù)有好幾十,恩。。這樣是不行的,于是想到了并行的方法。這個下篇文章會講到,也就是把數(shù)據(jù)按照chunksize讀取,然后同時交給(map)幾個進(jìn)程同時尋峰,尋完后返回(reduce)一起計數(shù),計數(shù)的同時,子進(jìn)程再此尋峰。
在處理的時候碰到我自己的破 筆記本由于內(nèi)存原因不能load這個數(shù)據(jù),并且想著每次copy這么大數(shù)據(jù)好麻煩,就把一個整體數(shù)據(jù)文件分割成了幾個部分,先對方法進(jìn)行一定的實驗,時間快,比較方便。
import pandas as pd def split_file(filename, size): name = filename.split('.')[0] data = pd.read_table(filename, chunksize=size, names=['time', 'intension']) i = 1 for piece in data: outname = name + str(i) + '.csv' piece.to_csv(outname, index=False, names = ['time', 'intension']) i += 1 def split_csvfile(filename, size): name = filename.split('.')[0] data = pd.read_csv(filename, chunksize=size, names=['time', 'intension']) i = 1 for piece in data: outname = name + str(i) + '.csv' piece = piece['intension'] piece.to_csv(outname, index=False) i += 1
額..使用并行尋峰通過map/reduce的思想來解決提升效率這個想法,很早就實現(xiàn)了,但是,由于效果不是特別理想,所以放那也就忘了,今天整理代碼來看了下當(dāng)時記的些筆記,然后竟然發(fā)現(xiàn)有個評論…..我唯一收到的評論竟然是“催稿”=。=。想一想還是把下面的工作記錄下來,免得自己后來完全忘記了。
rom scipy import signal import os import time import pandas as pd import numpy as np from multiprocessing import Pool import matplotlib.pylab as plt from functools import partial def findpeak(pluse): pluse[pluse < 0.05] = 0 print('Sub process %s.' % os.getpid()) start = time.time() peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)) # 返回一個列表 end = time.time() print("The time is %f s" % (end - start)) pks = [pluse[x] for x in peaks] return pks def histcnt(pks, edge=None, channel=None): cnt = plt.hist(pks, edge) res = pd.DataFrame(cnt[0], index=channel, columns=['cnt']) return res if __name__ == '__main__': with Pool(processes=8) as p: start = time.time() print('Parent process %s.' % os.getpid()) pluse = pd.read_csv('data/samples.csv', chunksize=50000, names=['time', 'energe']) channel = pd.read_csv('data/channels.txt', names=['value']) edges = channel * 2 edges = pd.DataFrame({'value': [0]}).append(edges, ignore_index=True) specal = [] for data in pluse: total = p.apply_async(findpeak, (data['energe'],), callback=partial(histcnt, edge=edges['value'], channel=channel['value'])) specal.append(total) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') spec = sum(specal) plt.figure() plt.plot(spec['cnt']) spec.to_csv('data/spec1.csv', header=False) print('every is OK') end = time.time() print("The time is %f s" % (end - start))
由于對對進(jìn)程線程的編程不是很了解,其中走了很多彎路,嘗試了很多方法也,這個是最終效果相對較好的。
首先,通過 pd.readtable以chunksize=50000分塊讀取,edges為hist過程中的下統(tǒng)計box。
然后,apply_async為非阻塞調(diào)用findpeak,然后將結(jié)果返回給回調(diào)函數(shù)histcnt,但是由于回調(diào)函數(shù)除了進(jìn)程返回結(jié)果還有額外的參數(shù),因此使用partial,對特定的參數(shù)賦予固定的值(edge和channel)并返回了一個全新的可調(diào)用對象,這個新的可調(diào)用對象仍然需要通過制定那些未被賦值的參數(shù)(findpeak返回的值)來調(diào)用。這個新的課調(diào)用對象將傳遞給partial()的固定參數(shù)結(jié)合起來,同一將所有參數(shù)傳遞給原始函數(shù)(histcnt)。(至于為啥不在histcnt中確定那兩個參數(shù),主要是為了避免一直打開文件。。當(dāng)然,有更好的辦法只是懶得思考=。=),還有個原因就是,apply_async返回的是一個對象,需要通過該對象的get方法才能獲取值。。
對于 apply_async官方上是這樣解釋的
Apply_async((func[, args[, kwds[, callback[, error_callback]]]])),apply()方法的一個變體,返回一個結(jié)果對象
如果指定回調(diào),那么它應(yīng)該是一個可調(diào)用的接受一個參數(shù)。結(jié)果準(zhǔn)備好回調(diào)時,除非調(diào)用失敗,在這種情況下,應(yīng)用error_callback代替。
如果error_callback被指定,那么它應(yīng)該是一個可調(diào)用的接受一個參數(shù)。如果目標(biāo)函數(shù)失敗,那么error_callback叫做除了實例。
回調(diào)應(yīng)立即完成以來,否則線程處理結(jié)果將被封鎖。
不使用回調(diào)函數(shù)的版本如下,即先將所有子進(jìn)程得到的數(shù)據(jù)都存入peaks列表中,然后所有進(jìn)程完畢后在進(jìn)行統(tǒng)計計數(shù)。
import pandas as pd import time import scipy.signal as signal import numpy as np from multiprocessing import Pool import os import matplotlib.pyplot as plt def findpeak(pluse): pluse[pluse < 0] = 0 pluse[pluse > 100] = 0 print('Sub process %s.' % os.getpid()) start = time.time() peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)) end = time.time() print("The time is %f s" % (end - start)) res = [pluse[x] for x in peaks] return res if __name__ == '__main__': with Pool(processes=8) as p: start = time.time() print('Parent process %s.' % os.getpid()) pluse = pd.read_csv('data/sample.csv', chunksize=200000, names=['time', 'energe']) pks = [] for data in pluse: pks.append(p.apply_async(findpeak, (data['energe'],))) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') peaks = [] for i, ele in enumerate(pks): peaks.extend(ele.get()) peaks = pd.DataFrame(peaks, columns=['energe']) peaks.to_csv('peaks.csv', index=False, header=False, chunksize=50000) channel = pd.read_csv('data/channels.txt', names=['value']) channel *= 2 channel = pd.DataFrame({'value': [0]}).append(channel, ignore_index=True) plt.figure() spec = plt.hist(peaks['energe'], channel['value']) # out.plot.hist(bins=1024) # print(out) # cnt = peaks.value_counts(bins=1024) # cnt.to_csv('data/cnt.csv', index=False, header=False) print('every is OK') end = time.time() print("The time is %f s" % (end - start))
上述就是小編為大家分享的使用pandas怎么對大文件進(jìn)行計數(shù)處理了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。