溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python并發(fā)編程的示例分析

發(fā)布時(shí)間:2021-07-24 09:56:04 來(lái)源:億速云 閱讀:165 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹Python并發(fā)編程的示例分析,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

0x01 multipleprocessing

與使用線程的 threading 模塊類(lèi)似, multipleprocessing 模塊提供許多高級(jí) API 。最常見(jiàn)的是 Pool 對(duì)象了,使用它的接口能很方便地寫(xiě)出并發(fā)執(zhí)行的代碼。

from multiprocessing import Pool
def f(x):
 return x * x
if __name__ == '__main__':
 with Pool(5) as p:
  # map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素
  print(p.map(f, [1, 2, 3]))
# 執(zhí)行結(jié)果
# [1, 4, 9]

關(guān)于 Pool 下文中還會(huì)提到,這里我們先來(lái)看 Process 。

Process

要?jiǎng)?chuàng)建一個(gè)進(jìn)程可以使用 Process 類(lèi),使用 start() 方法啟動(dòng)進(jìn)程。

from multiprocessing import Process
import os
def echo(text):
 # 父進(jìn)程ID
 print("Process Parent ID : ", os.getppid())
 # 進(jìn)程ID
 print("Process PID : ", os.getpid())
 print('echo : ', text)
if __name__ == '__main__':
 p = Process(target=echo, args=('hello process',))
 p.start()
 p.join()
# 執(zhí)行結(jié)果
# Process Parent ID : 27382
# Process PID : 27383
# echo : hello process

進(jìn)程池

正如開(kāi)篇提到的 multiprocessing 模塊提供了 Pool 類(lèi)可以很方便地實(shí)現(xiàn)一些簡(jiǎn)單多進(jìn)程場(chǎng)景。 它主要有以下接口

  • apply(func[, args[, kwds]])

  • 執(zhí)行 func(args,kwds) 方法,在方法結(jié)束返回前會(huì)阻塞。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])

  • 異步執(zhí)行 func(args,kwds) ,會(huì)立即返回一個(gè) result 對(duì)象,如果指定了 callback 參數(shù),結(jié)果會(huì)通過(guò)回調(diào)方法返回,還可以指定執(zhí)行出錯(cuò)的回調(diào)方法 error_callback()

  • map(func, iterable[, chunksize])

  • 類(lèi)似內(nèi)置函數(shù) map() ,可以并發(fā)執(zhí)行 func ,是同步方法

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])

  • 異步版本的 map

  • close()

  • 關(guān)閉進(jìn)程池。當(dāng)池中的所有工作進(jìn)程都執(zhí)行完畢時(shí),進(jìn)程會(huì)退出。

  • terminate()

  • 終止進(jìn)程池

  • join()

  • 等待工作進(jìn)程執(zhí)行完,必需先調(diào)用 close() 或者 terminate()

from multiprocessing import Pool
def f(x):
 return x * x
if __name__ == '__main__':
 with Pool(5) as p:
  # map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素
  a = p.map(f, [1, 2, 3])
  print(a)
  # 異步執(zhí)行map
  b = p.map_async(f, [3, 5, 7, 11])
  # b 是一個(gè)result對(duì)象,代表方法的執(zhí)行結(jié)果
  print(b)
  # 為了拿到結(jié)果,使用join方法等待池中工作進(jìn)程退出
  p.close()
  # 調(diào)用join方法前,需先執(zhí)行close或terminate方法
  p.join()
  # 獲取執(zhí)行結(jié)果
  print(b.get())
# 執(zhí)行結(jié)果
# [1, 4, 9]
# <multiprocessing.pool.MapResult object at 0x10631b710>
# [9, 25, 49, 121]

map_async() 和 apply_async() 執(zhí)行后會(huì)返回一個(gè) class multiprocessing.pool.AsyncResult 對(duì)象,通過(guò)它的 get() 可以獲取到執(zhí)行結(jié)果, ready() 可以判斷 AsyncResult 的結(jié)果是否準(zhǔn)備好。

進(jìn)程間數(shù)據(jù)的傳輸

multiprocessing 模塊提供了兩種方式用于進(jìn)程間的數(shù)據(jù)共享:隊(duì)列( Queue )和管道( Pipe )

Queue 是線程安全,也是進(jìn)程安全的。使用 Queue 可以實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享,例如下面的 demo 中子進(jìn)程 put 一個(gè)對(duì)象,在主進(jìn)程中就能 get 到這個(gè)對(duì)象。 任何可以序列化的對(duì)象都可以通過(guò) Queue 來(lái)傳輸。

from multiprocessing import Process, Queue
def f(q):
 q.put([42, None, 'hello'])
if __name__ == '__main__':
 # 使用Queue進(jìn)行數(shù)據(jù)通信
 q = Queue()
 p = Process(target=f, args=(q,))
 p.start()
 # 主進(jìn)程取得子進(jìn)程中的數(shù)據(jù)
 print(q.get()) # prints "[42, None, 'hello']"
 p.join()
# 執(zhí)行結(jié)果
# [42, None, 'hello']

Pipe() 返回一對(duì)通過(guò)管道連接的 Connection 對(duì)象。這兩個(gè)對(duì)象可以理解為管道的兩端,它們通過(guò) send() 和 recv() 發(fā)送和接收數(shù)據(jù)。

from multiprocessing import Process, Pipe
def write(conn):
 # 子進(jìn)程中發(fā)送一個(gè)對(duì)象
 conn.send([42, None, 'hello'])
 conn.close()
def read(conn):
 # 在讀的進(jìn)程中通過(guò)recv接收對(duì)象
 data = conn.recv()
 print(data)
if __name__ == '__main__':
 # Pipe()方法返回一對(duì)連接對(duì)象
 w_conn, r_conn = Pipe()
 wp = Process(target=write, args=(w_conn,))
 rp = Process(target=read, args=(r_conn,))
 wp.start()
 rp.start()
# 執(zhí)行結(jié)果
# [42, None, 'hello']

需要注意的是,兩個(gè)進(jìn)程不能同時(shí)對(duì)一個(gè)連接對(duì)象進(jìn)行 send 或 recv 操作。

同步

我們知道線程間的同步是通過(guò)鎖機(jī)制來(lái)實(shí)現(xiàn)的,進(jìn)程也一樣。

from multiprocessing import Process, Lock
import time
def print_with_lock(l, i):
 l.acquire()
 try:
  time.sleep(1)
  print('hello world', i)
 finally:
  l.release()
def print_without_lock(i):
 time.sleep(1)
 print('hello world', i)
if __name__ == '__main__':
 lock = Lock()
 # 先執(zhí)行有鎖的
 for num in range(5):
  Process(target=print_with_lock, args=(lock, num)).start()
 # 再執(zhí)行無(wú)鎖的
 # for num in range(5):
 #  Process(target=print_without_lock, args=(num,)).start()

有鎖的代碼將每秒依次打印

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4

如果執(zhí)行無(wú)鎖的代碼,則在我的電腦上執(zhí)行結(jié)果是這樣的

hello worldhello world  0
1
hello world 2
hello world 3
hello world 4

除了 Lock ,還包括 RLock 、 Condition 、 Semaphore 和 Event 等進(jìn)程間的同步原語(yǔ)。其用法也與線程間的同步原語(yǔ)很類(lèi)似。 API 使用可以參考文末中引用的文檔鏈接。

在工程中實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享應(yīng)當(dāng)優(yōu)先使用 隊(duì)列或管道。

以上是“Python并發(fā)編程的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI