您好,登錄后才能下訂單哦!
前言
在python中使用多進(jìn)程和多線程都能達(dá)到同時運(yùn)行多個任務(wù),和多進(jìn)程和多線程的選擇上,應(yīng)該優(yōu)先選擇多進(jìn)程的方式,因為多進(jìn)程更加穩(wěn)定,且對于進(jìn)程的操作管理也更加方便,但有一點(diǎn)是多進(jìn)程獨(dú)有的殺手锏,多進(jìn)程可以將進(jìn)程分步到多臺機(jī)器上跑,假如有很多個任務(wù),一臺機(jī)器即使開了多進(jìn)程或者多進(jìn)程跑起來還是要耗很多時間,那么這時就要想一下可否將任務(wù)分配到多臺機(jī)器上跑,這樣可以更快的完成任務(wù)。
在分步式進(jìn)程運(yùn)算中,進(jìn)程之前的通信還是依賴于Queue,但此時的隊列不能直接使用,需要使用multiprocessing.managers.BaseManager
進(jìn)行包裝,通過回調(diào)以后才能使用,既然是分步式的調(diào)用,那么應(yīng)該有一個服務(wù)端和一個客戶端,服務(wù)端通過網(wǎng)絡(luò)協(xié)議將隊列中的信息給各個客戶端進(jìn)行調(diào)用,客戶端也可以通過隊列將結(jié)果返回,然后服務(wù)端進(jìn)行結(jié)果的收集展示,流程如下
分步式流程
服務(wù)端將任務(wù)放到 task_queue 中,然后四個客戶端通過網(wǎng)絡(luò)端口從task_queue中獲取到任務(wù),然后進(jìn)行計算,再將結(jié)果放到result_queue中,最后服務(wù)端統(tǒng)一處理結(jié)果。整體的流程比較清晰,只是需要強(qiáng)調(diào),這里的隊列不能是原始的隊列,需要使用BaseManager 進(jìn)行包裝。
先看一下服務(wù)端的代碼
#coding:gbk import time, queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support # 任務(wù)個數(shù) task_number = 10 # 定義收發(fā)隊列 task_queue = queue.Queue(task_number) result_queue = queue.Queue(task_number) def gettask(): return task_queue def getresult(): return result_queue def test(): # windows下綁定調(diào)用接口不能使用lambda,所以只能先定義函數(shù)再綁定 BaseManager.register('get_task', callable=gettask) BaseManager.register('get_result', callable=getresult) # 綁定端口并設(shè)置驗證碼,windows下需要填寫ip地址,linux下不填默認(rèn)為本地 manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123') # 啟動 manager.start() try: # 通過網(wǎng)絡(luò)獲取任務(wù)隊列和結(jié)果隊列 task = manager.get_task() result = manager.get_result() # 添加任務(wù) for i in range(task_number): print('Put task %d...' % i) task.put(i) # 每秒檢測一次是否所有任務(wù)都被執(zhí)行完 while not result.full(): print(task.qsize()) time.sleep(1) for i in range(result.qsize()): ans = result.get() print('task %d is finish , runtime:%d s' % ans) except: print('Manager error') finally: manager.shutdown() if __name__ == '__main__': # windows下多進(jìn)程可能會炸,添加這句可以緩解 freeze_support() test()
這里重點(diǎn)說一下 BaseManager.register('get_task', callable=gettask)
這行代碼,它的意思是注冊一個get_task的操作,執(zhí)行的操作是gettask()
函數(shù),上面定義了gettask()
函數(shù),返回的是task_queue,這也是之前說的不能直接使用queue.Queue
,必須要使用通過BaseManager的register接口封裝過的的隊列,下面使用task = manager.get_task()
來獲取到這個隊列。
manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
這行代碼初始了一個manager,它綁定了本機(jī)的5002端口,并且在客戶端連接的時候需要一個密碼:123。
接下來看一下客戶端代碼。
#coding:gbk import time, sys, queue, random from multiprocessing.managers import BaseManager BaseManager.register('get_task') BaseManager.register('get_result') conn = BaseManager(address = ('127.0.0.1',5002), authkey = b'123') try: conn.connect() except: print('連接失敗') sys.exit() task = conn.get_task() result = conn.get_result() while not task.empty(): print(task.qsize()) n = task.get(timeout = 1) print('run task %d' % n) sleeptime = random.randint(0,3) time.sleep(sleeptime) rt = (n, sleeptime) result.put(rt) if __name__ == '__main__': pass;
這里主要看以下的代碼
BaseManager.register('get_task') BaseManager.register('get_result')
這兩個是注冊函數(shù),和之前的服務(wù)端所對應(yīng),之前服務(wù)端注冊了這兩個函數(shù),這里才能注冊使用,注意這里不能注冊服務(wù)端沒有注冊的函數(shù)
運(yùn)行一下,先運(yùn)行服務(wù)端,然后再啟兩個cmd運(yùn)行客戶端,也可以在局域網(wǎng)中的另外的機(jī)器上運(yùn)行,但是要修改服務(wù)端的ip地址
服務(wù)端的結(jié)果如下
Put task 0...
Put task 1...
Put task 2...
Put task 3...
Put task 4...
Put task 5...
Put task 6...
Put task 7...
Put task 8...
Put task 9...
task 0 is finish , runtime:3 s
task 1 is finish , runtime:0 s
task 2 is finish , runtime:2 s
task 4 is finish , runtime:1 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 5 is finish , runtime:3 s
task 8 is finish , runtime:2 s
task 9 is finish , runtime:3 s
兩個客戶端的結(jié)果分別如下
客戶端1
10
run task 0
9
run task 1
8
run task 2
6
run task 4
5
run task 5
1
run task 9
客戶端2
7
run task 3
4
run task 6
3
run task 7
2
run task 8
一起運(yùn)行的截圖如下
結(jié)果
由于隊列是線程安全的,所以這里不用加鎖,在客戶端中打印print(task.qsize()) 當(dāng)前的隊列大小,可以看到隊列的信息中同步到各個客戶端的。
最后還是要多說一句,分步式多進(jìn)程雖然可以把任務(wù)分散到不同的機(jī)器上運(yùn)行,可以處理多任務(wù),但是如果此時服務(wù)端掛掉的話,任務(wù)就全丟掉了,所以在生產(chǎn)環(huán)境下還是考慮使用消息中間件如kafka等。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。