溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python如何實現(xiàn)多進程

發(fā)布時間:2020-07-02 14:18:56 來源:億速云 閱讀:178 作者:清晨 欄目:編程語言

這篇文章將為大家詳細講解有關(guān)python如何實現(xiàn)多進程,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

多進程的含義

進程(Process)是具有一定獨立功能的程序關(guān)于某個數(shù)據(jù)集合上的一次運行活動,是系統(tǒng)進行資源分配和調(diào)度的一個獨立單位。
顧名思義,多進程就是啟用多個進程同時運行。由于進程是線程的集合,而且進程是由一個或多個線程構(gòu)成的,所以多進程的運行意味著有大于或等于進程數(shù)量的線程在運行。

多進程的優(yōu)勢

由于進程中 GIL 的存在,Python 中的多線程并不能很好地發(fā)揮多核優(yōu)勢,一個進程中的多個線程,在同一時刻只能有一個線程運行。
而對于多進程來說,每個進程都有屬于自己的 GIL,所以,在多核處理器下,多進程的運行是不會受 GIL 的影響的。因此,多進程能更好地發(fā)揮多核的優(yōu)勢。
當(dāng)然,對于爬蟲這種 IO 密集型任務(wù)來說,多線程和多進程影響差別并不大。對于計算密集型任務(wù)來說,Python 的多進程相比多線程,其多核運行效率會有成倍的提升。
總的來說,Python 的多進程整體來看是比多線程更有優(yōu)勢的。所以,在條件允許的情況下,能用多進程就盡量用多進程。
不過值得注意的是,由于進程是系統(tǒng)進行資源分配和調(diào)度的一個獨立單位,所以各個進程之間的數(shù)據(jù)是無法共享的,如多個進程無法共享一個全局變量,
進程之間的數(shù)據(jù)共享需要有單獨的機制來實現(xiàn)。

實現(xiàn)多進程

基本使用

在Python中也有內(nèi)置的庫來實現(xiàn)多進程,它就是multiprocessing。multiprocessing提供了一系列的組件,如Process(進程)、Queue(隊列)、
Semaphore(信號量)、Pipe(管道)、Lock(鎖)、Pool(進程池)等。

import multiprocessing
import time


now = lambda: time.time()


def work(index):
    print(index)
    time.sleep(index)


def main():
    start_dt = now()
    for i in range(5):
        p = multiprocessing.Process(target=work, args=(i,))
        p.start()
    
    print(f"Time: {now() - start_dt}")
    

if __name__ == "__main__":
    main()
多進程基本信息的獲取
import multiprocessingimport timenow = lambda: time.time()def work(i, index):    print(f'{i}進程啟動')    time.sleep(index)    print(f'{i}進程結(jié)束')def main():    start_dt = now()    for i in range(5):        p = multiprocessing.Process(target=work, args=(i, 5))        p.start()    # 查看本機的 cpu 數(shù)量    print(f"CPU Numbers: {multiprocessing.cpu_count()}")    # 查看全部活躍子進程的名稱以及pid    for p in multiprocessing.active_children():        print(p.name, p.pid)    print(f"Time End: {now() - start_dt}")if __name__ == "__main__":    main()

繼承 Process 類

import multiprocessing
import time


class MyProcess(multiprocessing.Process):
    def __init__(self, loop):
        super(MyProcess, self).__init__()
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Pid: {self.pid}, Name: {self.name}")


def main():
    for i in range(2, 5):
        p = MyProcess(i)
        p.start()


if __name__ == "__main__":
    main()

守護進程

在多進程中,同樣存在守護進程的概念,如果一個進程被設(shè)置為守護進程,當(dāng)父進程結(jié)束后,子進程會自動被終止,我們可以通過設(shè)置daemon屬性來控制是否為守護進程。
還是原來的例子,增加了deamon屬性的設(shè)置:

import multiprocessing
import time


class MyProcess(multiprocessing.Process):
    def __init__(self, loop):
        super(MyProcess, self).__init__()
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}")


def main():
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()


if __name__ == "__main__":
    main()


# 主進程沒有做任何事情 直接輸入后結(jié)束 同時也終止了子進程的運行。
print("Main End.")

這樣可以有效防止無控制地生成子進程。這樣的寫法可以讓我們在主進程運行結(jié)束后無需額外擔(dān)心子進程是否關(guān)閉,避免了獨立子進程的運行。

進程等待

import multiprocessing
import time


class MyProcess(multiprocessing.Process):
    def __init__(self, loop):
        super(MyProcess, self).__init__()
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}")


def main():
    processes = []
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes:
        p.join()


if __name__ == "__main__":
    main()


# 主進程沒有做任何事情 直接輸入后結(jié)束 同時也終止了子進程的運行。
print("Main End.")

進程等待最大時間

默認情況下,join是無限期的。也就是說,如果有子進程沒有運行完畢,主進程會一直等待。這種情況下,如果子進程出現(xiàn)問題陷入了死循環(huán),主進程也會無限等待下去。
怎么解決這個問題呢?可以給 join 方法傳遞一個超時參數(shù),代表最長等待秒數(shù)。如果子進程沒有在這個指定秒數(shù)之內(nèi)完成,會被強制返回,主進程不再會等待。
也就是說這個參數(shù)設(shè)置了主進程等待該子進程的最長時間。

import multiprocessing
import time


class MyProcess(multiprocessing.Process):
    def __init__(self, loop):
        super(MyProcess, self).__init__()
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}")


def main():
    processes = []
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes:
        # 主進程最多等待改進程 1 s
        p.join(1)


if __name__ == "__main__":
    main()


# 主進程沒有做任何事情 直接輸入后結(jié)束 同時也終止了子進程的運行。
print("Main End.")

終止進程

當(dāng)然,終止進程不止有守護進程這一種做法,我們也可以通過 terminate 方法來終止某個子進程,另外我們還可以通過 is_alive 方法判斷進程是否還在運行。

import multiprocessing
import time


def task():
    print("1")
    time.sleep(5)
    print("2")


if __name__ == "__main__":
    p = multiprocessing.Process(target=task)
    # 使用 is_alive 判斷當(dāng)前進程進程是否在運行
    print(f"First: {p}, {p.is_alive()}")

    p.start()
    print(f"During: {p}, {p.is_alive()}")

    p.terminate()
    # 即使此時已經(jīng)調(diào)用了 terminate 進程的狀態(tài)還是 True, 即運行狀態(tài)
    # 在調(diào)用 join 之后才變成了終止狀態(tài)
    print(f"After: {p}, {p.is_alive()}")

    p.join()
    print(f"Joined: {p}, {p.is_alive()}")

進程互斥鎖

我們發(fā)現(xiàn),有的輸出結(jié)果沒有換行。這是什么原因造成的呢?這種情況是由多個進程并行執(zhí)行導(dǎo)致的,兩個進程同時進行了輸出,結(jié)果第一個進程的換行沒有來得及輸出,
第二個進程就輸出了結(jié)果,導(dǎo)致最終輸出沒有換行。

那如何來避免這種問題?如果我們能保證,多個進程運行期間的任一時間,只能一個進程輸出,其他進程等待,等剛才那個進程輸出完畢之后,另一個進程再進行輸出,
這樣就不會出現(xiàn)輸出沒有換行的現(xiàn)象了。

這種解決方案實際上就是實現(xiàn)了進程互斥,避免了多個進程同時搶占臨界區(qū)(輸出)資源。我們可以通過multiprocessing中的Lock來實現(xiàn)。Lock,即鎖,
在一個進程輸出時,加鎖,其他進程等待。等此進程執(zhí)行結(jié)束后,釋放鎖,其他進程可以進行輸出。

首先是一個不加鎖的實例:

import multiprocessing
import time


class MyProcess(multiprocessing.Process):
    def __init__(self, loop, lock: multiprocessing.Lock):
        super(MyProcess, self).__init__()
        self.loop = loop
        self.lock = lock

    def run(self):
        for count in range(self.loop):
            time.sleep(0.1)
            # self.lock.acquire()
            print(f"Pid: {self.pid}, LoopCount: {count}")
            # self.lock.release()


def main():
    lock = multiprocessing.Lock()
    for i in range(10, 15):
        p = MyProcess(i, lock)
        p.start()


if __name__ == "__main__":
    main()

然后取消注釋再次運行。

信號量

進程互斥鎖可以使同一時刻只有一個進程能訪問共享資源,如上面的例子所展示的那樣,在同一時刻只能有一個進程輸出結(jié)果。
但有時候我們需要允許多個進程來訪問共享資源,同時還需要限制能訪問共享資源的進程的數(shù)量。

這種需求該如何實現(xiàn)呢?可以用信號量,信號量是進程同步過程中一個比較重要的角色。它可以控制臨界資源的數(shù)量,實現(xiàn)多個進程同時訪問共享資源,限制進程的并發(fā)量。
我們可以用 multiprocessing 庫中的 Semaphore 來實現(xiàn)信號量。
那么接下來我們就用一個實例來演示一下進程之間利用 Semaphore 做到多個進程共享資源,同時又限制同時可訪問的進程數(shù)量,代碼如下:

import multiprocessing
import time

'''
Semaphore管理一個內(nèi)置的計數(shù)器,
每當(dāng)調(diào)用acquire()時內(nèi)置計數(shù)器-1;
調(diào)用release() 時內(nèi)置計數(shù)器+1;
計數(shù)器不能小于0;當(dāng)計數(shù)器為0時,acquire()將阻塞線程直到其他線程(進程)調(diào)用release()。
'''

buffer = multiprocessing.Queue(10)
empty = multiprocessing.Semaphore(2)   # 緩沖區(qū)閑適區(qū)空余數(shù)
full = multiprocessing.Semaphore(0)    # 緩沖區(qū)占用區(qū)占用數(shù)
lock = multiprocessing.Lock()


class Consumer(multiprocessing.Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            buffer.get()
            print("Consumer pop an element.")
            time.sleep(1)
            lock.release()
            empty.release()


class Producer(multiprocessing.Process):
    def __init__(self, name):
        super(Producer, self).__init__()
        self.name = name

    def run(self):
        global buffer, empty, full, lock
        # 生產(chǎn)者 Producer 使用 acquire 方法來占用一個緩沖區(qū)位置,緩沖區(qū)空閑區(qū)大小減 1,接下來進行加鎖,對緩沖區(qū)進行操作,
        # 然后釋放鎖,最后讓代表占用的緩沖區(qū)位置數(shù)量加 1,消費者則相反。
        # 通過 Semaphore 我們很好地控制了進程對資源的并發(fā)訪問數(shù)量。
        while True:
            empty.acquire()
            lock.acquire()
            buffer.put(1)
            print(f"{self.name} Producer put an element.")
            time.sleep(1)
            lock.release()
            full.release()


def main():
    lst = []
    for i in range(3):
        p = Producer(str(i))
        p.daemon = True
        p.start()
        lst.append(p)

    c = Consumer()
    c.daemon = True
    c.start()
    c.join()

    for p in lst:
        p.join()

    print("Main End.")


if __name__ == "__main__":
    main()

在上面的例子中我們使用Queue作為進程通信的共享隊列使用。而如果我們把上面程序中的Queue換成普通的list,是完全起不到效果的,因為進程和進程之間的資源是不共享的。
即使在一個進程中改變了這個list,在另一個進程也不能獲取到這個list的狀態(tài),所以聲明全局變量對多進程是沒有用處的。那進程如何共享數(shù)據(jù)呢?可以用Queue,即隊列。
當(dāng)然這里的隊列指的是 multiprocessing 里面的 Queue。

管道

剛才我們使用Queue實現(xiàn)了進程間的數(shù)據(jù)共享,那么進程之間直接通信,如收發(fā)信息,用什么比較好呢?可以用Pipe,管道。管道,我們可以把它理解為兩個進程之間通信的通道。
管道可以是單向的,即half-duplex:一個進程負責(zé)發(fā)消息,另一個進程負責(zé)收消息;也可以是雙向的duplex,即互相收發(fā)消息。
默認聲明Pipe對象是雙向管道,如果要創(chuàng)建單向管道,可以在初始化的時候傳入 deplex 參數(shù)為 False。

import multiprocessing


class Consumer(multiprocessing.Process):
    def __init__(self, pipe):
        super(Consumer, self).__init__()
        self.pipe = pipe

    def run(self):
        self.pipe.send("Consumer Words.")
        print(f"Consumer Recv: {self.pipe.recv()}")


class Producer(multiprocessing.Process):
    def __init__(self, pipe):
        super(Producer, self).__init__()
        self.pipe = pipe

    def run(self):
        self.pipe.send("Producer Words.")
        print(f"Producer Recv: {self.pipe.recv()}")


def main():
    # 聲明了一個默認為雙向的管道,然后將管道的兩端分別傳給兩個進程。
    # 管道 Pipe 就像進程之間搭建的橋梁,利用它我們就可以很方便地實現(xiàn)進程間通信了。
    pipe = multiprocessing.Pipe()
    c = Consumer(pipe[0])
    p = Producer(pipe[1])
    c.daemon = True
    p.daemon = True
    c.start()
    p.start()
    c.join()
    p.join()
    print("Main Process Ended.")


if __name__ == "__main__":
    main()

進程池

我們講了可以使用Process來創(chuàng)建進程,同時也講了如何用Semaphore來控制進程的并發(fā)執(zhí)行數(shù)量。假如現(xiàn)在我們遇到這么一個問題,我有10000個任務(wù),
每個任務(wù)需要啟動一個進程來執(zhí)行,并且一個進程運行完畢之后要緊接著啟動下一個進程,同時我還需要控制進程的并發(fā)數(shù)量,不能并發(fā)太高,
不然CPU處理不過來(如果同時運行的進程能維持在一個最高恒定值當(dāng)然利用率是最高的)。那么我們該如何來實現(xiàn)這個需求呢?

用Process和Semaphore可以實現(xiàn),但是實現(xiàn)起來比較煩瑣。而這種需求在平時又是非常常見的。此時,我們就可以派上進程池了,即 multiprocessing 中的 Pool。
Pool可以提供指定數(shù)量的進程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,就會創(chuàng)建一個新的進程用來執(zhí)行該請求;
但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來執(zhí)行它。

進程池1:

import multiprocessing
import time


def function(index):
    print(f"{index} start")
    time.sleep(3)
    print(f"{index} end")


def main():
    # 聲明了一個大小為 3 的進程池,通過 processes 參數(shù)來指定,如果不指定,那么會自動根據(jù)處理器內(nèi)核來分配進程數(shù)。
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        # 使用 apply_async 方法將進程添加進去,args 可以用來傳遞參數(shù)。
        pool.apply_async(function, args=(i, ))

    print("Main start.")
    # 關(guān)閉進程池 使之不再接受新任務(wù)
    pool.close()
    pool.join()
    print("Main end.")


if __name__ == "__main__":
    main()

再介紹進程池一個更好用的map方法,可以將上述寫法簡化很多。map方法是怎么用的呢?第一個參數(shù)就是要啟動的進程對應(yīng)的執(zhí)行方法,
第2個參數(shù)是一個可迭代對象,其中的每個元素會被傳遞給這個執(zhí)行方法。

舉個例子:現(xiàn)在我們有一個list,里面包含了很多URL,另外我們也定義了一個方法用來抓取每個URL內(nèi)容并解析,
那么我們可以直接在map的第一個參數(shù)傳入方法名,第 2 個參數(shù)傳入 URL 數(shù)組。

進程池2:

import multiprocessing
import requests


def scrape(url):
    try:
        ret = requests.get(url).text
        print(ret[:10])
        print()
    except:
        print("Get Error")


def main():
    pool = multiprocessing.Pool(processes=3)
    urls = [
        'http://data.eastmoney.com/hsgt/index.html',
        'http://data.eastmoney.com/hsgtcg/gzcglist.html',
        'https://www.runoob.com/mysql/mysql-alter.html',
        'https://blog.csdn.net/weixin_42329277/article/details/80735009?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task',

    ]
    pool.map(scrape, urls)
    pool.close()
    # pool.join()
    print("Main End.")


if __name__ == "__main__":
    main()

 關(guān)于python如何實現(xiàn)多進程就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI