您好,登錄后才能下訂單哦!
這篇文章主要講解了“python多進(jìn)程和多線程的實(shí)際用法”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“python多進(jìn)程和多線程的實(shí)際用法”吧!
寫(xiě)在前面
總所周知,unix/linux 為多任務(wù)操作系統(tǒng),,即可以支持遠(yuǎn)大于CPU數(shù)量的任務(wù)同時(shí)運(yùn)行
理解多任務(wù)就需要知道操作系統(tǒng)的CPU上下文:
首先,我們都知道cpu一個(gè)時(shí)間段其實(shí)只能運(yùn)行單個(gè)任務(wù),只不過(guò)在很短的時(shí)間內(nèi),CPU快速切換到不同的任務(wù)進(jìn)行執(zhí)行,造成一種多任務(wù)同時(shí)執(zhí)行的錯(cuò)覺(jué)
而在CPU切換到其他任務(wù)執(zhí)行之前,為了確保在切換任務(wù)之后還能夠繼續(xù)切換回原來(lái)的任務(wù)繼續(xù)執(zhí)行,并且看起來(lái)是一種連續(xù)的狀態(tài),就必須將任務(wù)的狀態(tài)保持起來(lái),以便恢復(fù)原始任務(wù)時(shí)能夠繼續(xù)之前的狀態(tài)執(zhí)行,狀態(tài)保存的位置位于CPU的寄存器和程序計(jì)數(shù)器(,PC)
簡(jiǎn)單來(lái)說(shuō)寄存器是CPU內(nèi)置的容量小、但速度極快的內(nèi)存,用來(lái)保存程序的堆棧信息即數(shù)據(jù)段信息。程序計(jì)數(shù)器保存程序的下一條指令的位置即代碼段信息。
所以,CPU上下文就是指CPU寄存器和程序計(jì)數(shù)器中保存的任務(wù)狀態(tài)信息;CPU上下文切換就是把前一個(gè)任務(wù)的CPU上下文保存起來(lái),然后加載下一個(gè)任務(wù)的上下文到這些寄存器和程序計(jì)數(shù)器,再跳轉(zhuǎn)到程序計(jì)數(shù)器所指示的位置運(yùn)行程序。
python程序默認(rèn)都是執(zhí)行單任務(wù)的進(jìn)程,也就是只有一個(gè)線程。如果我們要同時(shí)執(zhí)行多個(gè)任務(wù)怎么辦?
有兩種解決方案:
一種是啟動(dòng)多個(gè)進(jìn)程,每個(gè)進(jìn)程雖然只有一個(gè)線程,但多個(gè)進(jìn)程可以一塊執(zhí)行多個(gè)任務(wù)。
還有一種方法是啟動(dòng)一個(gè)進(jìn)程,在一個(gè)進(jìn)程內(nèi)啟動(dòng)多個(gè)線程,這樣,多個(gè)線程也可以一塊執(zhí)行多個(gè)任務(wù)。
當(dāng)然還有第三種方法,就是啟動(dòng)多個(gè)進(jìn)程,每個(gè)進(jìn)程再啟動(dòng)多個(gè)線程,這樣同時(shí)執(zhí)行的任務(wù)就更多了,當(dāng)然這種模型更復(fù)雜,實(shí)際很少采用。
Python中的多進(jìn)程
在Unix/Linux系統(tǒng)中,提供了一個(gè)fork()函數(shù)調(diào)用,相較于普通函數(shù)調(diào)用一次,返回一次的機(jī)制,fork()調(diào)用一次,返回兩次,具體表現(xiàn)為操作系統(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程),然后分別在父進(jìn)程和子進(jìn)程內(nèi)返回。
子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID,這樣一個(gè)父進(jìn)程可以輕松fork出很多子進(jìn)程。且父進(jìn)程會(huì)記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getppid()就可以拿到父進(jìn)程的ID。
python的os模塊封裝了fork調(diào)用方法以實(shí)現(xiàn)在python程序中創(chuàng)建子進(jìn)程,下面具體看兩個(gè)例子:
[root@test-yw-01 opt]# cat test.py
import os
print('Process ({}) start...'.format(os.getpid()))
pid = os.fork()
print(pid)
[root@test-yw-01 opt]# python3 test.py
Process (26620) start...
26621
0
[root@test-yunwei-01 opt]# cat process.py
import os
print('Process ({}) start...'.format(os.getpid()))
pid = os.fork()
if pid == 0:
print('The child process is {} and parent process is{}'.format(os.getpid(),os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
[root@test-yunwei-01 opt]# pyhton3 process.py
Process (25863) start...
I (25863) just created a child process (25864)
The child process is 25864 and parent process is 25863
通過(guò)fork調(diào)用這種方法,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來(lái)處理新任務(wù),例如nginx就是由父進(jìn)程(master process)監(jiān)聽(tīng)端口,再fork出子進(jìn)程(work process)來(lái)處理新的http請(qǐng)求。
注意:
Windows沒(méi)有fork調(diào)用,所以在window pycharm上運(yùn)行以上代碼無(wú)法實(shí)現(xiàn)以上效果。
multiprocessing模塊
雖然Windows沒(méi)有fork調(diào)用,但是可以憑借multiprocessing該多進(jìn)程模塊所提供的Process類來(lái)實(shí)現(xiàn)。
下面看一例子:
首先模擬一個(gè)使用單進(jìn)程的下載任務(wù),并打印出進(jìn)程號(hào)
1)單進(jìn)程執(zhí)行:
import os
from random import randint
import time
def download(filename):
print("進(jìn)程號(hào)是:%s"%os.getpid())
downloadtime = 3
print('現(xiàn)在開(kāi)始下載:{}'.format(filename))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
download('水滸傳')
download('西游記')
stop_time = time.time()
print('下載耗時(shí):{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
得出結(jié)果
接著通過(guò)調(diào)用Process模擬開(kāi)啟兩個(gè)子進(jìn)程:
import time
from os import getpid
from multiprocessing import Process
def download(filename):
print("進(jìn)程號(hào)是:%s"%getpid())
downloadtime = 3
print('現(xiàn)在開(kāi)始下載:{}'.format(filename))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
task1 = Process(target=download,args=('西游記',))
task1.start() #調(diào)用start()開(kāi)始執(zhí)行
task2 = Process(target=download,args=('水滸傳',))
task2.start()
task1.join() # join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步
task2.join()
stop_time = time.time()
print('下載耗時(shí):{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
連接池Pool
可以用進(jìn)程池Pool批量創(chuàng)建子進(jìn)程的方式來(lái)創(chuàng)建大量工作子進(jìn)程
import os
from random import randint
from multiprocessing import Process,Pool
import time
def download(taskname):
print("進(jìn)程號(hào)是:%s"%os.getpid())
downloadtime = randint(1,3)
print('現(xiàn)在開(kāi)始下載:{}'.format(taskname))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
pool = Pool(4) #定義進(jìn)程連接池可用連接數(shù)量
for task in range(5):
pool.apply_async(download,args=(task,))
pool.close()
pool.join()
stop_time = time.time()
print('完成下載,下載耗時(shí):{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
需要注意的點(diǎn):
對(duì)pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的進(jìn)程
pool的默認(rèn)大小是主機(jī)CPU的核數(shù),所以這里設(shè)置成4個(gè)進(jìn)程,這樣就避免了cpu關(guān)于進(jìn)程間切換帶來(lái)的額外資源消耗,提高了任務(wù)的執(zhí)行效率
進(jìn)程間通信
from multiprocessing.Queue import Queue
相較于普通Queue普通的隊(duì)列的先進(jìn)先出模式,get方法會(huì)阻塞請(qǐng)求,直到有數(shù)據(jù)get出來(lái)為止。這個(gè)是多進(jìn)程并發(fā)的Queue隊(duì)列,用于解決多進(jìn)程間的通信問(wèn)題。
from multiprocessing import Process, Queue
import os, time, random
datas = []
def write_data(args):
print('Process to write: %s' % os.getpid())
for v in "helloword":
datas.append(v)
print("write {} to queue".format(v))
args.put(v)
time.sleep(random.random())
print(datas)
def read_data(args):
print('Process to read: %s' % os.getpid())
while True:
value = args.get(True)
print("read {} from queue".format(value))
if __name__ == '__main__':
queue = Queue()
write = Process(target=write_data,args=(queue,))
read = Process(target=read_data,args=(queue,))
write.start()
read.start()
write.join()
read.terminate()
進(jìn)程池中使用隊(duì)列
由于隊(duì)列對(duì)象不能在父進(jìn)程與子進(jìn)程間通信,所以需要使用Manager().Queue()才能實(shí)現(xiàn)隊(duì)列中各子進(jìn)程間進(jìn)行通信
from multiprocessing import Manager
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
queue = manager.Queue()
pool = Pool()
write = Process(target=write_data,args=(queue,))
read = Process(target=read_data,args=(queue,))
write.start()
read.start()
write.join()
read.terminate()
如果是用進(jìn)程池,就需要使用Manager().Queue()隊(duì)列才能實(shí)現(xiàn)在各子進(jìn)程間進(jìn)行通信
參考文檔: https://blog.csdn.net/qq_32446743/article/details/79785684
https://blog.csdn.net/u013713010/article/details/53325438
Python中的多線程
相較于資源分配的基本單位進(jìn)程,線程是任務(wù)運(yùn)行調(diào)度的基本單位,且由于每一個(gè)進(jìn)程擁有自己獨(dú)立的內(nèi)存空間,而線程共享所屬進(jìn)程的內(nèi)存空間,所以在涉及多任務(wù)執(zhí)行時(shí),線程的上下文切換比進(jìn)程少了操作系統(tǒng)內(nèi)核將虛擬內(nèi)存資源即寄存器中的內(nèi)容切換出這一步驟,也就大大提升了多任務(wù)執(zhí)行的效率。
首先需要明確幾個(gè)概念:
1.當(dāng)一個(gè)進(jìn)程啟動(dòng)之后,會(huì)默認(rèn)產(chǎn)生一個(gè)主線程,因?yàn)榫€程是程序執(zhí)行流的最小單元,當(dāng)設(shè)置多線程時(shí),主線程會(huì)創(chuàng)建多個(gè)子線程,在python中,默認(rèn)情況下(其實(shí)就是setDaemon(False)),主線程執(zhí)行完自己的任務(wù)以后,就退出了,此時(shí)子線程會(huì)繼續(xù)執(zhí)行自己的任務(wù),直到自己的任務(wù)結(jié)束
python的提供了關(guān)于多線程的threading模塊,和多進(jìn)程的啟動(dòng)類似,就是把函數(shù)傳入并創(chuàng)建Thread實(shí)例,然后調(diào)用start()開(kāi)始執(zhí)行:
import os
import random
from threading import Thread
import threading
import time
def mysql_dump():
print('開(kāi)始執(zhí)行線程{}'.format(threading.current_thread().name)) #返回當(dāng)前線程實(shí)例名稱
dumptime = random.randint(1,3)
time.sleep(dumptime) #利用time.sleep()方法模擬備份數(shù)據(jù)庫(kù)所花費(fèi)時(shí)間
class Mutil_thread(Thread):
def runtask(slef):
thread_list = []
print('當(dāng)前線程的名字是: ', threading.current_thread().name)
start_time = time.time()
for t in range(5):
task = Mutil_thread(target=slef.tasks)
thread_list.append(task)
for i in thread_list:
# i.setDaemon(False)
i.start()
i.join()#join()所完成的工作就是線程同步,即主線程任務(wù)結(jié)束之后,進(jìn)入阻塞狀態(tài),一直等待其他的子線程執(zhí)行結(jié)束之后,主線程在終止
stop_time = time.time()
print('主線程結(jié)束!', threading.current_thread().name)
print('下載耗時(shí):{}'.format(stop_time - start_time))
if __name__ == '__main__':
run = Mutil_thread()
run.tasks = mysql_dump
run.runtask()
執(zhí)行結(jié)果:
進(jìn)程默認(rèn)就會(huì)啟動(dòng)一個(gè)線程,我們把該線程稱為主線程,實(shí)例的名為MainThread,主線程又可以啟動(dòng)新的線程,線程命名依次為Thread-1,Thread-2…
LOCK
多線程不同于進(jìn)程,在多進(jìn)程中,例如針對(duì)同一個(gè)變量,各自有一份拷貝存在于每個(gè)進(jìn)程中,資源相互隔離,互不影響
但在進(jìn)程中,線程間可以共享進(jìn)程像系統(tǒng)申請(qǐng)的內(nèi)存空間,雖然實(shí)現(xiàn)多個(gè)線程間的通信相對(duì)簡(jiǎn)單,但是當(dāng)同一個(gè)資源(臨界資源)被多個(gè)線程競(jìng)爭(zhēng)使用時(shí),例如線程共享進(jìn)程的變量,其就有可能被任何一個(gè)線程修改,所以對(duì)這種臨界資源的訪問(wèn)需要加上保護(hù),否則資源會(huì)處于“混亂”的狀態(tài)。
import time
from threading import Thread,Lock
class Account(object): # 假定這是一個(gè)銀行賬戶
def __init__(self):
self.balance = 0 #初始余額為0元
def count(self,money):
new_balance = self.balance + money
time.sleep(0.01) # 模擬每次存款需要花費(fèi)的時(shí)間
self.balance = new_balance #存完之后更新賬戶余額
@property
def get_count(self):
return(self.balance)
class Addmoney(Thread): #模擬存款業(yè)務(wù),直接繼承Thread
def __init__(self,action,money):
super().__init__() #在繼承Thread類的基礎(chǔ)上,再新增action及money屬性,便于main()的直接調(diào)用
self.action = action
self.money = money
def run(self):
self.action.count(self.money)
def main():
action = Account()
threads = []
for i in range(1000): #開(kāi)啟1000個(gè)線程同時(shí)向賬戶存款
t = Addmoney(action,1) #每次只存入一元
threads.append(t)
t.start()
for task in threads:
task.join()
print('賬戶余額為: ¥%s元'%action.get_count)
main()
查看執(zhí)行結(jié)果: 鄭州哪個(gè)婦科醫(yī)院好 http://www.sptdfk.com/
運(yùn)行上面的程序,1000線程分別向賬戶中轉(zhuǎn)入1元錢,結(jié)果小于100元。之所以出現(xiàn)這種情況是因?yàn)槲覀儧](méi)有對(duì)balance余額該線程共享的變量加以保護(hù),當(dāng)多個(gè)線程同時(shí)向賬戶中存錢時(shí),會(huì)一起執(zhí)行到new_balance = self.balance + money這行代碼,多個(gè)線程得到的賬戶余額都是初始狀態(tài)下的0,所以都是0上面做了+1的操作,因此得到了錯(cuò)誤的結(jié)果。
如果我們要確保balance計(jì)算正確,就要給Account().count()上一把鎖,當(dāng)某個(gè)線程開(kāi)始執(zhí)行Account().count()時(shí),該線程因?yàn)楂@得了鎖,因此其他線程不能同時(shí)執(zhí)行,只能等待鎖被釋放后,獲得該鎖以后才能更改改。由于鎖只有一個(gè),無(wú)論多少線程,同一時(shí)刻最多只有一個(gè)線程持有該鎖,所以,不會(huì)造成修改的沖突。創(chuàng)建一個(gè)鎖就是通過(guò)threading.Lock()來(lái)實(shí)現(xiàn):
import time
from threading import Thread,Lock
import time
from threading import Thread,Lock
class Account(object): # 假定這是一個(gè)銀行賬戶
def __init__(self):
self.balance = 0 #初始余額為0元
self.lock = Lock()
def count(self,money):
self.lock.acquire()
try:
new_balance = self.balance + money
time.sleep(0.01) # 模擬每次存款需要花費(fèi)的時(shí)間
self.balance = new_balance #存完之后更新賬戶余額
finally: #在finally中執(zhí)行釋放鎖的操作保證正常異常鎖都能釋放
self.lock.release()
def get_count(self):
return(self.balance)
class Addmoney(Thread): #模擬存款業(yè)務(wù)
def __init__(self,action,money):
super().__init__()
self.action = action
self.money = money
self.lock = Lock()
def run(self):
self.action.count(self.money)
def main():
action = Account()
threads = []
for i in range(1000): #開(kāi)啟100000個(gè)線程同時(shí)向賬戶存款
t = Addmoney(action,1) #每次只存入一元
threads.append(t)
t.start()
for task in threads:
task.join()
print('賬戶余額為: ¥%s元'%action.get_count())
main()
執(zhí)行結(jié)果:
感謝各位的閱讀,以上就是“python多進(jìn)程和多線程的實(shí)際用法”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)python多進(jìn)程和多線程的實(shí)際用法這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。