溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python中多進(jìn)程和多線程的使用方法

發(fā)布時(shí)間:2021-03-30 14:02:50 來源:億速云 閱讀:206 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹了python中多進(jìn)程和多線程的使用方法,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

進(jìn)程和線程

進(jìn)程是系統(tǒng)進(jìn)行資源分配的最小單位,線程是系統(tǒng)進(jìn)行調(diào)度執(zhí)行的最小單位;

一個(gè)應(yīng)用程序至少包含一個(gè)進(jìn)程,一個(gè)進(jìn)程至少包含一個(gè)線程;

每個(gè)進(jìn)程在執(zhí)行過程中擁有獨(dú)立的內(nèi)存空間,而一個(gè)進(jìn)程中的線程之間是共享該進(jìn)程的內(nèi)存空間的;

  • 計(jì)算機(jī)的核心是CPU,它承擔(dān)了所有的計(jì)算任務(wù)。它就像一座工廠,時(shí)刻在運(yùn)行。

  • 假定工廠的電力有限,一次只能供給一個(gè)車間使用。也就是說,一個(gè)車間開工的時(shí)候,其他車間都必須停工。背后的含義就是,單個(gè)CPU一次只能運(yùn)行一個(gè)任務(wù)。編者注: 多核的CPU就像有了多個(gè)發(fā)電廠,使多工廠(多進(jìn)程)實(shí)現(xiàn)可能。

  • 進(jìn)程就好比工廠的車間,它代表CPU所能處理的單個(gè)任務(wù)。任一時(shí)刻,CPU總是運(yùn)行一個(gè)進(jìn)程,其他進(jìn)程處于非運(yùn)行狀態(tài)。

  • 一個(gè)車間里,可以有很多工人。他們協(xié)同完成一個(gè)任務(wù)。

  • 線程就好比車間里的工人。一個(gè)進(jìn)程可以包括多個(gè)線程。

  • 車間的空間是工人們共享的,比如許多房間是每個(gè)工人都可以進(jìn)出的。這象征一個(gè)進(jìn)程的內(nèi)存空間是共享的,每個(gè)線程都可以使用這些共享內(nèi)存。

  • 可是,每間房間的大小不同,有些房間最多只能容納一個(gè)人,比如廁所。里面有人的時(shí)候,其他人就不能進(jìn)去了。這代表一個(gè)線程使用某些共享內(nèi)存時(shí),其他線程必須等它結(jié)束,才能使用這一塊內(nèi)存。

  • 一個(gè)防止他人進(jìn)入的簡單方法,就是門口加一把鎖。先到的人鎖上門,后到的人看到上鎖,就在門口排隊(duì),等鎖打開再進(jìn)去。這就叫"互斥鎖"(Mutual exclusion,縮寫 Mutex),防止多個(gè)線程同時(shí)讀寫某一塊內(nèi)存區(qū)域。

  • 還有些房間,可以同時(shí)容納n個(gè)人,比如廚房。也就是說,如果人數(shù)大于n,多出來的人只能在外面等著。這好比某些內(nèi)存區(qū)域,只能供給固定數(shù)目的線程使用。

  • 這時(shí)的解決方法,就是在門口掛n把鑰匙。進(jìn)去的人就取一把鑰匙,出來時(shí)再把鑰匙掛回原處。后到的人發(fā)現(xiàn)鑰匙架空了,就知道必須在門口排隊(duì)等著了。這種做法叫做"信號量"(Semaphore),用來保證多個(gè)線程不會互相沖突。

  • 不難看出,mutex是semaphore的一種特殊情況(n=1時(shí))。也就是說,完全可以用后者替代前者。但是,因?yàn)閙utex較為簡單,且效率高,所以在必須保證資源獨(dú)占的情況下,還是采用這種設(shè)計(jì)。

Python的多進(jìn)程

Python的多進(jìn)程依賴于multiprocess模塊;使用多進(jìn)程可以利用多個(gè)CPU進(jìn)行并行計(jì)算;

實(shí)例:

from multiprocessing import Process
import os
import time
 
def long_time_task(i):
    print('子進(jìn)程: {} - 任務(wù){(diào)}'.format(os.getpid(), i))
    time.sleep(2)
    print("結(jié)果: {}".format(8 ** 20))
 
if __name__=='__main__':
    print('當(dāng)前母進(jìn)程: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('等待所有子進(jìn)程完成。')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("總共用時(shí){}秒".format((end - start)))

新創(chuàng)建進(jìn)程和進(jìn)程間切換是需要消耗資源的,所以應(yīng)該控制進(jìn)程數(shù)量;

同時(shí)可運(yùn)行的進(jìn)程數(shù)量收到CPU核數(shù)限制;

進(jìn)程池

使用進(jìn)程池pool創(chuàng)建進(jìn)程:

使用進(jìn)程池可以避免手工進(jìn)行進(jìn)程的創(chuàng)建的麻煩,默認(rèn)數(shù)量是CPU核數(shù);

Pool類可以提供指定數(shù)量的進(jìn)程供用戶使用,當(dāng)有新的請求被提交到Pool中的時(shí)候,如果進(jìn)程池還沒有滿,就會創(chuàng)建一個(gè)新的進(jìn)程來執(zhí)行請求;如果池已經(jīng)滿了,請求就會等待,等到有空閑進(jìn)程可以使用時(shí),才會執(zhí)行請求;

幾個(gè)方法:

1.apply_async

作用是向進(jìn)程池提交需要執(zhí)行的函數(shù)和參數(shù),各個(gè)進(jìn)程采用非阻塞的異步方式調(diào)用,每個(gè)進(jìn)程只管自己運(yùn)行,是默認(rèn)方式;

2.map

會阻塞進(jìn)程直到返回結(jié)果;

3.map_sunc

非阻塞進(jìn)程;

4.close

關(guān)閉進(jìn)程池,不再接受任務(wù);

5.terminate

結(jié)束進(jìn)程;

6.join

主進(jìn)程阻塞,直到子進(jìn)程執(zhí)行結(jié)束;

實(shí)例:

from multiprocessing import Pool, cpu_count
import os
import time
 
def long_time_task(i):
    print('子進(jìn)程: {} - 任務(wù){(diào)}'.format(os.getpid(), i))
    time.sleep(2)
    print("結(jié)果: {}".format(8 ** 20))
 
if __name__=='__main__':
    print("CPU內(nèi)核數(shù):{}".format(cpu_count()))
    print('當(dāng)前母進(jìn)程: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待所有子進(jìn)程完成。')
    p.close()
    p.join()
    end = time.time()
    print("總共用時(shí){}秒".format((end - start)))

在join之前,必須使用close或者terminate,讓進(jìn)程池不再接受任務(wù);

多進(jìn)程間的數(shù)據(jù)通信與共享

通常,進(jìn)程之間是相互獨(dú)立的,每個(gè)進(jìn)程都有獨(dú)立的內(nèi)存。通過共享內(nèi)存(nmap模塊),進(jìn)程之間可以共享對象,使多個(gè)進(jìn)程可以訪問同一個(gè)變量(地址相同,變量名可能不同)。多進(jìn)程共享資源必然會導(dǎo)致進(jìn)程間相互競爭,所以應(yīng)該盡最大可能防止使用共享狀態(tài)。還有一種方式就是使用隊(duì)列queue來實(shí)現(xiàn)不同進(jìn)程間的通信或數(shù)據(jù)共享,這一點(diǎn)和多線程編程類似。

下例這段代碼中中創(chuàng)建了2個(gè)獨(dú)立進(jìn)程,一個(gè)負(fù)責(zé)寫(pw), 一個(gè)負(fù)責(zé)讀(pr), 實(shí)現(xiàn)了共享一個(gè)隊(duì)列queue。

from multiprocessing import Process, Queue
import os, time, random
 
# 寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
 
# 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
 
if __name__=='__main__':
    # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子進(jìn)程pw,寫入:
    pw.start()
    # 啟動子進(jìn)程pr,讀取:
    pr.start()
    # 等待pw結(jié)束:
    pw.join()
    # pr進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止:
    pr.terminate()

Python的多線程

python 3中的多進(jìn)程編程主要依靠threading模塊。創(chuàng)建新線程與創(chuàng)建新進(jìn)程的方法非常類似。threading.Thread方法可以接收兩個(gè)參數(shù), 第一個(gè)是target,一般指向函數(shù)名,第二個(gè)時(shí)args,需要向函數(shù)傳遞的參數(shù)。對于創(chuàng)建的新線程,調(diào)用start()方法即可讓其開始。我們還可以使用current_thread().name打印出當(dāng)前線程的名字。 

import threading
import time
 
def long_time_task(i):
    print('當(dāng)前子線程: {} 任務(wù){(diào)}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結(jié)果: {}".format(8 ** 20))
 
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1, 3):
        t = threading.Thread(target=long_time_task, args=(i, ))
        thread_list.append(t)
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()
    end = time.time()
    print("總共用時(shí){}秒".format((end - start)))

多線程間的數(shù)據(jù)共享

一個(gè)進(jìn)程所含的不同線程間共享內(nèi)存,這就意味著任何一個(gè)變量都可以被任何一個(gè)線程修改,因此線程之間共享數(shù)據(jù)最大的危險(xiǎn)在于多個(gè)線程同時(shí)改一個(gè)變量,把內(nèi)容給改亂了。如果不同線程間有共享的變量,其中一個(gè)方法就是在修改前給其上一把鎖lock,確保一次只有一個(gè)線程能修改它。threading.lock()方法可以輕易實(shí)現(xiàn)對一個(gè)共享變量的鎖定,修改完后release供其它線程使用。

import threading
 
class Account:
    def __init__(self):
        self.balance = 0
 
    def add(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 釋放鎖
        lock.release()
 
    def delete(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 釋放鎖
        lock.release()
 
if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 創(chuàng)建線程
   thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
 
    # 啟動線程
   thread_add.start()
    thread_delete.start()
 
    # 等待線程結(jié)束
   thread_add.join()
    thread_delete.join()
 
    print('The final balance is: {}'.format(account.balance))

使用queue隊(duì)列通信-經(jīng)典的生產(chǎn)者和消費(fèi)者模型

from queue import Queue
import random, threading, time
 
# 生產(chǎn)者類
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
 
    def run(self):
        for i in range(1, 5):
            print("{} is producing {} to the queue!".format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(10) / 5)
        print("%s finished!" % self.getName())
 
# 消費(fèi)者類
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
 
    def run(self):
        for i in range(1, 5):
            val = self.queue.get()
            print("{} is consuming {} in the queue.".format(self.getName(), val))
            time.sleep(random.randrange(10))
        print("%s finished!" % self.getName())
 
def main():
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
 
    producer.start()
    consumer.start()
 
    producer.join()
    consumer.join()
    print('All threads finished!')
 
if __name__ == '__main__':
    main()
  • 對CPU密集型代碼(比如循環(huán)計(jì)算) - 多進(jìn)程效率更高

  • 對IO密集型代碼(比如文件操作,網(wǎng)絡(luò)爬蟲) - 多線程效率更高。

對于IO密集型操作,大部分消耗時(shí)間其實(shí)是等待時(shí)間,在等待時(shí)間中CPU是不需要工作的,那你在此期間提供雙CPU資源也是利用不上的,相反對于CPU密集型代碼,2個(gè)CPU干活肯定比一個(gè)CPU快很多。那么為什么多線程會對IO密集型代碼有用呢?這時(shí)因?yàn)閜ython碰到等待會釋放GIL供新的線程使用,實(shí)現(xiàn)了線程間的切換。

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“python中多進(jìn)程和多線程的使用方法”這篇文章對大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI