您好,登錄后才能下訂單哦!
這篇文章主要介紹Python并發(fā)編程的示例分析,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
0x01 multipleprocessing
與使用線程的 threading 模塊類(lèi)似, multipleprocessing
模塊提供許多高級(jí) API 。最常見(jiàn)的是 Pool 對(duì)象了,使用它的接口能很方便地寫(xiě)出并發(fā)執(zhí)行的代碼。
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素 print(p.map(f, [1, 2, 3])) # 執(zhí)行結(jié)果 # [1, 4, 9]
關(guān)于 Pool 下文中還會(huì)提到,這里我們先來(lái)看 Process 。
Process
要?jiǎng)?chuàng)建一個(gè)進(jìn)程可以使用 Process 類(lèi),使用 start() 方法啟動(dòng)進(jìn)程。
from multiprocessing import Process import os def echo(text): # 父進(jìn)程ID print("Process Parent ID : ", os.getppid()) # 進(jìn)程ID print("Process PID : ", os.getpid()) print('echo : ', text) if __name__ == '__main__': p = Process(target=echo, args=('hello process',)) p.start() p.join() # 執(zhí)行結(jié)果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process
進(jìn)程池
正如開(kāi)篇提到的 multiprocessing
模塊提供了 Pool 類(lèi)可以很方便地實(shí)現(xiàn)一些簡(jiǎn)單多進(jìn)程場(chǎng)景。 它主要有以下接口
apply(func[, args[, kwds]])
執(zhí)行 func(args,kwds) 方法,在方法結(jié)束返回前會(huì)阻塞。
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
異步執(zhí)行 func(args,kwds) ,會(huì)立即返回一個(gè) result 對(duì)象,如果指定了 callback 參數(shù),結(jié)果會(huì)通過(guò)回調(diào)方法返回,還可以指定執(zhí)行出錯(cuò)的回調(diào)方法 error_callback()
map(func, iterable[, chunksize])
類(lèi)似內(nèi)置函數(shù) map() ,可以并發(fā)執(zhí)行 func ,是同步方法
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
異步版本的 map
close()
關(guān)閉進(jìn)程池。當(dāng)池中的所有工作進(jìn)程都執(zhí)行完畢時(shí),進(jìn)程會(huì)退出。
terminate()
終止進(jìn)程池
join()
等待工作進(jìn)程執(zhí)行完,必需先調(diào)用 close() 或者 terminate()
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素 a = p.map(f, [1, 2, 3]) print(a) # 異步執(zhí)行map b = p.map_async(f, [3, 5, 7, 11]) # b 是一個(gè)result對(duì)象,代表方法的執(zhí)行結(jié)果 print(b) # 為了拿到結(jié)果,使用join方法等待池中工作進(jìn)程退出 p.close() # 調(diào)用join方法前,需先執(zhí)行close或terminate方法 p.join() # 獲取執(zhí)行結(jié)果 print(b.get()) # 執(zhí)行結(jié)果 # [1, 4, 9] # <multiprocessing.pool.MapResult object at 0x10631b710> # [9, 25, 49, 121]
map_async() 和 apply_async() 執(zhí)行后會(huì)返回一個(gè) class multiprocessing.pool.AsyncResult 對(duì)象,通過(guò)它的 get() 可以獲取到執(zhí)行結(jié)果, ready() 可以判斷 AsyncResult 的結(jié)果是否準(zhǔn)備好。
進(jìn)程間數(shù)據(jù)的傳輸
multiprocessing 模塊提供了兩種方式用于進(jìn)程間的數(shù)據(jù)共享:隊(duì)列( Queue )和管道( Pipe )
Queue 是線程安全,也是進(jìn)程安全的。使用 Queue 可以實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享,例如下面的 demo 中子進(jìn)程 put 一個(gè)對(duì)象,在主進(jìn)程中就能 get 到這個(gè)對(duì)象。 任何可以序列化的對(duì)象都可以通過(guò) Queue 來(lái)傳輸。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': # 使用Queue進(jìn)行數(shù)據(jù)通信 q = Queue() p = Process(target=f, args=(q,)) p.start() # 主進(jìn)程取得子進(jìn)程中的數(shù)據(jù) print(q.get()) # prints "[42, None, 'hello']" p.join() # 執(zhí)行結(jié)果 # [42, None, 'hello']
Pipe() 返回一對(duì)通過(guò)管道連接的 Connection 對(duì)象。這兩個(gè)對(duì)象可以理解為管道的兩端,它們通過(guò) send() 和 recv() 發(fā)送和接收數(shù)據(jù)。
from multiprocessing import Process, Pipe def write(conn): # 子進(jìn)程中發(fā)送一個(gè)對(duì)象 conn.send([42, None, 'hello']) conn.close() def read(conn): # 在讀的進(jìn)程中通過(guò)recv接收對(duì)象 data = conn.recv() print(data) if __name__ == '__main__': # Pipe()方法返回一對(duì)連接對(duì)象 w_conn, r_conn = Pipe() wp = Process(target=write, args=(w_conn,)) rp = Process(target=read, args=(r_conn,)) wp.start() rp.start() # 執(zhí)行結(jié)果 # [42, None, 'hello']
需要注意的是,兩個(gè)進(jìn)程不能同時(shí)對(duì)一個(gè)連接對(duì)象進(jìn)行 send 或 recv 操作。
同步
我們知道線程間的同步是通過(guò)鎖機(jī)制來(lái)實(shí)現(xiàn)的,進(jìn)程也一樣。
from multiprocessing import Process, Lock import time def print_with_lock(l, i): l.acquire() try: time.sleep(1) print('hello world', i) finally: l.release() def print_without_lock(i): time.sleep(1) print('hello world', i) if __name__ == '__main__': lock = Lock() # 先執(zhí)行有鎖的 for num in range(5): Process(target=print_with_lock, args=(lock, num)).start() # 再執(zhí)行無(wú)鎖的 # for num in range(5): # Process(target=print_without_lock, args=(num,)).start()
有鎖的代碼將每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果執(zhí)行無(wú)鎖的代碼,則在我的電腦上執(zhí)行結(jié)果是這樣的
hello worldhello world 0
1
hello world 2
hello world 3
hello world 4
除了 Lock ,還包括 RLock 、 Condition 、 Semaphore 和 Event 等進(jìn)程間的同步原語(yǔ)。其用法也與線程間的同步原語(yǔ)很類(lèi)似。 API 使用可以參考文末中引用的文檔鏈接。
在工程中實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享應(yīng)當(dāng)優(yōu)先使用 隊(duì)列或管道。
以上是“Python并發(fā)編程的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。