您好,登錄后才能下訂單哦!
1.在前一篇文章?python進(jìn)程Process與線程threading區(qū)別?中講到線程threading共享內(nèi)存地址,進(jìn)程與進(jìn)程Peocess之間相互獨(dú)立,互不影響(相當(dāng)于深拷貝);
2.在線程間通信的時(shí)候可以使用Queue模塊完成,進(jìn)程間通信也可以通過Queue完成,但是此Queue并非線程的Queue,進(jìn)程間通信Queue是將數(shù)據(jù) pickle 后傳給另一個(gè)進(jìn)程的 Queue,用于父進(jìn)程與子進(jìn)程之間的通信或同一父進(jìn)程的子進(jìn)程之間通信;
?
1 2 3 4 5 | #導(dǎo)入線程相關(guān)模塊 import threading import queue?? ? q = queue.Queue() |
?
1 2 3 4 5 | # 導(dǎo)入進(jìn)程相關(guān)模塊 from multiprocessing import Process from multiprocessing import Queue ? q = Queue() |
?
1 2 3 4 5 | # 導(dǎo)入進(jìn)程相關(guān)模塊 from multiprocessing import Process from multiprocessing import Pipe ? pipe = Pipe() |
?
?
python提供了多種進(jìn)程通信的方式,主要Queue和Pipe這兩種方式,Queue用于多個(gè)進(jìn)程間實(shí)現(xiàn)通信,Pipe用于兩個(gè)進(jìn)程的通信;
put():以插入數(shù)據(jù)到隊(duì)列中,他還有兩個(gè)可選參數(shù):blocked和timeout。詳情自行百度
get():從隊(duì)列讀取并且刪除一個(gè)元素。同樣,他還有兩個(gè)可選參數(shù):blocked和timeout。詳情自行百度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | # !usr/bin/env python # -*- coding:utf-8 _*- """ @Author:何以解憂 @Blog(個(gè)人博客地址): shuopython.com @WeChat Official Account(微信公眾號(hào)):猿說python @Github:www.github.com ? @File:python_process_queue.py @Time:2019/12/21 21:25 ? @Motto:不積跬步無以至千里,不積小流無以成江海,程序人生的精彩需要堅(jiān)持不懈地積累! """ ? from multiprocessing import Process from multiprocessing import Queue import os,time,random ? #寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼 def proc_write(q,urls): ????print ('Process is write....') ????for url in urls: ????????q.put(url) ????????print ('put %s to queue... ' %url) ????????time.sleep(random.random()) ? #讀數(shù)據(jù)進(jìn)程的代碼 def proc_read(q): ????print('Process is reading...') ????while True: ????????url = q.get(True) ????????print('Get %s from queue' %url) ? if __name__ == '__main__': ????#父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程 ????q = Queue() ????proc_write1 = Process(target=proc_write,args=(q,['url_1','url_2','url_3'])) ????proc_write2 = Process(target=proc_write,args=(q,['url_4','url_5','url_6'])) ????proc_reader = Process(target=proc_read,args=(q,)) ????#啟動(dòng)子進(jìn)程,寫入 ????proc_write1.start() ????proc_write2.start() ? ????proc_reader.start() ????#等待proc_write1結(jié)束 ????proc_write1.join() ????proc_write2.join() ????#proc_raader進(jìn)程是死循環(huán),強(qiáng)制結(jié)束 ????proc_reader.terminate() ????print("mian") |
輸出結(jié)果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | Process is write.... put url_1 to queue... Process is write.... put url_4 to queue... Process is reading... Get url_1 from queue Get url_4 from queue put url_5 to queue... Get url_5 from queue put url_2 to queue... Get url_2 from queue put url_3 to queue... Get url_3 from queue put url_6 to queue... Get url_6 from queue mian |
?
Pipe常用于兩個(gè)進(jìn)程,兩個(gè)進(jìn)程分別位于管道的兩端 * Pipe方法返回(conn1,conn2)代表一個(gè)管道的兩個(gè)端,Pipe方法有duplex參數(shù),默認(rèn)為True,即全雙工模式,若為FALSE,conn1只負(fù)責(zé)接收信息,conn2負(fù)責(zé)發(fā)送,Pipe同樣也包含兩個(gè)方法:
send() : 發(fā)送信息;
recv() : 接收信息;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | from multiprocessing import Process from multiprocessing import Pipe import os,time,random #寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼 def proc_send(pipe,urls): ????#print 'Process is write....' ????for url in urls: ? ????????print ('Process is send :%s' %url) ????????pipe.send(url) ????????time.sleep(random.random()) ? #讀數(shù)據(jù)進(jìn)程的代碼 def proc_recv(pipe): ????while True: ????????print('Process rev:%s' %pipe.recv()) ????????time.sleep(random.random()) ? if __name__ == '__main__': ????#父進(jìn)程創(chuàng)建pipe,并傳給各個(gè)子進(jìn)程 ????pipe = Pipe() ????p1 = Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10) ])) ????p2 = Process(target=proc_recv,args=(pipe[1],)) ????#啟動(dòng)子進(jìn)程,寫入 ????p1.start() ????p2.start() ? ????p1.join() ????p2.terminate() ????print("mian") |
輸出結(jié)果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | Process is send :url_0 Process rev:url_0 Process is send :url_1 Process rev:url_1 Process is send :url_2 Process rev:url_2 Process is send :url_3 Process rev:url_3 Process is send :url_4 Process rev:url_4 Process is send :url_5 Process is send :url_6 Process is send :url_7 Process rev:url_5 Process is send :url_8 Process is send :url_9 Process rev:url_6 mian |
?
當(dāng)然我們也可以嘗試使用線程threading的Queue是否能完成線程間通信,示例代碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | from multiprocessing import Process # from multiprocessing import Queue???? # 進(jìn)程間通信Queue,兩者不要混淆 import queue????????????????????????????# 線程間通信queue.Queue,兩者不要混淆 import time ? def p_put(q,*args): ????q.put(args) ????print('Has put %s' % args) ? ? def p_get(q,*args): ????print('%s wait to get...' % args) ? ????print(q.get()) ????print('%s got it' % args) ? ? ? ? if __name__ == "__main__": ????q = queue.Queue() ????p1 = Process(target=p_put, args=(q,'p1', )) ????p2 = Process(target=p_get, args=(q,'p2', )) ????p1.start() ????p2.start() |
直接異常報(bào)錯(cuò):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | Traceback (most recent call last): ??File "E:/Project/python_project/untitled10/123.py", line 38, in <module> ????p1.start() ??File "G:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 105, in start ????self._popen = self._Popen(self) ??File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen ????return _default_context.get_context().Process._Popen(process_obj) ??File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen ????return Popen(process_obj) ??File "G:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__ ????reduction.dump(process_obj, to_child) ??File "G:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump ????ForkingPickler(file, protocol).dump(obj) TypeError: can't pickle _thread.lock objects |
?
?
?
?
1.python進(jìn)程Process模塊
2.python進(jìn)程Process與線程threading區(qū)別
3.python線程threading創(chuàng)建和參數(shù)傳遞
?
轉(zhuǎn)載請(qǐng)注明:猿說Python???python 進(jìn)程間通信Queue
?
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。