溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python并發(fā)之多進程的示例分析

發(fā)布時間:2021-07-22 11:03:26 來源:億速云 閱讀:154 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Python并發(fā)之多進程的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

一,進程的理論基礎

一個應用程序,歸根結(jié)底是一堆代碼,是靜態(tài)的,而進程才是執(zhí)行中的程序,在一個程序運行的時候會有多個進程并發(fā)執(zhí)行。

進程和線程的區(qū)別:

  • 進程是系統(tǒng)資源分配的基本單位。

  • 一個進程內(nèi)可以包含多個線程,屬于一對多的關(guān)系,進程內(nèi)的資源,被其內(nèi)的線程共享

  • 線程是進程運行的最小單位,如果說進程是完成一個功能,那么其線程就是完成這個功能的基本單位

  • 進程間資源不共享,多進程切換資源開銷,難度大,同一進程內(nèi)的線程資源共享,多線程切換資源開銷,難度小

進程與線程的共同點:

都是為了提高程序運行效率,都有執(zhí)行的優(yōu)先權(quán)

二,Python的多進程( multiprocessing模塊)

創(chuàng)建一個進程(和創(chuàng)建線程類似)

方法一:創(chuàng)建Process對象,通過對象調(diào)用start()方法啟動進程

from multiprocessing import Process
def foo(name):
 print('hello,%s'%name)
if __name__ == '__main__':
 p1=Process(target=foo,args=('world',))
 p2 = Process(target=foo, args=('China',))
 p1.start()
 p2.start()
 print('=====主進程=====')
 # == == =主進程 == == =
 # hello, world
 # hello, China
 #主進程和子進程并發(fā)執(zhí)行

注意:Process對象只能在在 if __name__ == '__main__':下創(chuàng)建,不然會報錯。

方法二:自定義一個類繼承Process類,并重寫run()方法,將執(zhí)行代碼放在其內(nèi)

from multiprocessing import Process
class MyProcess(Process):
 def __init__(self,name):
  super().__init__()
  self.name = name
 def run(self):
  print('hello,%s'%self.name)
if __name__ == '__main__':
 myprocess1 = MyProcess('world')
 myprocess2 = MyProcess('world')
 myprocess1.start()
 myprocess2.start()

Process內(nèi)置方法

實例方法:

p.start():啟動進程,并調(diào)用該子進程中的p.run()

p.run():進程啟動時運行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實現(xiàn)該方法 

p.terminate():強制終止進程p,不會進行任何清理操作,如果p創(chuàng)建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖

p.is_alive():如果p仍然運行,返回True

p.join([timeout]):主線程等待p終止。timeout是可選的超時時間
Process屬性

p.daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程終止時,p也隨之終止,并且設定為True后,p不能創(chuàng)建自己的新進程,必須在p.start()之前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時為None、如果為–N,表示被信號N結(jié)束(了解即可)

守護進程

類似于守護線程,只不過守護線程是對象的一個方法,而守護進程封裝成對象的屬性。

from multiprocessing import Process
import time
class MyProcess(Process):
 def __init__(self,name):
  super().__init__()
  self.name = name
 def run(self):
  time.sleep(3)
  print('hello,%s'%self.name)
if __name__ == '__main__':
 myprocess1=MyProcess('world')
 myprocess1.daemon = True
 myprocess1.start()
 print('結(jié)束')
#不會輸出‘hello world',因為設置為守護進程,主進程不會等待

也可以使用join方法,使主進程等待

from multiprocessing import Process
import time
class MyProcess(Process):
 def __init__(self,name):
  super().__init__()
  self.name = name
 def run(self):
  time.sleep(3)
  print('hello,%s'%self.name)
if __name__ == '__main__':
 myprocess1=MyProcess('world')
 myprocess1.daemon = True
 myprocess1.start()
 myprocess1.join() #程序阻塞
 print('結(jié)束')
join()

進程同步和鎖

進程雖然不像線程共享資源,但是這并不意味著進程間不 需要加鎖,比如不同進程會共享同一個終端 ( 屏幕),或者操作同一個文件,數(shù)據(jù)庫,那么數(shù)據(jù)安全還是很有必要的,因此我們可以加鎖,

from multiprocessing import Process,Lock
import time
def a_print(l): #需要傳入對象,因為信息不共享
 l.acquire()
 print('我要打印信息')
 time.sleep(1)
 print('我打印完了')
 l.release()
if __name__ == '__main__':
 l = Lock()
 for i in range(20):
  p = Process(target=a_print,args=(l,))
  p.start()

信號量(Semaphore)

能夠并發(fā)執(zhí)行的進程數(shù),超出的進程阻塞,直到有進程運行完成。

Semaphore管理一個內(nèi)置的計數(shù)器,

每當調(diào)用acquire()時內(nèi)置計數(shù)器-1;

調(diào)用release() 時內(nèi)置計數(shù)器+1;

計數(shù)器不能小于0;當計數(shù)器為0時,acquire()將阻塞進程直到其他進程調(diào)用release()。

from multiprocessing import Process,Queue,Semaphore
import time,random
def seat(s,n):
 s.acquire()
 print('學生%d坐下了'%n)
 time.sleep(random.randint(1,2))
 s.release()
if __name__ == '__main__':
 s = Semaphore(5)
 for i in range(20):
  p = Process(target=seat,args=(s,i))
  p.start()
 print('-----主進程-------')

注意:其實信號量和鎖類似,只是限制進程運行某個代碼塊的數(shù)量(鎖為1個),并不是能限制并發(fā)的進程,如上述代碼,一次性還是創(chuàng)建了20個進程

事件(Event)

from multiprocessing import Process,Event
import time, random
def eating(event):
 event.wait()
 print('去吃飯的路上...')
def makeing(event):
 print('做飯中')
 time.sleep(random.randint(1,2))
 print('做好了,快來...')
 event.set()
if __name__ == '__main__':
 event=Event()
 t1 = Process(target=eating,args=(event,))
 t2 = Process(target=makeing,args=(event,))
 t1.start()
 t2.start()
 # 做飯中
 # 做好了,快來...
 # 去吃飯的路上...

和線程事件幾乎一致

進程隊列(Queue)

進程隊列是進程通訊的方式之一。使用multiprocessing 下的Queue

from multiprocessing import Process,Queue
import time
def func1(queue):
 while True:
  info=queue.get()
  if info == None:
   return 
  print(info)
def func2(queue):
 for i in range(10):
  time.sleep(1)
  queue.put('is %d'%i)
 queue.put(None) #結(jié)束的標志
if __name__ == '__main__':
 q = Queue()
 p1 = Process(target=func1,args=(q,))
 p2 = Process(target=func2, args=(q,))
 p1.start()
 p2.start()
Queue類的方法,源碼如下:
class Queue(object):
 def __init__(self, maxsize=-1): #可以傳參設置隊列最大容量
  self._maxsize = maxsize
 def qsize(self): #返回當前時刻隊列中的個數(shù)
  return 0
 def empty(self): #是否為空
  return False
 def full(self): 是否滿了
  return False
 def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常
  pass
 def put_nowait(self, obj): #=put(False)
  pass
 def get(self, block=True, timeout=None): 獲取值,get方法有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內(nèi)沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
  pass
 def get_nowait(self): # = get(False)
  pass
 def close(self): #將隊列關(guān)閉
  pass
 def join_thread(self): #略,幾乎不用
  pass
 def cancel_join_thread(self):
  pass

進程隊列源碼注釋

進程池

進程的消耗是很大的,因此我們不能無節(jié)制的開啟新進程,因此我們可以 通過維護一個進程池來控制進程的數(shù)量 。這就不同于信號量,進程池可以從源頭控制進程數(shù)量。在Python中可以通過如下方法使用

同步調(diào)用

from multiprocessing import Pool
import time, random, os
def func(n):
 pid = os.getpid()
 print('進程%s正在處理第%d個任務'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
 time.sleep(2)
 res = '處理%s'%random.choice(['成功','失敗'])
 return res
if __name__ == '__main__':
 p = Pool(4) #創(chuàng)建4個進程,
 li = []
 for i in range(10):
  res = p.apply(func,args=(i,)) 交給進程池處理,處理完成才返回值,會阻塞,即使池內(nèi)還有空余進程,相當于順序執(zhí)行
  li.append(res)
 for i in li:
  print(i)

#進程1916正在處理第0個任務 時間21-02-53
#進程1240正在處理第1個任務 時間21-02-55
#進程3484正在處理第2個任務 時間21-02-57
#進程7512正在處理第3個任務 時間21-02-59
#進程1916正在處理第4個任務 時間21-03-01
#進程1240正在處理第5個任務 時間21-03-03
#進程3484正在處理第6個任務 時間21-03-05
#進程7512正在處理第7個任務 時間21-03-07
#進程1916正在處理第8個任務 時間21-03-09
#進程1240正在處理第9個任務 時間21-03-11

從結(jié)果可以發(fā)現(xiàn)兩點:

  1. 不是并發(fā)處理

  2. 一直都只有四個進程,串行執(zhí)行

因此進程池提供了 異步處理 的方式

from multiprocessing import Pool
import time, random, os
def func(n):
 pid = os.getpid()
 print('進程%s正在處理第%d個任務'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
 time.sleep(2)
 res = '處理%s'%random.choice(['成功','失敗'])
 return res

if __name__ == '__main__':
 p = Pool(4)
 li = []
 for i in range(10):
  res = p.apply_async(func,args=(i,)) 結(jié)果不會立刻返回,遇到阻塞,開啟下一個進程,在這,相當于幾乎同時出現(xiàn)四個打印結(jié)果(一個線程處理一個任務,處理完下個任務才能進來)
  li.append(res)

 p.close() #join之前需要關(guān)閉進程池
 p.join() #因為異步,所以需要等待池內(nèi)進程工作結(jié)束再繼續(xù)
 for i in li:
  print(i.get()) #i是一個對象,通過get方法獲取返回值,而同步則沒有該方法

關(guān)于回調(diào)函數(shù)

from multiprocessing import Pool
import time, random, os
def func(n):
 pid = os.getpid()
 print('進程%s正在處理第%d個任務'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
 time.sleep(2)
 res = '處理%s'%random.choice(['成功','失敗'])
 return res

def foo(info):
 print(info) #傳入值為進程執(zhí)行結(jié)果

if __name__ == '__main__':
 p = Pool(4)
 li = []
 for i in range(10):
  res = p.apply_async(func,args=(i,),callback = foo) callback()回調(diào)函數(shù)會在進程執(zhí)行完之后調(diào)用(主進程調(diào)用) 
  li.append(res)

 p.close() 
 p.join() 
 for i in li:
  print(i.get())

有回調(diào)函數(shù)

感謝你能夠認真閱讀完這篇文章,希望小編分享的“Python并發(fā)之多進程的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI