溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

發(fā)布時間:2020-10-19 18:06:16 來源:網(wǎng)絡(luò) 閱讀:4006 作者:長跑者1號 欄目:編程語言

一 數(shù)據(jù)結(jié)構(gòu)和GIL

1 queue

標(biāo)準(zhǔn)庫queue模塊,提供FIFO的queue、LIFO的隊列,優(yōu)先隊列
Queue 類是線程安全的,適用于多線程間安全的交換數(shù)據(jù),內(nèi)部使用了Lock和Condition


為什么說容器的大小不準(zhǔn)確,其原因是如果不加鎖,是不可能獲取到準(zhǔn)確的大小的,因為你剛讀取了一個大小,還沒取走,有可能被就被其他線程修改了,queue類的size雖然加了鎖,但是依然不能保證立即get,put就能成功,因為讀取大小和get,put方法是分來的。

2 GIL

1 簡介

全局解釋器鎖,進(jìn)程級別的鎖GIL
Cpython在解釋器進(jìn)程中有一把鎖,叫做GIL全局解釋器鎖。

GIL 保證Cpython進(jìn)程中,當(dāng)前時刻只有一個線程執(zhí)行代碼,甚至在多核情況下,也是如此。

2 IO 密集型和CPU密集型

Cpython中
IO 密集型,由于線程阻塞,就會調(diào)度其他線程
CPU密集型,當(dāng)前線程可能連續(xù)獲取GIL,導(dǎo)致其他線程幾乎無法使用CPU,若要喚醒其他線程,則需要準(zhǔn)備數(shù)據(jù),其代價是高昂的。


IO 密集型,多線程解決,CPU密集型,多進(jìn)程解決,繞開GIL。

python中絕大多數(shù)內(nèi)置數(shù)據(jù)結(jié)構(gòu)的讀寫操作都是原子操作


由于GIL 的存在,python的內(nèi)置數(shù)據(jù)類型在多線程編程的時候就變得安全了,但是實際上他們本身不是線程安全類型的

3 保留GIL 原因

Guido堅持的簡單哲學(xué),對于初學(xué)者門檻低,不需要高深的系統(tǒng)知識也能安全,簡單的使用python。
而移除GIL。會降低Cpython單線程的執(zhí)行效率。

4 驗證其是否是單線程

相關(guān)實例

import  logging
import datetime
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc():
    sum=0
    for _ in range(1000000000):
        sum+=1

calc()
calc()
calc()
calc()
calc()
delta=(datetime.datetime.now()-start).total_seconds()
logging.info(delta)

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

多線程模式下的計算結(jié)果

import  logging
import datetime
import threading
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc():
    sum=0
    for _ in range(1000000000):
        sum+=1
lst=[]
for _ in range(5):
    t=threading.Thread(target=calc)
    t.start()
    lst.append(t)

for t in lst:
    t.join()

delta=(datetime.datetime.now()-start).total_seconds()

print (delta)

結(jié)果如下

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

從這兩個程序來看,Cpython中多線程根本沒有優(yōu)勢,和一個線程執(zhí)行的時間相當(dāng),因為存在GIL

二 多進(jìn)程

1 概念

1 多進(jìn)程描述

由于python中的GIL ,多線程不是CPU密集型程序的最好選擇

多進(jìn)程可以在完全獨立的進(jìn)程中運行程序,可以充分利用多處理器

但是進(jìn)程本身的隔離帶來數(shù)據(jù)不共享也是一個問題,且線程比進(jìn)程輕量的多

多進(jìn)程也是解決并發(fā)的一種手段

2 進(jìn)程和線程的異同

相同點:

進(jìn)程是可以終止的,線程是不能通過命令終止的,線程的終止要么拋出異常,要么程序本身執(zhí)行完成。

進(jìn)程間同步提供了和線程同步一樣的類,使用方式也是一樣的,使用效果也是類似,不過,進(jìn)程間同步的代價要高于線程,而且底層實現(xiàn)不同。

multiprocessing 還提供了共享內(nèi)存,服務(wù)器進(jìn)程來共享數(shù)據(jù),還提供了queue隊列,匹配管道用于進(jìn)程間通信


不同點

通信方式不同
1 多進(jìn)程就是啟用多個解釋器進(jìn)程,進(jìn)程間通信必須序列化,反序列化
2 數(shù)據(jù)的安全性問題

多進(jìn)程最好是在main中執(zhí)行
多線程已經(jīng)將數(shù)據(jù)進(jìn)行處理了,其不需要再次進(jìn)行序列化了

多進(jìn)程傳遞必須序列化和反序列化。

3 進(jìn)程應(yīng)用

遠(yuǎn)程調(diào)用,RPC,跨網(wǎng)絡(luò)

2 參數(shù)介紹

multiprocessing中的process類

process 類遵循了Thread類的API,減少了學(xué)習(xí)難度
不同進(jìn)程可以完全調(diào)度到不同的CPU上執(zhí)行

IO 密集型最好使用多線程
CPU 密集型最好使用多進(jìn)程

進(jìn)程提供的相關(guān)屬性

名稱 含義
pid 進(jìn)程ID
exitcode 進(jìn)程退出的狀態(tài)碼
terminate() 終止指定進(jìn)程

3 實例

import  logging
import datetime
import multiprocessing
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc(i):
    sum=0
    for _ in range(1000000000):
        sum+=1
lst=[]
for  i in range(5):
    p=multiprocessing.Process(target=calc,args=(i,),name="P-{}".format(i))
    p.start()
    lst.append(p)
for p in  lst:
    p.join()

delta=(datetime.datetime.now()-start).total_seconds()
print (delta)

結(jié)果如下

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

多進(jìn)程本身避開了進(jìn)程和進(jìn)程之間調(diào)度需要的時間,多核心都使用了,此處存在CPU的調(diào)度問題
多進(jìn)程對CPU的提升是顯而易見的。
單線程,多線程都跑了很長時間,而多進(jìn)程只是用了1分半,是真正的并行

4 進(jìn)程池相關(guān)

import  logging
import datetime
import multiprocessing
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
start=datetime.datetime.now()

def calc(i):
    sum=0
    for _ in range(1000000000):
        sum+=1
    print (i,sum)
if  __name__=='__main__':
    start=datetime.datetime.now()
    p=multiprocessing.Pool(5)  # 此處用于初始化進(jìn)程池,其池中的資源是可以復(fù)用的
    for i in range(5):
        p.apply_async(calc,args=(i,))
    p.close()  # 下面要執(zhí)行join,上面必須先close
    p.join()
    delta=(datetime.datetime.now()-start).total_seconds()
    print (delta)

結(jié)果如下

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

進(jìn)程創(chuàng)建的多,使用進(jìn)程池進(jìn)行處理還是一種比較好的處理方式

5 多進(jìn)程和多線程的選擇

1 選擇

1 CPU 密集型
Cpython 中使用了GIL,多線程的時候互相競爭,且多核優(yōu)勢不能發(fā)揮,python使用多進(jìn)程效率更高

2 IO密集型

適合使用多線程,減少IO序列化開銷,且在IO等待時,切換到其他線程繼續(xù)執(zhí)行,效率不錯,當(dāng)然多進(jìn)程也適用于IO密集型

2 應(yīng)用

請求/應(yīng)答模型: WEB應(yīng)用中常見的處理模型

master啟動多個worker工作進(jìn)程,一般和CPU數(shù)目相同
worker工作進(jìn)程中啟動多個線程,提高并發(fā)處理能力,worker處理用戶的請求,往往需要等待數(shù)據(jù)
這就是nginx的工作模式

工作進(jìn)程一般都和CPU核數(shù)相同,CPU的親原性,進(jìn)程在CPU的遷移成本比較高。

三 concurrent包

1 概念

concurrent.futures
3.2 版本引入的模塊
異步并行任務(wù)編程模塊,提供一個高級的異步可執(zhí)行的便利接口

提供了2個池執(zhí)行器

ThreadPoolExecutor 異步調(diào)用的線程池的Executor
ProcessPoolExecutor 異步調(diào)用進(jìn)程池的Executor

2 參數(shù)詳解

方法 含義
ThreadPoolExecutor(max_workers=1) 池中至多創(chuàng)建max_workers個線程的池來同時異步執(zhí)行,返回Executor實例
submit(fn,*args,**kwagrs) 提交執(zhí)行的函數(shù)及參數(shù),返回Future實例
shutdown(wait=True) 清理池

Future 類

方法 含義
result() 可以查看調(diào)用的返回結(jié)果
done() 如果調(diào)用被成功的取消或者執(zhí)行完成,則返回為True
cancelled() 如果調(diào)用被成功取消,返回True
running() 如果正在運行且不能被取消,則返回True
cancel() 嘗試取消調(diào)用,如果已經(jīng)執(zhí)行且不能取消則返回False,否則返回True
result(timeout=None) 取返回的結(jié)果,超時時為None,一直等待返回,超時設(shè)置到期,拋出concurrent.futures.TimeoutError異常
execption(timeout=None) 取返回的異常,超時為None,一直等待返回,超時設(shè)置到期,拋出concurrent.futures.TimeoutError異常

3 線程池相關(guān)實例

import  logging
import threading
from   concurrent  import  futures
import logging
import  time

logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")

def worker(n):  # 定義未來執(zhí)行的任務(wù)
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished{}".format(n))
# 創(chuàng)建一個線程池,池容量為3
executor=futures.ThreadPoolExecutor(max_workers=3)

fs=[]
for i in range(3):
    f=executor.submit(worker,i)  # 傳入?yún)?shù),返回Future對象
    fs.append(f)

for  i in range(3,6):
    f=executor.submit(worker,i)  # 傳入?yún)?shù),返回Future對象
    fs.append(f)
while True:
    time.sleep(2)
    logging.info(threading.enumerate())  #返回存活線程列表
    flag=True
    for  f  in fs:
        logging.info(f.done()) # 如果被成功調(diào)用或取消完成,此處返回為True
        flag=flag  and f.done()  # 若都調(diào)用成功,則返回為True,否則則返回為False
    if flag:
        executor.shutdown()  # 如果全部調(diào)用成功,則需要清理池
        logging.info(threading.enumerate())
        break

結(jié)果如下

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

其線程池中的線程是持續(xù)使用的,一旦創(chuàng)建好的線程,其不會變化,唯一不好的就是線程名未發(fā)生變化,但其最多影響了打印效果

4 進(jìn)程池相關(guān)實例

import  logging
import threading
from   concurrent  import  futures
import logging
import  time

logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")

def worker(n):  # 定義未來執(zhí)行的任務(wù)
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished{}".format(n))
# 創(chuàng)建一個進(jìn)程池,池容量為3
executor=futures.ProcessPoolExecutor(max_workers=3)

fs=[]
for i in range(3):
    f=executor.submit(worker,i)  # 傳入?yún)?shù),返回Future對象
    fs.append(f)

for  i in range(3,6):
    f=executor.submit(worker,i)  # 傳入?yún)?shù),返回Future對象
    fs.append(f)
while True:
    time.sleep(2)
    flag=True
    for  f  in fs:
        logging.info(f.done()) # 如果被成功調(diào)用或取消完成,此處返回為True
        flag=flag  and f.done()  # 若都調(diào)用成功,則返回為True,否則則返回為False
    if flag:
        executor.shutdown()  # 如果全部調(diào)用成功,則需要清理池
        break

結(jié)果如下

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

5 支持上下文管理

concurrent.futures.ProcessPoolExecutor 繼承自concurrent.futures.base.Executor,而父類有enter,_exit方法,其是支持上下文管理的,可以使用with語句

import  logging
import threading
from   concurrent  import  futures
import logging
import  time

logging.basicConfig(level=logging.INFO,format="%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s")

def worker(n):  # 定義未來執(zhí)行的任務(wù)
    logging.info("begin to work{}".format(n))
    time.sleep(5)
    logging.info("finished{}".format(n))
fs=[]
with   futures.ProcessPoolExecutor(max_workers=3) as executor:
    for  i in range(6):
        futures=executor.submit(worker,i)
        fs.append(futures)
while True:
    time.sleep(2)
    flag=True
    for  f  in fs:
        logging.info(f.done()) # 如果被成功調(diào)用或取消完成,此處返回為True
        flag=flag  and f.done()  # 若都調(diào)用成功,則返回為True,否則則返回為False
    if flag:
        executor.shutdown()  # 如果全部調(diào)用成功,則需要清理池
        break

結(jié)果如下

python數(shù)據(jù)結(jié)構(gòu)和GIL及多進(jìn)程

6 總結(jié)

統(tǒng)一了線程池,進(jìn)程池的調(diào)用,簡化了編程,是python簡單的思想哲學(xué)的提現(xiàn)
唯一缺點: 無法設(shè)置線程名稱

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI