溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python 進(jìn)程間通信Queue/Pipe(42)

發(fā)布時(shí)間:2020-09-02 15:30:37 來源:網(wǎng)絡(luò) 閱讀:370 作者:qq5d6f345f0205e 欄目:編程語言

一.前言

1.在前一篇文章?python進(jìn)程Process與線程threading區(qū)別?中講到線程threading共享內(nèi)存地址,進(jìn)程與進(jìn)程Peocess之間相互獨(dú)立,互不影響(相當(dāng)于深拷貝);

2.在線程間通信的時(shí)候可以使用Queue模塊完成,進(jìn)程間通信也可以通過Queue完成,但是此Queue并非線程的Queue,進(jìn)程間通信Queue是將數(shù)據(jù) pickle 后傳給另一個(gè)進(jìn)程的 Queue,用于父進(jìn)程與子進(jìn)程之間的通信或同一父進(jìn)程的子進(jìn)程之間通信;

?

使用Queue線程間通信:


1

2

3

4

5

#導(dǎo)入線程相關(guān)模塊

import threading

import queue??

?

q = queue.Queue()

?

使用Queue進(jìn)程間通信,適用于多個(gè)進(jìn)程之間通信:


1

2

3

4

5

# 導(dǎo)入進(jìn)程相關(guān)模塊

from multiprocessing import Process

from multiprocessing import Queue

?

q = Queue()

?

使用Pipe進(jìn)程間通信,適用于兩個(gè)進(jìn)程之間通信(一對(duì)一):


1

2

3

4

5

# 導(dǎo)入進(jìn)程相關(guān)模塊

from multiprocessing import Process

from multiprocessing import Pipe

?

pipe = Pipe()

?

?

二.python進(jìn)程間通信Queue/Pipe使用

python提供了多種進(jìn)程通信的方式,主要Queue和Pipe這兩種方式,Queue用于多個(gè)進(jìn)程間實(shí)現(xiàn)通信,Pipe用于兩個(gè)進(jìn)程的通信;

1.使用Queue進(jìn)程間通信,Queue包含兩個(gè)方法:

  • put():以插入數(shù)據(jù)到隊(duì)列中,他還有兩個(gè)可選參數(shù):blocked和timeout。詳情自行百度

  • get():從隊(duì)列讀取并且刪除一個(gè)元素。同樣,他還有兩個(gè)可選參數(shù):blocked和timeout。詳情自行百度


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

# !usr/bin/env python

# -*- coding:utf-8 _*-

"""

@Author:何以解憂

@Blog(個(gè)人博客地址): shuopython.com

@WeChat Official Account(微信公眾號(hào)):猿說python

@Github:www.github.com

?

@File:python_process_queue.py

@Time:2019/12/21 21:25

?

@Motto:不積跬步無以至千里,不積小流無以成江海,程序人生的精彩需要堅(jiān)持不懈地積累!

"""

?

from multiprocessing import Process

from multiprocessing import Queue

import os,time,random

?

#寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼

def proc_write(q,urls):

????print ('Process is write....')

????for url in urls:

????????q.put(url)

????????print ('put %s to queue... ' %url)

????????time.sleep(random.random())

?

#讀數(shù)據(jù)進(jìn)程的代碼

def proc_read(q):

????print('Process is reading...')

????while True:

????????url = q.get(True)

????????print('Get %s from queue' %url)

?

if __name__ == '__main__':

????#父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程

????q = Queue()

????proc_write1 = Process(target=proc_write,args=(q,['url_1','url_2','url_3']))

????proc_write2 = Process(target=proc_write,args=(q,['url_4','url_5','url_6']))

????proc_reader = Process(target=proc_read,args=(q,))

????#啟動(dòng)子進(jìn)程,寫入

????proc_write1.start()

????proc_write2.start()

?

????proc_reader.start()

????#等待proc_write1結(jié)束

????proc_write1.join()

????proc_write2.join()

????#proc_raader進(jìn)程是死循環(huán),強(qiáng)制結(jié)束

????proc_reader.terminate()

????print("mian")

輸出結(jié)果:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Process is write....

put url_1 to queue...

Process is write....

put url_4 to queue...

Process is reading...

Get url_1 from queue

Get url_4 from queue

put url_5 to queue...

Get url_5 from queue

put url_2 to queue...

Get url_2 from queue

put url_3 to queue...

Get url_3 from queue

put url_6 to queue...

Get url_6 from queue

mian

?

2.使用Pipe進(jìn)程間通信

Pipe常用于兩個(gè)進(jìn)程,兩個(gè)進(jìn)程分別位于管道的兩端 * Pipe方法返回(conn1,conn2)代表一個(gè)管道的兩個(gè)端,Pipe方法有duplex參數(shù),默認(rèn)為True,即全雙工模式,若為FALSE,conn1只負(fù)責(zé)接收信息,conn2負(fù)責(zé)發(fā)送,Pipe同樣也包含兩個(gè)方法:

send() : 發(fā)送信息;

recv() : 接收信息;


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

from multiprocessing import Process

from multiprocessing import Pipe

import os,time,random

#寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼

def proc_send(pipe,urls):

????#print 'Process is write....'

????for url in urls:

?

????????print ('Process is send :%s' %url)

????????pipe.send(url)

????????time.sleep(random.random())

?

#讀數(shù)據(jù)進(jìn)程的代碼

def proc_recv(pipe):

????while True:

????????print('Process rev:%s' %pipe.recv())

????????time.sleep(random.random())

?

if __name__ == '__main__':

????#父進(jìn)程創(chuàng)建pipe,并傳給各個(gè)子進(jìn)程

????pipe = Pipe()

????p1 = Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10) ]))

????p2 = Process(target=proc_recv,args=(pipe[1],))

????#啟動(dòng)子進(jìn)程,寫入

????p1.start()

????p2.start()

?

????p1.join()

????p2.terminate()

????print("mian")

輸出結(jié)果:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

Process is send :url_0

Process rev:url_0

Process is send :url_1

Process rev:url_1

Process is send :url_2

Process rev:url_2

Process is send :url_3

Process rev:url_3

Process is send :url_4

Process rev:url_4

Process is send :url_5

Process is send :url_6

Process is send :url_7

Process rev:url_5

Process is send :url_8

Process is send :url_9

Process rev:url_6

mian

?

三.測試queue.Queue來完成進(jìn)程間通信能否成功?

當(dāng)然我們也可以嘗試使用線程threading的Queue是否能完成線程間通信,示例代碼如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

from multiprocessing import Process

# from multiprocessing import Queue???? # 進(jìn)程間通信Queue,兩者不要混淆

import queue????????????????????????????# 線程間通信queue.Queue,兩者不要混淆

import time

?

def p_put(q,*args):

????q.put(args)

????print('Has put %s' % args)

?

?

def p_get(q,*args):

????print('%s wait to get...' % args)

?

????print(q.get())

????print('%s got it' % args)

?

?

?

?

if __name__ == "__main__":

????q = queue.Queue()

????p1 = Process(target=p_put, args=(q,'p1', ))

????p2 = Process(target=p_get, args=(q,'p2', ))

????p1.start()

????p2.start()

直接異常報(bào)錯(cuò):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

Traceback (most recent call last):

??File "E:/Project/python_project/untitled10/123.py", line 38, in <module>

????p1.start()

??File "G:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 105, in start

????self._popen = self._Popen(self)

??File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen

????return _default_context.get_context().Process._Popen(process_obj)

??File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen

????return Popen(process_obj)

??File "G:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__

????reduction.dump(process_obj, to_child)

??File "G:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump

????ForkingPickler(file, protocol).dump(obj)

TypeError: can't pickle _thread.lock objects

?

?

?

?

猜你喜歡:

1.python進(jìn)程Process模塊

2.python進(jìn)程Process與線程threading區(qū)別

3.python線程threading創(chuàng)建和參數(shù)傳遞

?

轉(zhuǎn)載請(qǐng)注明:猿說Python???python 進(jìn)程間通信Queue

?


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI