溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python中multiprocessing的作用是什么

發(fā)布時間:2021-07-10 17:02:34 來源:億速云 閱讀:272 作者:Leah 欄目:編程語言

Python中multiprocessing的作用是什么,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

一前言 
   使用python進行并發(fā)處理多臺機器/多個實例的時候,我們可以使用threading ,但是由于著名的GIL存在,實際上threading 并未提供真正有效的并發(fā)處理,要充分利用到多核CPU,我們需要使用多進程。Python提供了非常好用的多進程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process對象來創(chuàng)建一個進程,該Process對象與Threading對象的用法基本相同,具有相同的方法(官方原話:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue類用于進程之間的通信。話不多說 show me the code!

二使用
2.1 初識異同
下面的程序顯示threading和multiprocessing的在使用方面的異同,相近的函數(shù)join(),start(),append() 等,并做同一件事情打印自己的進程pid

  1. #!/usr/bin/env python

  2. # encoding: utf-8

  3. import os

  4. import threading

  5. import multiprocessing

  6. def printer(msg):

  7.     print(msg, os.getpid())

  8. print('Main begin:', os.getpid())

  9. # threading

  10. record = []

  11. for i in range(5):

  12.     thread = threading.Thread(target=printer, args=('threading',))

  13.     thread.start()

  14.     record.append(thread)

  15. for thread in record:

  16.     thread.join()

  17. # multi-process

  18. record = []

  19. for i in range(5):

  20.     process = multiprocessing.Process(target=printer, args=('multiprocessing',))

  21.     process.start()

  22.     record.append(process)

  23. for process in record:

  24.     process.join()

  25. print('Main end:', os.getpid())

輸出結果

點擊(此處)折疊或打開

  1. Main begin: 9524

  2. threading 9524

  3. threading 9524

  4. threading 9524

  5. threading 9524

  6. threading 9524

  7. multiprocessing 9539

  8. multiprocessing 9540

  9. multiprocessing 9541

  10. multiprocessing 9542

  11. multiprocessing 9543

  12. Main end: 9524

從例子的結果可以看出多線程threading的進程id和主進程(父進程)pid一樣 ,同為9524; 多進程打印的pid每個都不一樣,for循環(huán)中每創(chuàng)建一個process對象都年開一個進程。其他相關的方法基本類似。

2.2 用法
創(chuàng)建進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示調(diào)用對象,
args表示調(diào)用對象的位置參數(shù)元組。
kwargs表示調(diào)用對象的字典。
name為進程的別名。
group實質(zhì)上不使用,為None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程,并自動調(diào)用run方法.
屬性:authkey、daemon(要通過start()設置,必須設置在方法start之前)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止后自動終止,且自己不能產(chǎn)生新進程,必須在start()之前設置。

2.3 創(chuàng)建單進程
單線程比較簡單,創(chuàng)建一個 Process的實例對象就好,傳入?yún)?shù) target 為已經(jīng)定義好的方法worker以及worker需要的參數(shù)

  1. #!/usr/bin/env python

  2. # encoding: utf-8

  3. """

  4. author: yangyi@youzan.com

  5. time: 2017/7/2 下午6:45

  6. func:

  7. """

  8. import multiprocessing

  9. import datetime, time

  10. def worker(interval):

  11.     print("process start: {0}".format(datetime.datetime.today()));

  12.     time.sleep(interval)

  13.     print("process   end: {0}".format(datetime.datetime.today()));


  14. if __name__ == "__main__":

  15.     p = multiprocessing.Process(target=worker, args=(5,))

  16.     p.start()

  17.     p.join()

  18.     print "end!"

2.4 創(chuàng)建多進程

  1. #!/usr/bin/env python

  2. # encoding: utf-8

  3. """

  4. author: yangyi@youzan.com

  5. time: 2017/7/2 下午7:50

  6. func:

  7. """

  8. import multiprocessing

  9. def worker(num):

  10.     print "worker %d" %num



  11. if __name__ == "__main__":

  12.     print("The number of CPU is:" + str(multiprocessing.cpu_count()))

  13.     proc = []

  14.     for i in xrange(5):

  15.         p = multiprocessing.Process(target=worker, args=(i,))

  16.         proc.append(p)

  17.     for p in proc:

  18.         p.start()

  19.     for p in proc:

  20.         p.join()

  21.     print "end ..."

輸出

點擊(此處)折疊或打開

  1. The number of CPU is:4

  2. worker 0

  3. worker 1

  4. worker 2

  5. worker 3

  6. worker 4

  7. main process end ...

2.5 線程池
multiprocessing提供進程池的類--Pool,它可以指定程序最大可以調(diào)用的進程數(shù)量,當有新的請求提交到pool中時,如果進程池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果進程池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結束,才會創(chuàng)建新的進程來它。
構造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes  : 使用的工作進程的數(shù)量,如果processes是None,默認使用os.cpu_count()返回的數(shù)量。
initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調(diào)用initializer(*initargs)。
maxtasksperchild:工作進程退出之前可以完成的任務數(shù),完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。
context: 用在制定工作進程啟動時的上下文,一般使用multiprocessing.Pool()或者一個context對象的Pool()方法來創(chuàng)建一個池,兩種方法都適當?shù)脑O置了context。

實例方法:
  apply(func[, args[, kwds]]):同步進程池
  apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :異步進程池
  close() : 關閉進程池,阻止更多的任務提交到pool,待任務完成后,工作進程會退出。
  terminate() : 結束工作進程,不在處理未完成的任務.
  join() : 等待工作線程的退出,在調(diào)用join()前必須調(diào)用close()或者 terminate(),因為被終止的進程需要被父進程調(diào)用wait(join等價與wait),否則進程會成為僵尸進程。

  1. #!/usr/bin/env python

  2. # encoding: utf-8

  3. """

  4. author: yangyi@youzan.com

  5. time: 2017/7/2 下午7:50

  6. func:

  7. """

  8. from multiprocessing import Pool

  9. import time

  10. def worker(num):

  11.     print "worker %d" %num

  12.     time.sleep(2)

  13.     print "end worker %d" %num


  14. if __name__ == "__main__":

  15.     proc_pool = Pool(2)

  16.     for i in xrange(4):

  17.         proc_pool.apply_async(worker, (i,)) #使用了異步調(diào)用,從輸出結果可以看出來


  18.     proc_pool.close()

  19.     proc_pool.join()

  20.     print "main process end ..."

輸出結果

點擊(此處)折疊或打開

  1. worker 0

  2. worker 1

  3. end worker 0

  4. end worker 1

  5. worker 2

  6. worker 3

  7. end worker 2

  8. end worker 3

  9. main process end ..

解釋:創(chuàng)建一個進程池pool 對象proc_pool,并設定進程的數(shù)量為2,xrange(4)會相繼產(chǎn)生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數(shù)為2,所以0、1會直接送到進程中執(zhí)行,當其中的2個任務執(zhí)行完之后才空出2進程處理對象2和3,所以會出現(xiàn)輸出 worker 2 worker 3 出現(xiàn)在end worker 0 end worker 1之后。思考一下如果調(diào)用  proc_pool.apply(worker, (i,)) 的輸出結果會是什么樣的?

2.6 使用queue
multiprocessing提供隊列類,可以通過調(diào)用multiprocessing.Queue(maxsize) 初始化隊列對象,maxsize表示隊列里面最多的元素個數(shù)。
例子 創(chuàng)建了兩個函數(shù)入隊,出隊,出隊處理時使用了lock特性,串行化取數(shù)據(jù)。

  1. #!/usr/bin/env python

  2. # encoding: utf-8

  3. """

  4. author: yangyi@youzan.com

  5. time: 2017/7/2 下午9:03

  6. func:

  7. """

  8. import time

  9. from multiprocessing import Process, current_process,Lock,Queue

  10. import datetime

  11. def inputQ(queue):

  12.     time.sleep(1)

  13.     info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())

  14.     queue.put(info)

  15. def outputQ(queue,lock):

  16.     info = queue.get()

  17.     lock.acquire()

  18.     print ("proc_name: " + current_process().name + ' gets info :' + info)

  19.     lock.release()

  20. if __name__ == '__main__':

  21.     record1 = [] # store input processes

  22.     record2 = [] # store output processes

  23.     lock = Lock() # To prevent messy print

  24.     queue = Queue(3)

  25.     for i in range(10):

  26.         process = Process(target=inputQ, args=(queue,))

  27.         process.start()

  28.         record1.append(process)

  29.     for i in range(10):

  30.         process = Process(target=outputQ, args=(queue,lock))

  31.         process.start()

  32.         record2.append(process)

  33.     for p in record1:

  34.         p.join()

  35.     queue.close() # No more object will come, close the queue

  36.     for p in record2:

  37.         p.join()

2.7 使用pipe 
Pipe可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創(chuàng)建單向管道 (默認為雙向)。一個進程從PIPE一端輸入對象,然后被PIPE另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。
用法 multiprocessing.Pipe([duplex])
該類返回一組對象實例(conn1, conn2),分別代表發(fā)送和接受消息的兩端。

  1. #!/usr/bin/env python

  2. # encoding: utf-8

  3. """

  4. author: yangyi@youzan.com

  5. time: 2017/7/2 下午8:01

  6. func:

  7. """

  8. from multiprocessing import Process, Pipe

  9. def p1(conn, name):

  10.     conn.send('hello ,{name}'.format(name=name))

  11.     print "p1 receive :", conn.recv()

  12.     conn.close()


  13. def p2(conn, name):

  14.     conn.send('hello ,{name}'.format(name=name))

  15.     print "p2 receive :", conn.recv()

  16.     conn.close()


  17. if __name__ == '__main__':

  18.     parent_conn, child_conn = Pipe()

  19.     proc1 = Process(target=p1, args=(child_conn, "parent_conn"))

  20.     proc2 = Process(target=p2, args=(parent_conn, "child_conn"))

  21.     proc1.start()

  22.     proc2.start()

  23.     proc1.join()

  24.     proc2.join()

輸出:

點擊(此處)折疊或打開

  1. p1 receive : hello ,child_conn

  2. p2 receive : hello ,parent_conn

該例子中 p1 p2 通過pipe 給彼此相互發(fā)送信息,p1 發(fā)送"parent_conn" 給 p2 ,p2 發(fā)送"child_conn" 給p1.
2.8 daemon程序?qū)Ρ冉Y果

  1. import multiprocessing

  2. import datetime, time

  3. def worker(interval):

  4.     print("process start: {0}".format(datetime.datetime.today()));

  5.     time.sleep(interval)

  6.     print("process   end: {0}".format(datetime.datetime.today()));

  7. if __name__ == "__main__":

  8.     p = multiprocessing.Process(target=worker, args=(5,))

  9.     p.start()

  10.     print "end!"

輸出:

點擊(此處)折疊或打開

  1. end!

  2. process start: 2017-07-02 18:47:30.656244

  3. process   end: 2017-07-02 18:47:35.657464


設置 daemon = True,程序隨著主程序結束而不等待子進程。

  1. import multiprocessing

  2. import datetime, time

  3. def worker(interval):

  4.     print("process start: {0}".format(datetime.datetime.today()));

  5.     time.sleep(interval)

  6.     print("process   end: {0}".format(datetime.datetime.today()));

  7. if __name__ == "__main__":

  8.     p = multiprocessing.Process(target=worker, args=(5,))

  9.     p.daemon = True

  10.     p.start()

  11.     print "end!"

輸出:
end!
因為子進程設置了daemon屬性,主進程結束,multiprocessing創(chuàng)建的進程對象就隨著結束了。

  1. import multiprocessing

  2. import datetime, time

  3. def worker(interval):

  4.     print("process start: {0}".format(datetime.datetime.today()));

  5.     time.sleep(interval)

  6.     print("process   end: {0}".format(datetime.datetime.today()));

  7. if __name__ == "__main__":

  8.     p = multiprocessing.Process(target=worker, args=(5,))

  9.     p.daemon = True  #

  10.     p.start()

  11.     p.join() #進程執(zhí)行完畢后再關閉

  12.     print "end!"

輸出:

點擊(此處)折疊或打開

  1. process start: 2017-07-02 18:48:20.953754

  2. process   end: 2017-07-02 18:48:25.954736


2.9 Lock()
當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。
實例方法:
acquire([timeout]): 使線程進入同步阻塞狀態(tài),嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
例子:
多個進程使用同一個std_out ,使用lock機制確保同一個時刻有一個一個進程獲取輸出。

  1. #!/usr/bin/env python
    # encoding: utf-8
    """
    author: yangyi@youzan.com
    time: 2017/7/2 下午9:28
    func: 
    """
    from multiprocessing import Process, Lock
    def func_with_lock(l, i):
        l.acquire()
        print 'hello world', i
        l.release()


    def func_without_lock(i):
        print 'hello world', i


    if __name__ == '__main__':
        lock = Lock()
        print "func_with_lock :"
        for num in range(10):
            Process(target=func_with_lock, args=(lock, num)).start()


輸出:

點擊(此處)折疊或打開

  1. func_with_lock :

  2. hello world 0

  3. hello world 1

  4. hello world 2

  5. hello world 3

  6. hello world 4

  7. hello world 5

  8. hello world 6

  9. hello world 7

  10. hello world 8

  11. hello world 9

關于Python中multiprocessing的作用是什么問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業(yè)資訊頻道了解更多相關知識。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI