溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python學(xué)習(xí)—pyhton中的進(jìn)程

發(fā)布時(shí)間:2020-07-26 09:51:03 來(lái)源:網(wǎng)絡(luò) 閱讀:3780 作者:長(zhǎng)安223 欄目:編程語(yǔ)言

1.進(jìn)程定義

進(jìn)程: 進(jìn)程就是一個(gè)程序在一個(gè)數(shù)據(jù)集上的一次動(dòng)態(tài)執(zhí)行過(guò)程。進(jìn)程一般由程序、數(shù)據(jù)、進(jìn)程控制塊(pcb)三部分組成。
(1)我們編寫(xiě)的程序用來(lái)描述進(jìn)程要完成哪些功能以及如何完成;
(2)數(shù)據(jù)則是程序在執(zhí)行過(guò)程中所需要使用的資源;
(3)進(jìn)程控制塊用來(lái)記錄進(jìn)程的所有信息。系統(tǒng)可以利用它來(lái)控制和管理進(jìn)程,它是系統(tǒng)感知進(jìn)程存在的唯一標(biāo)志。

2.創(chuàng)建進(jìn)程

新創(chuàng)建的進(jìn)程在內(nèi)存獨(dú)立開(kāi)辟一塊空間,不與其他進(jìn)程共享空間、數(shù)據(jù)。
同一個(gè)進(jìn)程中,新創(chuàng)建的線程與此進(jìn)程里其他線程共享空間、數(shù)據(jù)。

1.os.fork()函數(shù)

os模塊的三個(gè)方法:
os.fork()創(chuàng)建一個(gè)當(dāng)前進(jìn)程的子進(jìn)程
os.getpid()獲取當(dāng)前進(jìn)程pid
os.getppid()獲取當(dāng)前進(jìn)程的父進(jìn)程的Pid
關(guān)于fork():
它用來(lái)創(chuàng)建一個(gè)進(jìn)程,即為當(dāng)前進(jìn)程的子進(jìn)程,復(fù)制父進(jìn)程的所有代碼并從fork語(yǔ)句處開(kāi)始運(yùn)行。運(yùn)行父進(jìn)程還是子進(jìn)程的取決于當(dāng)前os調(diào)度策略。
在父進(jìn)程中返回子進(jìn)程的pid,在子進(jìn)程中返回0。即返回0表示在子進(jìn)程中運(yùn)行,返回大與0的數(shù)表示在父進(jìn)程中運(yùn)行。

例子:

import os

print('當(dāng)前進(jìn)程:',os.getpid())
print('當(dāng)前進(jìn)程的父進(jìn)程:',os.getppid())

pid = os.fork()
if pid == 0:
    print('此時(shí)為子進(jìn)程:',os.getpid(),'\n其父進(jìn)程:',os.getppid())
else:
    print('父進(jìn)程:',os.getpid(),'\nos.fork的返回值pid:',pid)

運(yùn)行結(jié)果:

當(dāng)前進(jìn)程: 16839
當(dāng)前進(jìn)程的父進(jìn)程: 2912
父進(jìn)程: 16839 
os.fork的返回值pid: 16842
此時(shí)為子進(jìn)程: 16842 
其父進(jìn)程: 16839

從運(yùn)行結(jié)果中看,在linux中fork產(chǎn)生子進(jìn)程后是先運(yùn)行父進(jìn)程,當(dāng)父進(jìn)程結(jié)束后再進(jìn)入子進(jìn)程運(yùn)行。

2.實(shí)例化進(jìn)程類(lèi)

直接通過(guò)實(shí)例化進(jìn)程類(lèi)multiprocessing.Process創(chuàng)建新進(jìn)程。
和線程類(lèi)一樣,進(jìn)程類(lèi)也有start()方法,join()方法。調(diào)用對(duì)象的start()方法實(shí)例上也是調(diào)用的類(lèi)中的run()方法。

# 導(dǎo)入進(jìn)程模塊
import multiprocessing
import os

def job(ss):
    print(ss,'當(dāng)前子進(jìn)程:%s' %os.getpid())

#實(shí)例化進(jìn)程類(lèi),并提交任務(wù),傳入任務(wù)所需要的參數(shù)
p1 = multiprocessing.Process(target=job,args=('abc',))
p1.start()
p2 = multiprocessing.Process(target=job,args=('123',))
p2.start()

# 和線程一樣,進(jìn)程也有join方法。
p1.join()
p2.join()

print('完成......')

運(yùn)行結(jié)果:

abc 當(dāng)前子進(jìn)程:17234
123 當(dāng)前子進(jìn)程:17235
完成......
3.繼承進(jìn)程類(lèi)來(lái)自定義進(jìn)程類(lèi)

繼承python提供的進(jìn)程類(lèi),重寫(xiě)方法,創(chuàng)建自己所需要的進(jìn)程類(lèi),再實(shí)例化自定義的進(jìn)程類(lèi)。

import multiprocessing

class Job(multiprocessing.Process):
    #重寫(xiě)構(gòu)造方法
    def __init__(self,cc):
        super(Job, self).__init__()
        self.cc = cc

    #重寫(xiě)run方法,和線程一樣
    def run(self):
        print(self.cc)

#實(shí)例化對(duì)象
if __name__ == "__main__":
    pp = []
    for i in range(10):
        p = Job(str(i)+':123456')
        pp.append(p)
        p.start()

    for p in pp:
        p.join()
    print('hahhahaha')

運(yùn)行結(jié)果:

0:123456
1:123456
2:123456
3:123456
4:123456
5:123456
6:123456
7:123456
8:123456
9:123456
hahhahaha

3.多進(jìn)程與多線程的對(duì)比

import threading
import multiprocessing
from timeit import timeit

class Jobthread(threading.Thread):
    def __init__(self,li):
        super(Jobthread,self).__init__()
        self.li = li
    def run(self):
        sum(self.li)

class Jobprocess(multiprocessing.Process):
    def __init__(self,li):
        super(Jobprocess, self).__init__()
        self.li = li
    def run(self):
        for i in self.li:
            sum(i)

# 這個(gè)裝飾器是自己寫(xiě)的,用來(lái)計(jì)算某個(gè)函數(shù)執(zhí)行時(shí)間
@timeit
def use_Pro(list):
    for i in range(0,len(list), 1000):
        p = Jobprocess(list[i:i+1000])
        p.start()

@timeit
def use_Thr(list):
    for li in list:
        t = Jobthread(li)
        t.start

if __name__ == "__main__":
    list = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]*1000
    use_Pro(list)
    use_Thr(list)

運(yùn)行結(jié)果:

use_Pro運(yùn)行時(shí)間0.0041866302490234375
use_Thr運(yùn)行時(shí)間0.02240157127380371

正如看到的結(jié)果一樣,多進(jìn)程適合計(jì)算密集型任務(wù),多線程適合i/o密集型任務(wù)。

3.守護(hù)進(jìn)程與終止進(jìn)程

1.守護(hù)進(jìn)程-daemon屬性

和線程類(lèi)似,進(jìn)程類(lèi)也有一個(gè)daemon屬性,默認(rèn)值為False。
當(dāng)改變他的值為T(mén)rue時(shí),當(dāng)主進(jìn)程結(jié)束,就會(huì)強(qiáng)行終止其他的所以進(jìn)程。
實(shí)例:
(1)第一個(gè)程序

import multiprocessing
import time

def job():
    print('開(kāi)始子進(jìn)程')
    time.sleep(3)
    print('子進(jìn)程結(jié)束')

if __name__ == "__main__":
    p = multiprocessing.Process(target=job)
    p.start()
    print("程序結(jié)束......")

運(yùn)行結(jié)果:

程序結(jié)束......
開(kāi)始子進(jìn)程
子進(jìn)程結(jié)束

主進(jìn)程結(jié)束,其他進(jìn)程還在繼續(xù)執(zhí)行。
(2)第二個(gè)程序

import multiprocessing
import time

def job():
    print('開(kāi)始子進(jìn)程')
    time.sleep(3)
    print('子進(jìn)程結(jié)束')

if __name__ == "__main__":
    p = multiprocessing.Process(target=job)
    p.daemon = True
    p.start()
    print("程序結(jié)束......")

運(yùn)行結(jié)果:

程序結(jié)束......

當(dāng)主進(jìn)程結(jié)束,其他進(jìn)程將會(huì)被強(qiáng)制終止結(jié)束。

2.終止進(jìn)程
import multiprocessing
import time

def job():
    print('開(kāi)始子進(jìn)程')
    time.sleep(3)
    print('子進(jìn)程結(jié)束')

if __name__ == "__main__":
    p = multiprocessing.Process(target=job)
    p.daemon = True
    print(p.is_alive())     #啟動(dòng)進(jìn)程之前查看進(jìn)程狀態(tài)
    p.start()
    print(p.is_alive())     #啟動(dòng)進(jìn)程之后查看進(jìn)程狀態(tài)
    p.terminate()           #終止進(jìn)程
    print(p.is_alive())     #終止進(jìn)程命令一發(fā)出后,查看進(jìn)程狀態(tài)。此時(shí)進(jìn)程在釋放過(guò)程中,還沒(méi)有被完全釋放。
    p.join()                #先讓進(jìn)程完全釋放
    print(p.is_alive())     #最后查看進(jìn)程狀態(tài)

    print("程序結(jié)束......")

運(yùn)行結(jié)果:

False
True
True
False
程序結(jié)束......

4.進(jìn)程間通信

"""
通過(guò)隊(duì)列實(shí)現(xiàn)進(jìn)程間通信,隊(duì)列充當(dāng)消息管道的作用(類(lèi)似生產(chǎn)者消費(fèi)者模型)
這里通信一直存在,也就是這兩個(gè)進(jìn)程會(huì)一直存在,沒(méi)有銷(xiāo)毀釋放。
"""
import multiprocessing
from multiprocessing import Queue
import time

class Put_news(multiprocessing.Process):
    def __init__(self,queue):
        super(Put_news, self).__init__()
        self.queue = queue
    def run(self):
        for i in range(100):
            self.queue.put(i)
            print("傳遞消息:%s" %i)
            time.sleep(0.1)

class Get_news(multiprocessing.Process):
    def __init__(self,queue):
        super(Get_news, self).__init__()
        self.queue = queue
    def run(self):
        while True:
            time.sleep(0.11)
            print("接收消息++++++++++++:%s" %(self.queue.get()))

if __name__ == "__main__":
    q = Queue()
    p = Put_news(q)
    g = Get_news(q)
    p.start()
    g.start()

    if not p.is_alive():
        g.terminate()

運(yùn)行結(jié)果:
Python學(xué)習(xí)—pyhton中的進(jìn)程

Python學(xué)習(xí)—pyhton中的進(jìn)程

5.分布式進(jìn)程

任務(wù)需要處理的數(shù)據(jù)特別大, 希望多臺(tái)主機(jī)共同處理任務(wù)。multiprocessing.managers子模塊里面可以實(shí)現(xiàn)將進(jìn)程分布到多臺(tái)機(jī)器上
(管理端主機(jī)要運(yùn)算一些列任務(wù),通過(guò)與其他主機(jī)建立“連接“,將任務(wù)分配給其他主機(jī)執(zhí)行,并將執(zhí)行結(jié)果返回給管理端主機(jī)。)
管理端主機(jī)代碼:

import random
from queue import Queue
from multiprocessing.managers import BaseManager

# 1.創(chuàng)建隊(duì)列(發(fā)送任務(wù)的隊(duì)列,收取結(jié)果的隊(duì)列)
task_queue = Queue()
result_queue = Queue()

# 第二三步驟可以互換順序
# 2.將隊(duì)列注冊(cè)到網(wǎng)絡(luò)(這樣其他主機(jī)可以通過(guò)網(wǎng)絡(luò)接收任務(wù),發(fā)送結(jié)果)
# 注冊(cè)的隊(duì)列(任務(wù)隊(duì)列,結(jié)果隊(duì)列)的唯一標(biāo)識(shí)碼分別為'put_task_queue','get_result_queue'
BaseManager.register('put_task_queue',callable=lambda :task_queue)
BaseManager.register('get_result_queue',callable=lambda : result_queue)

# 3.綁定端口(3333),設(shè)定密碼(hahahaha)
manager = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha')

# 4.啟動(dòng)manager,開(kāi)始共享隊(duì)列
manager.start()

# 5.通過(guò)網(wǎng)絡(luò)訪問(wèn)共享的隊(duì)列
task = manager.put_task_queue()
result = manager.get_result_queue()

# 6.向任務(wù)隊(duì)列中放入執(zhí)行任務(wù)的數(shù)據(jù),這里放入100個(gè)任務(wù)
for i in range(100):
    n = random.randint(10,500)
    task.put(n)
    print('任務(wù)列表加入數(shù)據(jù):'+str(n))

# 7.從結(jié)果隊(duì)列中讀取各個(gè)主機(jī)的任務(wù)執(zhí)行結(jié)果
for j in range(100):
    res = result.get()
    print('執(zhí)行結(jié)果:'+str(res))

# 8.任務(wù)執(zhí)行結(jié)束,關(guān)閉共享隊(duì)列
manager.shutdown()

運(yùn)算主機(jī)代碼:

"""
在各個(gè)工作主機(jī)上執(zhí)行的代碼相同
"""

from multiprocessing.managers import BaseManager

# 1. 連接manager端,獲取共享的隊(duì)列
import time

worker = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha')

# 2.注冊(cè)隊(duì)列,去獲取網(wǎng)絡(luò)上共享的隊(duì)列中的內(nèi)容
BaseManager.register('put_task_queue')
BaseManager.register('get_result_queue')

# 3.連接網(wǎng)絡(luò)
worker.connect()

# 4.通過(guò)網(wǎng)絡(luò)訪問(wèn)共享的隊(duì)列

task = worker.put_task_queue()
result = worker.get_result_queue()

# 5.讀取任務(wù),處理任務(wù),這里讀取了50個(gè)任務(wù)進(jìn)行處理
# 每臺(tái)運(yùn)算主機(jī)上的處理任務(wù)數(shù)量可以不同,不過(guò)為了避免修改代碼,一般都相同。
for i in range(50):
    n = task.get()
    print('執(zhí)行任務(wù) %d**2 = '%(n))
    res = '%d**2=%d' %(n,n**2)  #這里設(shè)置執(zhí)行的任務(wù)是求平方
    result.put(res)     #將結(jié)果放入結(jié)果隊(duì)列
    time.sleep(1)       #休息1秒

print('工作主機(jī)執(zhí)行任務(wù)結(jié)束.....')

6.進(jìn)程池

和線程一樣,進(jìn)程也有進(jìn)程池。
1.第一種方法

import multiprocessing
import time

def job(id):
    print('start id ---> %d' %id)
    print('end id ----> %d' %id)
    time.sleep(3)
# 創(chuàng)建含有8個(gè)進(jìn)程的進(jìn)程池
pool = multiprocessing.Pool(8)
# 給進(jìn)城池的進(jìn)程分配任務(wù)
for i in range(12):
    pool.apply_async(job,args=(i,))

# 關(guān)閉進(jìn)程池,使進(jìn)程池不再工作運(yùn)行
pool.close()
# 等待所有子進(jìn)程結(jié)束之后再開(kāi)始主進(jìn)程
pool.join()

print('all works completed!')

運(yùn)行結(jié)果:

start id ---> 0
end id ----> 0
start id ---> 1
end id ----> 1
start id ---> 2
end id ----> 2
start id ---> 3
end id ----> 3
start id ---> 4
end id ----> 4
start id ---> 5
end id ----> 5
start id ---> 6
end id ----> 6
start id ---> 7
end id ----> 7
start id ---> 8
end id ----> 8
start id ---> 9
end id ----> 9
start id ---> 10
end id ----> 10
start id ---> 11
end id ----> 11
all works completed!

2.第二種方法

from concurrent.futures import ProcessPoolExecutor
import time
def job(id):
    print('start id ---> %d' %id)
    print('end id ----> %d' %id)
    time.sleep(3)

# 創(chuàng)建含有2個(gè)進(jìn)程的進(jìn)程池
pool = ProcessPoolExecutor(max_workers=2)
# 給進(jìn)程池的進(jìn)程分配任務(wù),submit方法返回一個(gè)_base.Future對(duì)象
f1 = pool.submit(job,1)
f2 = pool.submit(job,2)
f3 = pool.submit(job,3)
f4 = pool.submit(job,4)
# 執(zhí)行f1對(duì)象的各種方法
f1.done()
f1.result()

運(yùn)行結(jié)果:

start id ---> 1
end id ----> 1
start id ---> 2
end id ----> 2
start id ---> 3
end id ----> 3
start id ---> 4
end id ----> 4

3.第三種方法

from concurrent.futures import ProcessPoolExecutor
import time
def job(id):
    print('start id ---> %d' %id)
    print('end id ----> %d' %id)
    time.sleep(1)
pool = ProcessPoolExecutor(max_workers=3)
pool.map(job,range(1,10))

運(yùn)行結(jié)果:

start id ---> 1
end id ----> 1
start id ---> 2
end id ----> 2
start id ---> 3
end id ----> 3
start id ---> 4
end id ----> 4
start id ---> 5
end id ----> 5
start id ---> 6
end id ----> 6
start id ---> 7
end id ----> 7
start id ---> 8
end id ----> 8
start id ---> 9
end id ----> 9
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI