您好,登錄后才能下訂單哦!
python中怎么控制多進程,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
multiprocessing簡介
multiprocessing是python自帶的多進程模塊,可以大批量的生成進程,在服務器為多核CPU時效果更好,類似于threading模塊。相對于多線程,多進程由于獨享內(nèi)存空間,更穩(wěn)定安全,在運維里面做些批量操作時,多進程有更多適用的場景
multiprocessing包提供了本地和遠程兩種并發(fā)操作,有效的避開了使用子進程而不是全局解釋鎖的線程,因此,multiprocessing可以有效利用到多核處理
Process類
在multiporcessing中,通過Process類對象來批量產(chǎn)生進程,使用start()方法來啟動這個進程
1.語法
multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*)
group: 這個參數(shù)一般為空,它只是為了兼容threading.Tread
target: 這個參數(shù)就是通過run()可調(diào)用對象的方法,默認為空,表示沒有方法被調(diào)用
name: 表示進程名
args: 傳給target調(diào)用方法的tuple(元組)參數(shù)
kwargs: 傳給target調(diào)用方法的dict(字典)參數(shù)
2.Process類的方法及對象
run()
該方法是進程的運行過程,可以在子類中重寫此方法,一般也很少去重構
start()
啟動進程,每個進程對象都必須被該方法調(diào)用
join([timeout])
等待進程終止,再往下執(zhí)行,可以設置超時時間
name
可以獲取進程名字,多個進程也可以是相同的名字
is_alive()
返回進程是否還存活,True or False,進程存活是指start()開始到子進程終止
daemon
守護進程的標記,一個布爾值,在start()之后設置該值,表示是否后臺運行
注意:如果設置了后臺運行,那么后臺程序不運行再創(chuàng)建子進程
pid
可以獲取進程ID
exitcode
子進程退出時的值,如果進程還沒有終止,值將是None,如果是負值,表示子進程被終止
terminate()
終止進程,如果是Windows,則使用terminateprocess(),該方法對已經(jīng)退出和結束的進程,將不會執(zhí)行
以下為一個簡單的例子:
#-*- coding:utf8 -*- import multiprocessing import time def work(x): time.sleep(1) print time.ctime(),'這是子進程[{0}]...'.format(x) if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=work,args=(i,)) print '啟動進程數(shù):{0}'.format(i) p.start() p.deamon = True
當然也可以顯示每個進程的ID
#-*- coding:utf8 -*- import multiprocessing import time import os def work(x): time.sleep(1) ppid = os.getppid() pid = os.getpid() print time.ctime(),'這是子進程[{0},父進程:{1},子進程:{2}]...'.format(x,ppid,pid) if __name__ == '__main__': for i in range(5): p = multiprocessing.Process(target=work,args=(i,)) print '啟動進程數(shù):{0}'.format(i) p.start() p.deamon = True
但在實際使用的過程中,并不只是并發(fā)完就可以了,比如,有30個任務,由于服務器資源有限,每次并發(fā)5個任務,這里還涉及到30個任務怎么獲取的問題,另外并發(fā)的進程任務執(zhí)行時間很難保證一致,尤其是需要時間的任務,可能并發(fā)5個任務,有3個已經(jīng)執(zhí)行完了,2個還需要很長時間執(zhí)行,總不能等到這兩個進程執(zhí)行完了,再繼續(xù)執(zhí)行后面的任務,因此進程控制就在此有了使用場景,可以利用Process的方法和一些multiprocessing的包,類等結合使用
進程控制及通信常用類
一、Queue類
類似于python自帶的Queue.Queue,主要用在比較小的隊列上面
語法:
multiprocessing.Queue([maxsize])
類方法:
qsize()
返回隊列的大致大小,因為多進程或者多線程一直在消耗隊列,因此該數(shù)據(jù)不一定正確
empty()
判斷隊列是否為空,如果是,則返回True,否則False
full()
判斷隊列是否已滿,如果是,則返回True,否則False
put(obj[, block[, timeout]])
將對象放入隊列,可選參數(shù)block為True,timeout為None
get()
從隊列取出對象
#-*- coding:utf8 -*- from multiprocessing import Process, Queue def f(q): q.put([42,None,'hi']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() #打印內(nèi)容: [42,None,'hi'] p.join()
二、Pipe類
pipe()函數(shù)返回一對對象的連接,可以為進程間傳輸消息,在打印一些日志、進程控制上面有一些用處,Pip()對象返回兩個對象connection,代表兩個通道,每個connection對象都有send()和recv()方法,需要注意的是兩個或以上的進程同時讀取或者寫入同一管道,可能會導致數(shù)據(jù)混亂,測試了下,是直接覆蓋了。另外,返回的兩個connection,如果一個是send()數(shù)據(jù),那么另外一個就只能recv()接收數(shù)據(jù)了
#-*- coding:utf8 -*- from multiprocessing import Process, Pipe import time def f(conn,i): print '[{0}]已經(jīng)執(zhí)行到子進程:{1}'.format(time.ctime(),i) time.sleep(1) w = "[{0}]hi,this is :{1}".format(time.ctime(),i) conn.send(w) conn.close() if __name__ == '__main__': reader = [] parent_conn, child_conn = Pipe() for i in range(4): p = Process(target=f, args=(child_conn,i)) p.start() reader.append(parent_conn) p.deamon=True # 等待所有子進程跑完 time.sleep(3) print '\n[{0}]下面打印child_conn向parent_conn傳輸?shù)男畔?'.format(time.ctime()) for i in reader: print i.recv()
輸出為:
三、Value,Array
在進行并發(fā)編程時,應盡量避免使用共享狀態(tài),因為多進程同時修改數(shù)據(jù)會導致數(shù)據(jù)破壞。但如果確實需要在多進程間共享數(shù)據(jù),multiprocessing也提供了方法Value、Array
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d',0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]*
四、Manager進程管理模塊
Manager類管理進程使用得較多,它返回對象可以操控子進程,并且支持很多類型的操作,如: list, dict, Namespace、lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array,因此使用Manager基本上就夠了
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() #等待進程結束后往下執(zhí)行 print d,'\n',l
輸出:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
可以看到,跟共享數(shù)據(jù)一樣的效果,大部分管理進程的方法都集成到了Manager()模塊了
五、對多進程控制的應用實例
#-*- coding:utf8 -*- from multiprocessing import Process, Queue import time def work(pname,q): time.sleep(1) print_some = "{0}|this is process: {1}".format(time.ctime(),pname) print print_some q.put(pname) if __name__ == '__main__': p_manag_num = 2 # 進程并發(fā)控制數(shù)量2 # 并發(fā)的進程名 q_process = ['process_1','process_2','process_3','process_4','process_5'] q_a = Queue() # 將進程名放入隊列 q_b = Queue() # 將q_a的進程名放往q_b進程,由子進程完成 for i in q_process: q_a.put(i) p_list = [] # 完成的進程隊列 while not q_a.empty(): if len(p_list) <= 2: pname=q_a.get() p = Process(target=work, args=(pname,q_b)) p.start() p_list.append(p) print pname for p in p_list: if not p.is_alive(): p_list.remove(p) # 等待5秒,預估執(zhí)行完后看隊列通信信息 # 當然也可以循環(huán)判斷隊列里面的進程是否執(zhí)行完成 time.sleep(5) print '打印p_b隊列:' while not q_b.empty(): print q_b.get()
執(zhí)行結果:
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。