溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

python多進(jìn)程和多線程的實(shí)際用法

發(fā)布時(shí)間:2021-08-31 16:01:19 來(lái)源:億速云 閱讀:218 作者:chen 欄目:編程語(yǔ)言

這篇文章主要講解了“python多進(jìn)程和多線程的實(shí)際用法”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“python多進(jìn)程和多線程的實(shí)際用法”吧!

  寫(xiě)在前面

  總所周知,unix/linux 為多任務(wù)操作系統(tǒng),,即可以支持遠(yuǎn)大于CPU數(shù)量的任務(wù)同時(shí)運(yùn)行

  理解多任務(wù)就需要知道操作系統(tǒng)的CPU上下文:

  首先,我們都知道cpu一個(gè)時(shí)間段其實(shí)只能運(yùn)行單個(gè)任務(wù),只不過(guò)在很短的時(shí)間內(nèi),CPU快速切換到不同的任務(wù)進(jìn)行執(zhí)行,造成一種多任務(wù)同時(shí)執(zhí)行的錯(cuò)覺(jué)

  而在CPU切換到其他任務(wù)執(zhí)行之前,為了確保在切換任務(wù)之后還能夠繼續(xù)切換回原來(lái)的任務(wù)繼續(xù)執(zhí)行,并且看起來(lái)是一種連續(xù)的狀態(tài),就必須將任務(wù)的狀態(tài)保持起來(lái),以便恢復(fù)原始任務(wù)時(shí)能夠繼續(xù)之前的狀態(tài)執(zhí)行,狀態(tài)保存的位置位于CPU的寄存器和程序計(jì)數(shù)器(,PC)

  簡(jiǎn)單來(lái)說(shuō)寄存器是CPU內(nèi)置的容量小、但速度極快的內(nèi)存,用來(lái)保存程序的堆棧信息即數(shù)據(jù)段信息。程序計(jì)數(shù)器保存程序的下一條指令的位置即代碼段信息。

  所以,CPU上下文就是指CPU寄存器和程序計(jì)數(shù)器中保存的任務(wù)狀態(tài)信息;CPU上下文切換就是把前一個(gè)任務(wù)的CPU上下文保存起來(lái),然后加載下一個(gè)任務(wù)的上下文到這些寄存器和程序計(jì)數(shù)器,再跳轉(zhuǎn)到程序計(jì)數(shù)器所指示的位置運(yùn)行程序。

  python程序默認(rèn)都是執(zhí)行單任務(wù)的進(jìn)程,也就是只有一個(gè)線程。如果我們要同時(shí)執(zhí)行多個(gè)任務(wù)怎么辦?

  有兩種解決方案:

  一種是啟動(dòng)多個(gè)進(jìn)程,每個(gè)進(jìn)程雖然只有一個(gè)線程,但多個(gè)進(jìn)程可以一塊執(zhí)行多個(gè)任務(wù)。

  還有一種方法是啟動(dòng)一個(gè)進(jìn)程,在一個(gè)進(jìn)程內(nèi)啟動(dòng)多個(gè)線程,這樣,多個(gè)線程也可以一塊執(zhí)行多個(gè)任務(wù)。

  當(dāng)然還有第三種方法,就是啟動(dòng)多個(gè)進(jìn)程,每個(gè)進(jìn)程再啟動(dòng)多個(gè)線程,這樣同時(shí)執(zhí)行的任務(wù)就更多了,當(dāng)然這種模型更復(fù)雜,實(shí)際很少采用。

  Python中的多進(jìn)程

  在Unix/Linux系統(tǒng)中,提供了一個(gè)fork()函數(shù)調(diào)用,相較于普通函數(shù)調(diào)用一次,返回一次的機(jī)制,fork()調(diào)用一次,返回兩次,具體表現(xiàn)為操作系統(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程),然后分別在父進(jìn)程和子進(jìn)程內(nèi)返回。

  子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID,這樣一個(gè)父進(jìn)程可以輕松fork出很多子進(jìn)程。且父進(jìn)程會(huì)記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getppid()就可以拿到父進(jìn)程的ID。

  python的os模塊封裝了fork調(diào)用方法以實(shí)現(xiàn)在python程序中創(chuàng)建子進(jìn)程,下面具體看兩個(gè)例子:

  [root@test-yw-01 opt]# cat test.py

  import os

  print('Process ({}) start...'.format(os.getpid()))

  pid = os.fork()

  print(pid)

  [root@test-yw-01 opt]# python3 test.py

  Process (26620) start...

  26621

  0

  [root@test-yunwei-01 opt]# cat process.py

  import os

  print('Process ({}) start...'.format(os.getpid()))

  pid = os.fork()

  if pid == 0:

  print('The child process is {} and parent process is{}'.format(os.getpid(),os.getppid()))

  else:

  print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

  [root@test-yunwei-01 opt]# pyhton3 process.py

  Process (25863) start...

  I (25863) just created a child process (25864)

  The child process is 25864 and parent process is 25863

  通過(guò)fork調(diào)用這種方法,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來(lái)處理新任務(wù),例如nginx就是由父進(jìn)程(master process)監(jiān)聽(tīng)端口,再fork出子進(jìn)程(work process)來(lái)處理新的http請(qǐng)求。

  注意:

  Windows沒(méi)有fork調(diào)用,所以在window pycharm上運(yùn)行以上代碼無(wú)法實(shí)現(xiàn)以上效果。

  multiprocessing模塊

  雖然Windows沒(méi)有fork調(diào)用,但是可以憑借multiprocessing該多進(jìn)程模塊所提供的Process類來(lái)實(shí)現(xiàn)。

  下面看一例子:

  首先模擬一個(gè)使用單進(jìn)程的下載任務(wù),并打印出進(jìn)程號(hào)

  1)單進(jìn)程執(zhí)行:

  import os

  from random import randint

  import time

  def download(filename):

  print("進(jìn)程號(hào)是:%s"%os.getpid())

  downloadtime = 3

  print('現(xiàn)在開(kāi)始下載:{}'.format(filename))

  time.sleep(downloadtime)

  def runtask():

  start_time = time.time()

  download('水滸傳')

  download('西游記')

  stop_time = time.time()

  print('下載耗時(shí):{}'.format(stop_time - start_time))

  if __name__ == '__main__':

  runtask()

  得出結(jié)果

  接著通過(guò)調(diào)用Process模擬開(kāi)啟兩個(gè)子進(jìn)程:

  import time

  from os import getpid

  from multiprocessing import Process

  def download(filename):

  print("進(jìn)程號(hào)是:%s"%getpid())

  downloadtime = 3

  print('現(xiàn)在開(kāi)始下載:{}'.format(filename))

  time.sleep(downloadtime)

  def runtask():

  start_time = time.time()

  task1 = Process(target=download,args=('西游記',))

  task1.start() #調(diào)用start()開(kāi)始執(zhí)行

  task2 = Process(target=download,args=('水滸傳',))

  task2.start()

  task1.join() # join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步

  task2.join()

  stop_time = time.time()

  print('下載耗時(shí):{}'.format(stop_time - start_time))

  if __name__ == '__main__':

  runtask()

  連接池Pool

  可以用進(jìn)程池Pool批量創(chuàng)建子進(jìn)程的方式來(lái)創(chuàng)建大量工作子進(jìn)程

  import os

  from random import randint

  from multiprocessing import Process,Pool

  import time

  def download(taskname):

  print("進(jìn)程號(hào)是:%s"%os.getpid())

  downloadtime = randint(1,3)

  print('現(xiàn)在開(kāi)始下載:{}'.format(taskname))

  time.sleep(downloadtime)

  def runtask():

  start_time = time.time()

  pool = Pool(4) #定義進(jìn)程連接池可用連接數(shù)量

  for task in range(5):

  pool.apply_async(download,args=(task,))

  pool.close()

  pool.join()

  stop_time = time.time()

  print('完成下載,下載耗時(shí):{}'.format(stop_time - start_time))

  if __name__ == '__main__':

  runtask()

  需要注意的點(diǎn):

  對(duì)pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的進(jìn)程

  pool的默認(rèn)大小是主機(jī)CPU的核數(shù),所以這里設(shè)置成4個(gè)進(jìn)程,這樣就避免了cpu關(guān)于進(jìn)程間切換帶來(lái)的額外資源消耗,提高了任務(wù)的執(zhí)行效率

  進(jìn)程間通信

  from multiprocessing.Queue import Queue

  相較于普通Queue普通的隊(duì)列的先進(jìn)先出模式,get方法會(huì)阻塞請(qǐng)求,直到有數(shù)據(jù)get出來(lái)為止。這個(gè)是多進(jìn)程并發(fā)的Queue隊(duì)列,用于解決多進(jìn)程間的通信問(wèn)題。

  from multiprocessing import Process, Queue

  import os, time, random

  datas = []

  def write_data(args):

  print('Process to write: %s' % os.getpid())

  for v in "helloword":

  datas.append(v)

  print("write {} to queue".format(v))

  args.put(v)

  time.sleep(random.random())

  print(datas)

  def read_data(args):

  print('Process to read: %s' % os.getpid())

  while True:

  value = args.get(True)

  print("read {} from queue".format(value))

  if __name__ == '__main__':

  queue = Queue()

  write = Process(target=write_data,args=(queue,))

  read = Process(target=read_data,args=(queue,))

  write.start()

  read.start()

  write.join()

  read.terminate()

  進(jìn)程池中使用隊(duì)列

  由于隊(duì)列對(duì)象不能在父進(jìn)程與子進(jìn)程間通信,所以需要使用Manager().Queue()才能實(shí)現(xiàn)隊(duì)列中各子進(jìn)程間進(jìn)行通信

  from multiprocessing import Manager

  if __name__=='__main__':

  manager = multiprocessing.Manager()

  # 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:

  queue = manager.Queue()

  pool = Pool()

  write = Process(target=write_data,args=(queue,))

  read = Process(target=read_data,args=(queue,))

  write.start()

  read.start()

  write.join()

  read.terminate()

  如果是用進(jìn)程池,就需要使用Manager().Queue()隊(duì)列才能實(shí)現(xiàn)在各子進(jìn)程間進(jìn)行通信

  參考文檔: https://blog.csdn.net/qq_32446743/article/details/79785684

  https://blog.csdn.net/u013713010/article/details/53325438

  Python中的多線程

  相較于資源分配的基本單位進(jìn)程,線程是任務(wù)運(yùn)行調(diào)度的基本單位,且由于每一個(gè)進(jìn)程擁有自己獨(dú)立的內(nèi)存空間,而線程共享所屬進(jìn)程的內(nèi)存空間,所以在涉及多任務(wù)執(zhí)行時(shí),線程的上下文切換比進(jìn)程少了操作系統(tǒng)內(nèi)核將虛擬內(nèi)存資源即寄存器中的內(nèi)容切換出這一步驟,也就大大提升了多任務(wù)執(zhí)行的效率。

  首先需要明確幾個(gè)概念:

  1.當(dāng)一個(gè)進(jìn)程啟動(dòng)之后,會(huì)默認(rèn)產(chǎn)生一個(gè)主線程,因?yàn)榫€程是程序執(zhí)行流的最小單元,當(dāng)設(shè)置多線程時(shí),主線程會(huì)創(chuàng)建多個(gè)子線程,在python中,默認(rèn)情況下(其實(shí)就是setDaemon(False)),主線程執(zhí)行完自己的任務(wù)以后,就退出了,此時(shí)子線程會(huì)繼續(xù)執(zhí)行自己的任務(wù),直到自己的任務(wù)結(jié)束

  python的提供了關(guān)于多線程的threading模塊,和多進(jìn)程的啟動(dòng)類似,就是把函數(shù)傳入并創(chuàng)建Thread實(shí)例,然后調(diào)用start()開(kāi)始執(zhí)行:

  import os

  import random

  from threading import Thread

  import threading

  import time

  def mysql_dump():

  print('開(kāi)始執(zhí)行線程{}'.format(threading.current_thread().name)) #返回當(dāng)前線程實(shí)例名稱

  dumptime = random.randint(1,3)

  time.sleep(dumptime) #利用time.sleep()方法模擬備份數(shù)據(jù)庫(kù)所花費(fèi)時(shí)間

  class Mutil_thread(Thread):

  def runtask(slef):

  thread_list = []

  print('當(dāng)前線程的名字是: ', threading.current_thread().name)

  start_time = time.time()

  for t in range(5):

  task = Mutil_thread(target=slef.tasks)

  thread_list.append(task)

  for i in thread_list:

  # i.setDaemon(False)

  i.start()

  i.join()#join()所完成的工作就是線程同步,即主線程任務(wù)結(jié)束之后,進(jìn)入阻塞狀態(tài),一直等待其他的子線程執(zhí)行結(jié)束之后,主線程在終止

  stop_time = time.time()

  print('主線程結(jié)束!', threading.current_thread().name)

  print('下載耗時(shí):{}'.format(stop_time - start_time))

  if __name__ == '__main__':

  run = Mutil_thread()

  run.tasks = mysql_dump

  run.runtask()

  執(zhí)行結(jié)果:

  進(jìn)程默認(rèn)就會(huì)啟動(dòng)一個(gè)線程,我們把該線程稱為主線程,實(shí)例的名為MainThread,主線程又可以啟動(dòng)新的線程,線程命名依次為Thread-1,Thread-2…

  LOCK

  多線程不同于進(jìn)程,在多進(jìn)程中,例如針對(duì)同一個(gè)變量,各自有一份拷貝存在于每個(gè)進(jìn)程中,資源相互隔離,互不影響

  但在進(jìn)程中,線程間可以共享進(jìn)程像系統(tǒng)申請(qǐng)的內(nèi)存空間,雖然實(shí)現(xiàn)多個(gè)線程間的通信相對(duì)簡(jiǎn)單,但是當(dāng)同一個(gè)資源(臨界資源)被多個(gè)線程競(jìng)爭(zhēng)使用時(shí),例如線程共享進(jìn)程的變量,其就有可能被任何一個(gè)線程修改,所以對(duì)這種臨界資源的訪問(wèn)需要加上保護(hù),否則資源會(huì)處于“混亂”的狀態(tài)。

  import time

  from threading import Thread,Lock

  class Account(object): # 假定這是一個(gè)銀行賬戶

  def __init__(self):

  self.balance = 0 #初始余額為0元

  def count(self,money):

  new_balance = self.balance + money

  time.sleep(0.01) # 模擬每次存款需要花費(fèi)的時(shí)間

  self.balance = new_balance #存完之后更新賬戶余額

  @property

  def get_count(self):

  return(self.balance)

  class Addmoney(Thread): #模擬存款業(yè)務(wù),直接繼承Thread

  def __init__(self,action,money):

  super().__init__() #在繼承Thread類的基礎(chǔ)上,再新增action及money屬性,便于main()的直接調(diào)用

  self.action = action

  self.money = money

  def run(self):

  self.action.count(self.money)

  def main():

  action = Account()

  threads = []

  for i in range(1000): #開(kāi)啟1000個(gè)線程同時(shí)向賬戶存款

  t = Addmoney(action,1) #每次只存入一元

  threads.append(t)

  t.start()

  for task in threads:

  task.join()

  print('賬戶余額為: ¥%s元'%action.get_count)

  main()

  查看執(zhí)行結(jié)果: 鄭州哪個(gè)婦科醫(yī)院好 http://www.sptdfk.com/

python多進(jìn)程和多線程的實(shí)際用法

  運(yùn)行上面的程序,1000線程分別向賬戶中轉(zhuǎn)入1元錢,結(jié)果小于100元。之所以出現(xiàn)這種情況是因?yàn)槲覀儧](méi)有對(duì)balance余額該線程共享的變量加以保護(hù),當(dāng)多個(gè)線程同時(shí)向賬戶中存錢時(shí),會(huì)一起執(zhí)行到new_balance = self.balance + money這行代碼,多個(gè)線程得到的賬戶余額都是初始狀態(tài)下的0,所以都是0上面做了+1的操作,因此得到了錯(cuò)誤的結(jié)果。

  如果我們要確保balance計(jì)算正確,就要給Account().count()上一把鎖,當(dāng)某個(gè)線程開(kāi)始執(zhí)行Account().count()時(shí),該線程因?yàn)楂@得了鎖,因此其他線程不能同時(shí)執(zhí)行,只能等待鎖被釋放后,獲得該鎖以后才能更改改。由于鎖只有一個(gè),無(wú)論多少線程,同一時(shí)刻最多只有一個(gè)線程持有該鎖,所以,不會(huì)造成修改的沖突。創(chuàng)建一個(gè)鎖就是通過(guò)threading.Lock()來(lái)實(shí)現(xiàn):

  import time

  from threading import Thread,Lock

  import time

  from threading import Thread,Lock

  class Account(object): # 假定這是一個(gè)銀行賬戶

  def __init__(self):

  self.balance = 0 #初始余額為0元

  self.lock = Lock()

  def count(self,money):

  self.lock.acquire()

  try:

  new_balance = self.balance + money

  time.sleep(0.01) # 模擬每次存款需要花費(fèi)的時(shí)間

  self.balance = new_balance #存完之后更新賬戶余額

  finally: #在finally中執(zhí)行釋放鎖的操作保證正常異常鎖都能釋放

  self.lock.release()

  def get_count(self):

  return(self.balance)

  class Addmoney(Thread): #模擬存款業(yè)務(wù)

  def __init__(self,action,money):

  super().__init__()

  self.action = action

  self.money = money

  self.lock = Lock()

  def run(self):

  self.action.count(self.money)

  def main():

  action = Account()

  threads = []

  for i in range(1000): #開(kāi)啟100000個(gè)線程同時(shí)向賬戶存款

  t = Addmoney(action,1) #每次只存入一元

  threads.append(t)

  t.start()

  for task in threads:

  task.join()

  print('賬戶余額為: ¥%s元'%action.get_count())

  main()

  執(zhí)行結(jié)果:

python多進(jìn)程和多線程的實(shí)際用法

感謝各位的閱讀,以上就是“python多進(jìn)程和多線程的實(shí)際用法”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)python多進(jìn)程和多線程的實(shí)際用法這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI