溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

集算器實現(xiàn)外部數(shù)據(jù)并行計算

發(fā)布時間:2020-07-29 05:12:48 來源:網(wǎng)絡(luò) 閱讀:207 作者:raqsoft 欄目:大數(shù)據(jù)

? 文本并行

SPL可將文本文件按體積大致分為N段,只讀取其中一段。比如cardInfo.txt存儲著一千萬條人口信息,將其分為十份,取第二份,代碼可以寫作:


AB
1=file("d:\\temp\\cardInfo千萬.txt")
2=A1.import@t(;2:10)/直接讀入內(nèi)存
3=A1.cursor@t(;2:10).fetch@x()/游標方式讀取

按體積大致分段,而不是按行數(shù)精確分段,目的是提高分段性能。比如在IDE中觀察A2或A3的前幾個字段,可以看到行數(shù)并非精確的100萬(與具體數(shù)據(jù)有關(guān)):

indexcardNonamegenderprovincemobile
1308200310180525Alison ClintonfemaleIdaho?1024627490
2709198311300191Abby WoodfemaleKansas?19668466
31005199807060610George BushmaleCalifornia?1019879226
1000005405199907050256Mark RowswellmaleIdaho?1168620176

分段讀取可應(yīng)用于多線程計算,從而提高讀取性能。比如用2個線程分別讀取cardInfo.txt,各線程計算本段行數(shù),最后合并為總行數(shù),可用如下代碼:

5fork to(2)=A1.cursor@t(;A5:2).total(count(1))/2線程分段
6=A5.sum()
/合并結(jié)果

語句fork語句適合算法較復(fù)雜的情況,當(dāng)算法比較簡單時,可用cursor@m直接分段讀取。比如前面的代碼可以改寫如下:

7=A1.cursor@tm(;2).total(count(1))/2線程分段

上述代碼指定了線程數(shù),如果省略線程數(shù),則用配置文件中的“parallet limit”當(dāng)做默認線程數(shù)。假設(shè)parallet limit=2,則上述代碼可以改寫成:

8=A1.cursor@tm().total(count(1))/默認線程數(shù)分段

?

為了驗證分段讀取前后的性能差異,下面設(shè)計一個算法,分別用單線程和2線程計算cardInfo.txt的總行數(shù),可以看到性能顯著提升:

11=now()
12=A1.cursor@t().total(count(1))
13=interval@ms(A11,now())/未分段,20882ms
14

15=now()
16=A1.cursor@tm(;2).total(count(1))
17=interval@ms(A15,now())/2線程分段,12217ms

?

? JDBC 并行

通過JDBC取數(shù)時,有時會遇到數(shù)據(jù)庫負載雖然不重,但取數(shù)性能仍然較差的情況,這種情況下可以用并行取數(shù)提高性能。

比如Oracle數(shù)據(jù)庫有一張通話記錄表callrecord,記錄數(shù)100萬條,索引字段是callTime,且數(shù)據(jù)基本按該字段平均分布。采用非并行取數(shù)時,可以發(fā)現(xiàn)性能不夠理想,代碼如下:


AB
1=now()/記錄時間,用于測試性能
2=connect("orcl")
3=A2.query@x("select * from ? callrecord")
4=interval@ms(A1,now())/非并行取數(shù),17654ms

改為2線程并行取數(shù)后,可以看到性能提升明顯,代碼如下:

6=now()
7=connect("orcl").query@x("select ? min(callTime),max(callTime) from callrecordA")
8=2.(range(A7.#1,elapse@s(A7.#2,1),~:2))/時間區(qū)間參數(shù)列表
9fork A8=connect("orcl")
10
=B9.query@x("select * from ? callrecordA where callTime>=? and callTime<?",A9(1),A9(2))
11=A9.conj()
12=interval@ms(A6,now())/并行取數(shù),10045ms

既然要并行取數(shù),就要把源數(shù)據(jù)分成多個區(qū)間,使每區(qū)間的數(shù)據(jù)量大致相等。在這個例子中,索引字段是時間類型callTime,所以先用A7求出callTime的數(shù)據(jù)范圍,再用A8將該范圍平均分成2個時間區(qū)間。之后在A9進行并行計算,每個線程以各自的時間區(qū)間為參數(shù)執(zhí)行SQL,取數(shù)結(jié)果將大致相等。最后合并多線程的取數(shù)結(jié)果,作為最終結(jié)果。

函數(shù)range非常適合對數(shù)據(jù)分段。該函數(shù)可將某范圍平均分為N個區(qū)間,獲得第i個區(qū)間,且可根據(jù)范圍的數(shù)據(jù)類型自動調(diào)整區(qū)間的數(shù)據(jù)類型。本例的范圍類型是datetime,則函數(shù)range將范圍按秒均分,返回類型也是datetime。如果范圍類型是date,則函數(shù)range按天均分;如果范圍類型是整數(shù),則函數(shù)range按整數(shù)均分。

上面例子中,分段字段是索引,如果沒有建立索引,則查詢性能會出現(xiàn)下降。在這種情況下,并行取數(shù)仍然可以帶來明顯的性能提升,所以可以用相同的方法。

上面例子中,源數(shù)據(jù)基本按callTime平均分布,因此容易使各區(qū)間的數(shù)據(jù)量大致相等,如果源數(shù)據(jù)分布很不平均,可以考慮按行號分段。每種數(shù)據(jù)庫都有生成行號的方法,比如oralce可用rownum。

?

除了單表單SQL并行取數(shù),SPL也支持多表多SQL并行取數(shù)。比如某報表格式較復(fù)雜,需要SPL執(zhí)行多個SQL,并按一定的格式拼出結(jié)果集。當(dāng)采用非并行取數(shù)時,可以發(fā)現(xiàn)性能不夠理想,代碼如下:


AB
1=now()=connect("orcl")
2select count(1)? from? ? callrecordA where to_char(calltime,'yyyy')='2015'=B1.query(A2)
3select count(1)? from? ? callrecordA where to_char(calltime,'yyyyMM')='201501'=B1.query(A3)
4select count(1)? from? ? callrecordA where to_char(calltime,'yyyyMM')='201502'=B1.query(A4)
5select count(1)? from? ? callrecordA where to_char(calltime,'yyyyMM')='201503'=B1.query(A5)
6select count(1)? from? ? callrecordA where to_char(calltime,'yyyy')='2016'=B1.query(A6)
7select count(1)? from? ? callrecordA where to_char(calltime,'yyyyMM')='201601'=B1.query(A7)
8select count(1)? from? ? callrecordA where to_char(calltime,'yyyyMM')='201602'=B1.query(A8)
9select count(1)? from? ? callrecordA where to_char(calltime,'yyyyMM')='201603'=B1.query(A9)
10
=B1.close()
11=[B2:B9].new(~.#1:data)
12=interval@ms(A1,now())/非并行取數(shù),2195毫秒

改為4線程并行取數(shù)后,可以看到性能提升明顯,代碼如下:

14=now()
15fork [A2:A9]=connect("orcl")
16
=B15.query@x(A15)
17=A15.new(~.#1:data)
18=interval@ms(A14,now())/4并行取數(shù),1320毫秒

需要注意的是,并行取數(shù)時任務(wù)數(shù)可大于并行數(shù)。比如上面代碼共8個任務(wù),但同時執(zhí)行的任務(wù)只有4個,其他待執(zhí)行的任務(wù)排在隊列中,如果某個小任務(wù)先執(zhí)行完成,SPL會從隊列中取下一個任務(wù)并執(zhí)行它??梢钥吹?,當(dāng)任務(wù)數(shù)較多時,即使各任務(wù)負載相差較大,也能充分發(fā)揮硬件性能。

?

? 混合并行

當(dāng)數(shù)據(jù)量太大時,除了分庫計算,還可以進行混合數(shù)據(jù)源并行計算,后者性能更高。具體做法是:把數(shù)據(jù)分為兩部分(或多部分),一部分存儲在數(shù)據(jù)庫中,通常是當(dāng)前實時數(shù)據(jù),一部分存儲在組文件,通常是歷史數(shù)據(jù),再對兩種數(shù)據(jù)源進行并行計算,從而獲得更高性能。

比如歷史訂單存儲在orders.ctx中,當(dāng)前訂單存儲在數(shù)據(jù)庫orcl中,請按年、月分組,對各組數(shù)據(jù)的amount字段求和。SPL代碼如下:


AB
1forkselect extract(year from ? orderTime)y,extract(month from orderTime)m,sum(amount) amount from orders ? group by? extract(year from ? orderTime),extract(month from orderTime)
2
=connect("orcl")
3
=B2.query@x(B1)
4fork=file("orders.ctx").create()
5
=B4.groups(year(ORDERTIME):Y,month(ORDERTIME):M;sum(AMOUNT):AMOUNT)
6=[A1,A4].conj()
7=A6.groups(Y,M;sum(AMOUNT):AMOUNT)

注意fork……fork……的用法。如果fork語句塊下接非fork語句塊,則兩者順序執(zhí)行,如果fork語句塊下接fork語句塊,則兩者并行執(zhí)行。


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI