溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python多進程-multiprocess

發(fā)布時間:2020-04-05 11:40:30 來源:網(wǎng)絡 閱讀:929 作者:LJ_baby 欄目:編程語言

Python中主要通過 multiprocess 包來操作和管理進程。

進程啟動方式

python 啟動進程方式1

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p = Process(target=fork, args=('hello_1',))
    p.start()             # 啟動進程
    print("end...")

# 結(jié)果輸出
end...
subprocess: hello_1

?
Process類參數(shù)說明:

Process([ target [, name [, args [, kwargs]]]]])
target 表示子進程要執(zhí)行的任務
args 表示調(diào)用對象的位置參數(shù)元組,args=(1,2,'hello',)
kwargs 表示調(diào)用對象的字典,kwargs={'name':'baby','age':18}
name 子進程的名稱

?
python 啟動進程方式2:

import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, thread_name):
        super().__init__()
        self.thread_name = thread_name
    def run(self):
        time.sleep(2)
        print("subprocess: " + self.thread_name)

if __name__ == '__main__':
    p = MyProcess('hello_1')
    p.start()
    print("end...")

# 結(jié)果輸出
end...
subprocess: hello_1

Tip:兩種啟動進程的方式?jīng)]有優(yōu)劣之分~

join方法的應用

在主進程中通過 join 方法,可以讓主進程等待子進程執(zhí)行完畢后,再繼續(xù)往下執(zhí)行

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p = Process(target=fork, args=('hello_1',))
    p.start()
    p.join()    # 等待子進程執(zhí)行完畢
    print("end...")

# 結(jié)果輸出
subprocess: hello_1
end...

?
多個子進程同時運行

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p_list = []
    for i in range(1, 4):
        p = Process(target=fork, args=('hello_' + str(i),))
        p.start()
        p_list.append(p)
    [p.join() for p in p_list]    # 等待子進程執(zhí)行完畢
    print("end...")

# 結(jié)果輸出
subprocess: hello_1
subprocess: hello_2
subprocess: hello_3
end...

?
如上是通過第一種方式啟動子進程,使用繼承 Process 類的形式啟動子進程示例如下:

import time
from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, thread_name):
        super().__init__()
        self.thread_name = thread_name

    def run(self):
        time.sleep(2)
        print("subprocess: " + self.thread_name)

if __name__ == '__main__':
    p_list = []
    for i in range(1, 4):
        p = MyProcess('hello_' + str(i))
        p.start()
        p_list.append(p)
    [p.join() for p in p_list]
    print("end...")

Process類的其他相關(guān)方法和屬性

import time
from multiprocessing import Process

def fork(thread_name):
    time.sleep(2)
    print("subprocess: " + thread_name)

if __name__ == '__main__':
    p = Process(target=fork, args=('hello',))
    p.start()
    # 進程的名稱
    print(p.name)    # 輸出:Process-1
    # 布爾值,True 表示該進程為守護進程,默認為 False,這個值需要在 p.start() 之前設置
    print(p.daemon)  # 輸出:False
    # 進程的pid
    print(p.pid)      # 輸出:7980
    # 進程的身份驗證鍵,默認是由 os.urandom() 隨機生成的32字符的字符串。
    print(p.authkey)  # 輸出:b'\xf2M)\xc8\xf6\xae8\x0c\xbet\xbcAT\xad7%ig9zl\xe5|\xb5|\x7f\xa6\xab\x8a\x8a\x94:'
    # 查看進程是否還在運行,若還在運行,則返回 True
    print(p.is_alive())  # 輸出:True
    # 主進程等待子進程 p 執(zhí)行結(jié)束,再繼續(xù)往下執(zhí)行
    # p.join()
    # 強制終止子進程 p
    p.terminate()
    print('end...')

守護進程

import time
from multiprocessing import Process

def fork(thread_name):
    for i in range(5):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i))

if __name__ == '__main__':
    p = Process(target=fork, args=('hello',))
    p.start()
    time.sleep(2)
    print('end...')

# 輸出結(jié)果:
subprocess: hello...0
subprocess: hello...1
end...
subprocess: hello...2
subprocess: hello...3
subprocess: hello...4

?
可以看到主進程的代碼先運行完畢,運行完成后,它會等待子進程執(zhí)行完成后再結(jié)束。若是將子進程設置為守護進程,則子進程會隨著主進程的代碼執(zhí)行完畢而結(jié)束。注意守護進程內(nèi)無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children。

import time
from multiprocessing import Process

def fork(thread_name):
    for i in range(5):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i))

if __name__ == '__main__':
    p = Process(target=fork, args=('hello',))
    p.daemon = True   # 設置進程 p 為守護進程
    p.start()
    time.sleep(2)
    print('end...')

# 輸出結(jié)果:
subprocess: hello...0
subprocess: hello...1
end...

?
值得注意的是:守護進程是在主進程代碼執(zhí)行結(jié)束后就終止,即主進程的代碼執(zhí)行完畢,守護進程就終止。來看如下示例:

import time
from multiprocessing import Process

def fork_1(thread_name):
    for i in range(5):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i), end="\n")

def fork_2(thread_name):
    for i in range(7):
        time.sleep(1)
        print("subprocess: " + thread_name + "..." + str(i), end="\n")

if __name__ == '__main__':
    p1 = Process(target=fork_1, args=('hello',))
    p2 = Process(target=fork_2, args=('hi',))
    p1.daemon = True   # 設置進程 p1 為守護進程
    p1.start()
    p2.start()
    time.sleep(2)
    print('end...')

# 輸出結(jié)果:
subprocess: hello...0
subprocess: hi...0
subprocess: hello...1
subprocess: hi...1
end...
subprocess: hi...2
subprocess: hi...3
subprocess: hi...4
subprocess: hi...5
subprocess: hi...6

?
如上示例中,p1 為守護進程,在主進程輸出 ‘end…’ 后,即主進程的代碼執(zhí)行完畢后,守護進程 p1 就終止了。但是此時,主進程并沒有終止,它需要等待 p2 執(zhí)行完畢之后再終止。

鎖 —— multiprocess.Lock

進程與進程之間數(shù)據(jù)是隔離的

from multiprocessing import Process

def fork(thread_name):
    global n
    print("subprocess: " + thread_name + "...n=" + str(n))
    n = 1
    print("subprocess: " + thread_name + "...n=" + str(n))

if __name__ == '__main__':
    n = 100
    p = Process(target=fork, args=('hello',))
    p.start()
    p.join()
    print("main...n=" + str(n))

# 輸出結(jié)果:
subprocess: hello...n=100
subprocess: hello...n=1
main...n=100

?
通過如上示例可以看出,子進程 p 中的變量 n 和主進程中的變量 n 是兩個獨立的變量,存放在不同的內(nèi)存空間,更改其中一個變量并不會影響另一個變量的值。
?
要想在進程間共享數(shù)據(jù),可通過 Manager 類實現(xiàn)。Manager 類中提供了很多可以共享數(shù)據(jù)的數(shù)據(jù)類型,包括dict,list,Queue,Pipe 等。注意:Manager 中的數(shù)據(jù)是不安全的。當多個進程同時訪問共享數(shù)據(jù)的時候,就會產(chǎn)生數(shù)據(jù)安全問題。
?
多進程同時搶購余票示例:

from multiprocessing import Process, Manager

def work(m_dict):
    if m_dict['count'] > 0:
        print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
        m_dict['count'] -= 1

if __name__ == '__main__':
    m = Manager()
    m_dict = m.dict({'count': 20})
    p_list = []
    for i in range(20):
        p = Process(target=work, args=(m_dict, ))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()
    print("end..." + str(m_dict['count']))

# 輸出結(jié)果:
32940 get ticket 20
32941 get ticket 19
32942 get ticket 18
32939 get ticket 17
32943 get ticket 16
32944 get ticket 15
32946 get ticket 14
32945 get ticket 13
32947 get ticket 12
32948 get ticket 11
32953 get ticket 11
32958 get ticket 9
32957 get ticket 8
32955 get ticket 7
32956 get ticket 7
32954 get ticket 6
32950 get ticket 5
32949 get ticket 5
32951 get ticket 3
32952 get ticket 2
end...1

?
輸出結(jié)果中 “ticket 11” 被購買了2次,可以看到當多個進程對同一份數(shù)據(jù)進行操作的時候,就會引發(fā)數(shù)據(jù)安全問題。
在如上示例中,增加進程數(shù)據(jù)還有可能出現(xiàn)如下這樣的報錯:

AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

?
這個報錯的觸發(fā)原因并沒有深究,極有可能是 manager 內(nèi)部原因,在 manager 管理進程的同時不可以進入主進程進行某些交互??梢酝ㄟ^在子進程中 sleep 一下 來避免這個問題(這并不是根本的解決方式)

import time, os
from multiprocessing import Process, Manager

def work(m_dict):
    time.sleep(0.5)       # sleep 0.5 s,可以繞過這個問題
    if m_dict['count'] > 0:
        print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
        m_dict['count'] -= 1
...

?
如上的數(shù)據(jù)安全問題,可以在子進程中加鎖來解決,即在同一時刻,僅允許一個進程執(zhí)行 lock.acquire() 和 lock.release() 之間的代碼

import os
from multiprocessing import Process, Manager, Lock

def work(m_dict, lock):
    lock.acquire()
    if m_dict['count'] > 0:
        print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
        m_dict['count'] -= 1
    lock.release()

if __name__ == '__main__':
    m = Manager()
    m_dict = m.dict({'count': 20})
    lock = Lock()
    p_list = []
    for i in range(20):
        p = Process(target=work, args=(m_dict, lock))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()
    print("end..." + str(m_dict['count']))

# 輸出結(jié)果:
33240 get ticket 20
33242 get ticket 19
33241 get ticket 18
33243 get ticket 17
33244 get ticket 16
33245 get ticket 15
33247 get ticket 14
33246 get ticket 13
33249 get ticket 12
33248 get ticket 11
33250 get ticket 10
33251 get ticket 9
33252 get ticket 8
33257 get ticket 7
33258 get ticket 6
33253 get ticket 5
33254 get ticket 4
33255 get ticket 3
33259 get ticket 2
33256 get ticket 1
end...0

?
Manager() 是通過共享進程來實現(xiàn)多進程之間數(shù)據(jù)共享。Manager() 返回的對象控制了一個 server 進程,這個 server 進程允許其他進程通過 proxies 來訪問。多進程之間數(shù)據(jù)共享,除了 Manager() 外,還有 Value 、 Array,Value 和 Array 是通過共享內(nèi)存的方式實現(xiàn)數(shù)據(jù)共享,同樣為了保證數(shù)據(jù)安全,經(jīng)常和同步互斥鎖配合使用。
關(guān)于 Value 、 Array 的具體使用方式可參閱 https://www.cnblogs.com/gengyi/p/8661235.html。
?
使用 Value 實現(xiàn)上述的搶票示例:

import os
from multiprocessing import Process, Value, Lock

def work(count, lock):
    lock.acquire()
    if count.value > 0:
        print("%s get ticket %d" % (str(os.getpid()), count.value))
        count.value -= 1
    lock.release()

if __name__ == '__main__':
    count = Value('l', 50)
    lock = Lock()
    p_list = []
    for i in range(50):
        p = Process(target=work, args=(count, lock))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()
    print("end..." + str(count.value))

隊列 —— multiprocess.Queue

from multiprocessing import Queue

queue = Queue(3)    # 創(chuàng)建隊列:Queue([maxsize]),maxsize 表示隊列的最大長度
queue.put('a')
queue.put('b')
queue.put('c')
print(queue.full())    # 輸出 True,表示隊列已經(jīng)滿了
# 若隊列已經(jīng)滿了,繼續(xù)向隊列中插入數(shù)據(jù),則程序會阻塞在這里,直到隊列的另一端有數(shù)據(jù)被取出,新的數(shù)據(jù)才能插入
# put 方法有兩個可選參數(shù):block 和 timeout。
# block 默認為 True,表示會阻塞 timeout 指定的時間,如果超時,會拋出 Queue.Full 異常。如果 block 為 False,在 put 時 隊列已滿,則會立即拋出 Queue.Full 異常。
# timeout 默認為 None,表示會一直阻塞。
# queue.put('d')
# queue.put_nowait()   # 等同于 queue.put(block = False)
print(queue.get())     # 'a'
print(queue.get())     # 'b'
print(queue.get())     # 'c'
print(queue.empty())   # 輸出 True,表示隊列已空
# 若隊列已空,繼續(xù)從該隊列中 get 數(shù)據(jù),則程序會阻塞在這里,直到隊列中新插入了數(shù)據(jù)。
# get 方法也有兩個參數(shù):block 和 timeout,通 put 方法
# block 默認為 True,表示會阻塞 timeout 指定的時間,如果 timeout 之間之內(nèi)還是沒有獲取到數(shù)據(jù),會拋出 Queue.Empty 異常。block 為 False 時,若隊列中有數(shù)據(jù),則會立即返回數(shù)據(jù),如果隊列為空,則會立即拋出 Queue.Empty 異常.
# timeout 默認為 None,表示會一直阻塞。
# queue.get(False)
# queue.get_nowait()    # 等同于 queue.get(block = False)
# print(queue.qsize())  # 獲取隊列的長度,某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。
# q.close()             # 關(guān)閉隊列

?
生產(chǎn)者和消費者示例

from multiprocessing import Process, Queue
import time

def producer(name, production, queue):
    for i in range(2):
        time.sleep(0.5)
        queue.put(production + '_' + str(i))
        print('%s produce %s' % (name, production + '_' + str(i)), end="\n")

def consumer(name, queue):
    while True:
        data = queue.get()
        if data is None: break      # None 為結(jié)束信號
        time.sleep(0.3)
        print('%s consume %s' % (name, data), end="\n")

if __name__ == '__main__':
    queue = Queue()
    p_list = []
    for index, f in enumerate(['apple', 'pear', 'peach']):
        p = Process(target=producer, args=('producer_' + str(index), f, queue))
        p_list.append(p)
        p.start()
    Process(target=consumer, args=('consumer_1', queue)).start()
    Process(target=consumer, args=('consumer_2', queue)).start()
    [p.join() for p in p_list]
    # 有2個消費者,則發(fā)送2次 None
    queue.put(None)
    queue.put(None)

# 輸出結(jié)果:
producer_1 produce pear_0
producer_2 produce peach_0
producer_0 produce apple_0
consumer_2 consume peach_0
consumer_1 consume pear_0
producer_1 produce pear_1
producer_2 produce peach_1
producer_0 produce apple_1
consumer_2 consume apple_0
consumer_1 consume peach_1
consumer_2 consume pear_1
consumer_1 consume apple_1

?
通過向隊列中插入 None,來告訴消費者生產(chǎn)已經(jīng)結(jié)束。這是一種比較低端的實現(xiàn)方式。
JoinableQueue 類是 Queue 類的擴展,JoinableQueue 類中的 task_done() 方法為消費者調(diào)用方法,表示從隊列中獲取的項目(queue.get() 獲取的數(shù)據(jù))已經(jīng)被處理;JoinableQueue 類中的 join() 方法為生產(chǎn)者調(diào)用的方法,生產(chǎn)者在調(diào)用 join() 方法后會被阻塞,直到隊列中的每個項目都被調(diào)用 queue.task_done() 方法為止。
?
如下示例是通過 task_done() 方法 和 join() 方法來實現(xiàn)類似于上述的發(fā)送結(jié)束信號機制。

from multiprocessing import Process, JoinableQueue
import time

def producer(name, production, queue):
    for i in range(2):
        time.sleep(0.5)
        queue.put(production + '_' + str(i))
        print('%s produce %s' % (name, production + '_' + str(i)), end="\n")
    queue.join()

def consumer(name, queue):
    while True:
        data = queue.get()
        time.sleep(0.3)
        print('%s consume %s' % (name, data), end="\n")
        queue.task_done()

if __name__ == '__main__':
    queue = JoinableQueue()
    p_list = []
    for index, f in enumerate(['apple', 'pear', 'peach']):
        p = Process(target=producer, args=('producer_' + str(index), f, queue))
        p_list.append(p)
        p.start()
    c1 = Process(target=consumer, args=('consumer_1', queue))
    c2 = Process(target=consumer, args=('consumer_2', queue))
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    [p.join() for p in p_list]
    print('end...')

?
輸出結(jié)果與上一個示例一致。這里將 2個 consumer 設置為守護進程,在等待 producer 完成后,也隨主進程的結(jié)束而結(jié)束。

管道

管道的使用:

from multiprocessing import Process, Pipe

def func(pro, con):
    pro.close()
    while True:
        try:
            print(con.recv())
        except EOFError:
            con.close()
            break

if __name__ == '__main__':
    pro, con = Pipe()        # pro, con 分別表示管道的兩端
    Process(target=func, args=(pro, con)).start()
    con.close()             # 這里也可以不關(guān)閉
    for i in range(5):
        pro.send(i)
    pro.close()

# 輸出結(jié)果:
0
1
2
3
4

?
傳給進程的 conn(管道連接)是不會相互影響的,在一個進程中關(guān)閉了管道,并不會影響這個管道在另一個進程中的使用。若是在一個進程中,管道的一端沒有被用到,那么就應該將這一端關(guān)閉。例如在生產(chǎn)者中,應該關(guān)閉管道的 con 端(右端),在消費者中應該關(guān)閉管道的 pro 端(左端)。
?
當管道所有的入口都已經(jīng)關(guān)閉(上述示例中,主進程和子進程中管道的入口都為 pro),消費者繼續(xù)接收數(shù)據(jù)(調(diào)用 recv() 方法),當管道中已經(jīng)沒有數(shù)據(jù)時,就會拋出 EOFError。
?
如果管道有入口沒有關(guān)閉,且該入口沒有在向管道發(fā)送數(shù)據(jù),那么消費者就會阻塞在 recv() 方法上。
如上示例是通過 拋出 EOFError 錯誤來結(jié)束管道,還有另一種方式,就是通過管道中的數(shù)據(jù)(例如向管道中傳遞None)來結(jié)束管道

from multiprocessing import Process, Pipe

def func(con):
    while True:
        data = con.recv()
        if data is None: break
        print(data)

if __name__ == '__main__':
    pro, con = Pipe()        # con, pro 分別表示管道的兩端
    Process(target=func, args=(con,)).start()
    for i in range(5):
        pro.send(i)
    pro.send(None)

?
多個消費者消費管道中的數(shù)據(jù)示例(加鎖):

from multiprocessing import Process, Pipe, Lock
import time

def producer(pro, con, name, production):
    con.close()
    for i in range(4):
        time.sleep(0.5)
        pro.send(production + str(i))
        print('%s produce %s' % (name, production + '_' + str(i)), end="\n")
    pro.close()

def consumer(pro, con, name, lock):
    pro.close()
    while True:
        lock.acquire()
        try:
            data = con.recv()
            time.sleep(0.3)
            print('%s consume %s' % (name, data), end="\n")
        except EOFError:
            con.close()
            break
        finally:
            lock.release()

if __name__ == '__main__':
    pro, con = Pipe()
    lock = Lock()
    Process(target=producer, args=(pro, con, 'producer', 'apple')).start()
    Process(target=consumer, args=(pro, con, 'c_1', lock)).start()
    Process(target=consumer, args=(pro, con, 'c_2', lock)).start()
    pro.close()
    con.close()

?
pipe(管道)是進程數(shù)據(jù)不安全的,隊列進程之間是數(shù)據(jù)安全的,因為隊列的實現(xiàn)就是基于管道和鎖實現(xiàn)的。所以管道極少被用到,生產(chǎn)環(huán)境中 pipe 一般也很少被用到,使用較多的一般會是隊列服務器,例如 rabbitmq,kafka…...

信號量

信號量也是一種鎖,信號量與互斥鎖區(qū)別在于,互斥鎖的 acquire() 方法和 release() 方法之間,僅允許一個線程(或進程)執(zhí)行,而信號量可允許多個線程(或進程)執(zhí)行。信號量的一種應用就是控制并發(fā)執(zhí)行的線程(或進程)數(shù)。

from multiprocessing import Process, Semaphore
import time

def func(semaphore, name):
    if semaphore.acquire():
        print(name)
        time.sleep(2)
        semaphore.release()

if __name__ == '__main__':
    semaphore = Semaphore(3)
    for i in range(9):
        Process(target=func, args=(semaphore, 'process_' + str(i), )).start()

事件

Python中的事件(Event)主要用于主線程(進程)控制其他線程(進程)的執(zhí)行,其主要方法包括 set、wait、clear,is_set。
若事件(Event)的標記取值為 False,則線程(進程)會阻塞在 event.wait() 方法,event.wait() 還可以設置一個參數(shù) timeout,在等待 timeout 指定的時間后停止阻塞,繼續(xù)運行。
?
方法說明:

event.set():將 event 的標記設置為 True,所有 阻塞在 event.wait() 的線程(進程)都會繼續(xù)執(zhí)行
event.clear():將 event 的標記設置為 False。
event.is_set():判斷 event 的標志是否為 True。

?
如下示例,在主進程中控制子進程在何時繼續(xù)向下執(zhí)行。例如在主進程的 time.sleep(3) 處可以執(zhí)行一些檢測工作,確保子進程的運行,若檢測沒有問題則繼續(xù)子進程的運行。

from multiprocessing import Process, Event
import time

def worker(name, event):
    print('Process_%s is ready' % name)
    event.wait()
    print('Process_%s is running' % name)

if __name__ == '__main__':
    event = Event()
    for i in range(0, 2):
        Process(target=worker, args=(i, event)).start()
    time.sleep(3)
    event.set()

# 結(jié)果輸出:
Process_0 is ready
Process_1 is ready
Process_0 is running
Process_1 is running

?
如上示例,若主進程一直沒有允許子進程繼續(xù)執(zhí)行(例如檢測工作沒有通過),則子進程會一直阻塞在 event.wait() 這兒,我們希望在子進程阻塞過程中會有持續(xù)的提示信息,這個可以通過設置 event.wait 方法的 timeout 參數(shù)實現(xiàn)。

from multiprocessing import Process, Event
import time

def worker(name, event):
    while not event.is_set():
        print('Process_%s is ready' % name)
        event.wait(1)
    print('Process_%s is running' % name)

if __name__ == '__main__':
    event = Event()
    for i in range(0, 2):
        Process(target=worker, args=(i, event)).start()
    time.sleep(3)
    event.set()

# 結(jié)果輸出:
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_1 is running
Process_0 is running

進程池

進程的創(chuàng)建和銷毀都需要消耗系統(tǒng)資源,且每一臺服務器的 cpu 核心數(shù)有限,創(chuàng)建過多的進程反而會降低執(zhí)行效率。這里就可以使用進程池,進程池一啟動就會創(chuàng)建固定數(shù)量的進程,有執(zhí)行需要了,就從進程池中獲取一個進程處理對應的任務,處理完成后,進程不會被銷毀,而是放回進程池中。如果同時需要執(zhí)行的任務過多,沒有獲取到進程的任務需要等待,等有空閑的進程了才能運行。
?
進程池節(jié)省了操作系統(tǒng)在創(chuàng)建和銷毀進程上所花去的開銷,也限制了同一時間能夠運行的進程總數(shù),在一定程度上提升了多進程的執(zhí)行效率。
?
如下示例是使用進程池啟動進程和直接啟動進程的效率差距:

from multiprocessing import Process, Pool
import time

def m_add(a):
    return a ** a

if __name__ == '__main__':
    # print(os.cpu_count())    # 調(diào)試環(huán)境的 cpu 核數(shù)為 8
    # 創(chuàng)建進程池
    pool = Pool(8)
    start_t1 = time.time()
    # 使用進程池啟動進程
    res = pool.map(m_add, range(500))
    print(time.time() - start_t1)
    p_list = []
    start_t2 = time.time()
    # 直接啟動進程
    for i in range(500):
        p = Process(target=m_add, args=(i, ))
        p_list.append(p)
        p.start()
    for p in p_list: p.join()
    print(time.time() - start_t2)

# 輸出結(jié)果:
0.003328084945678711
0.6395020484924316

?
創(chuàng)建進程池:

Pool([numprocess  [,initializer [, initargs]]]):
    numprocess:進程池中的固定繼承數(shù),默認為 cpu 核心數(shù)(os.cpu_count())
    initializer:每次啟動進程需要執(zhí)行的可調(diào)用對象
    initargs:傳遞給 initializer 的參數(shù)

?
Pool 的常用方法:

map(func, iterable):異步提交任務。iterable 為一個可迭代對象,這個可迭代對象的長度是多少,就啟動多少個子進程,且可迭代對象的每一個元素會作為參數(shù)傳遞給 func。注意,使用 map 方法開啟子進程,只能傳遞一個參數(shù),若子進程需要多個參數(shù),則這個參數(shù)可以使用 元組;將所有子進程的返回結(jié)果以列表的形式返回。
apply(func [, args [, kwargs]]):同步提交任務,返回子進程的執(zhí)行結(jié)果。如果需要并發(fā)地執(zhí)行 func,必須從不同線程中調(diào)用同一個進程池的 apply() 方法;
apply_async(func [, args [, kwargs]]):異步提交任務,返回 AsyncResult 類的實例,從 AsyncResult 實例中獲取執(zhí)行結(jié)果。與 map 方法的區(qū)別是,apply_async 方法可以隨心所欲地傳遞參數(shù);
close():結(jié)束進程池接受任務;
jion():感知進程池中的任務執(zhí)行結(jié)束。即所有提交進來的任務都已經(jīng)執(zhí)行完畢,且沒有新的任務提交進來。

Tip:進程池可以有返回值,這是進程池特有的,但是直接起進程,是做不到有返回值的。
?
apply 方法應用:

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = []
    for i in range(7):
        res = pool.apply(worker, args=(i, ))    # 返回的 res 即是子進程的返回結(jié)果
        res_list.append(res)
    print(res_list)
    print('...end')

# 輸出結(jié)果:
worker_0 running, pid: 20584
worker_1 running, pid: 20585
worker_2 running, pid: 20586
worker_3 running, pid: 20584
worker_4 running, pid: 20585
worker_5 running, pid: 20586
worker_6 running, pid: 20584
[0, 1, 4, 9, 16, 25, 36]
...end

?
在同一個線程中使用 pool.apply 方法提交任務,是提交一個,執(zhí)行一個,執(zhí)行完成后才能繼續(xù)提交下一個任務。如上輸出結(jié)果也是逐個輸出。
?
apply_async 方法應用:

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = []
    for i in range(7):
        res = pool.apply_async(worker, args=(i, ))   # res 為 AsyncResult 類的實例
        res_list.append(res)
    pool.close()
    pool.join()
    for i in res_list:
        print(i.get())
    print('...end')

# 輸出結(jié)果:
worker_0 running, pid: 20598
worker_1 running, pid: 20599
worker_2 running, pid: 20600
worker_3 running, pid: 20598
worker_4 running, pid: 20599
worker_5 running, pid: 20600
worker_6 running, pid: 20599
0
1
4
9
16
25
36
...end

?
通過 AsyncResult 對象的 get 方法獲取返回值,get 方法會阻塞,即阻塞到子進程執(zhí)行完畢,然后獲取其返回值。
一般使用 apply_async 方法 異步提交任務,需要在主進程中感知任務結(jié)束(join方法),并且在 join 方法前面結(jié)束進程池接受任務(close方法)
?
map 方法應用:

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = pool.map(worker, range(7))
    for i in res_list:
        print(i)
    print('...end')

# 輸出結(jié)果:
worker_0 running, pid: 20713
worker_1 running, pid: 20714
worker_2 running, pid: 20715
worker_3 running, pid: 20714
worker_4 running, pid: 20713
worker_5 running, pid: 20715
worker_6 running, pid: 20715
0
1
4
9
16
25
36
...end

?
map 方法自帶 join 方法和 close 方法,map 方法啟動子進程后,就不允許再提交任務,且 map 方法會阻塞,直到子進程全部執(zhí)行完畢,且將所有子進程的返回結(jié)果以列表的形式返回。
若是不想阻塞在 map 方法,則可以使用 map_async,只是用了 map_async 方法,需要自己進行 close 和 join。

import time, os
from multiprocessing import Pool

def worker(i):
    print('worker_%s running, pid: %s' % (i, os.getpid()))
    time.sleep(1)
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = pool.map_async(worker, range(7))
    pool.close()
    pool.join()
    for i in res_list.get():
        print(i)
    print('...end')

?
返回結(jié)果與上述一致。

回調(diào)函數(shù)

進程池中一個進程處理完任務之后,這進程可以調(diào)用一個函數(shù)去處理該進程返回的結(jié)果,這個函數(shù)就是回調(diào)函數(shù)?;卣{(diào)函數(shù)的主要作用是告訴主進程,這里已經(jīng)執(zhí)行完畢,主進程可以針對返回結(jié)果繼續(xù)后續(xù)的處理。相對于主進程輪詢等待子進程的返回結(jié)果,利用回調(diào)函數(shù)可以提高程序的執(zhí)行效率。
?
注意回調(diào)函數(shù)是由主進程執(zhí)行的,可以將一些比較耗IO的操作放到進程池中執(zhí)行,由主進程統(tǒng)一處理它們的返回結(jié)果。
?
回調(diào)函數(shù)簡單示例:

from multiprocessing import Pool

def func(info):
    print('...' + str(info))

def worker(i):
    return i * i

if __name__ == '__main__':
    pool = Pool(3)
    res_list = []
    for i in range(7):
        res = pool.apply_async(worker, args=(i, ), callback=func)
        res_list.append(res)
    pool.close()
    pool.join()
    print('~end')

# 輸出結(jié)果:
...0
...4
...9
...1
...16
...36
...25
~end

?
如下示例中,可以將具體的業(yè)務放在 worker 方法中,例如從網(wǎng)絡上爬取數(shù)據(jù),然后統(tǒng)一由回調(diào)函數(shù) func 寫到一個文件中。

from multiprocessing import Pool

def func(info):
    with open('abc.txt', 'a+') as f:
        f.writelines(str(info) + '\n')
def worker(i):
    return i * i

if __name__ == '__main__':
    pool = Pool()
    for i in range(10):
        pool.apply_async(worker, (i,), callback=func)
    pool.close()
    pool.join()

?
.................^_^

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI