溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

python多線程之間的同步(一)

發(fā)布時(shí)間:2020-06-29 02:05:23 來(lái)源:網(wǎng)絡(luò) 閱讀:2951 作者:赤色風(fēng)暴 欄目:編程語(yǔ)言

引言:

       線程之間經(jīng)常需要協(xié)同工作,通過(guò)某種技術(shù),讓一個(gè)線程訪問(wèn)某些數(shù)據(jù)時(shí),其它線程不能訪問(wèn)這些數(shù)據(jù),直到該線程完成對(duì)數(shù)據(jù)的操作。這些技術(shù)包括臨界區(qū)(Critical Section),互斥量(Mutex),信號(hào)量(Semaphore),事件Event等。


Event

      threading庫(kù)中的event對(duì)象通過(guò)使用內(nèi)部一個(gè)flag標(biāo)記,通過(guò)flag的True或者False的變化來(lái)進(jìn)行操作。

         名稱                                     含義
set( )標(biāo)記設(shè)置為T(mén)rue
clear( )標(biāo)記設(shè)置為False
is_set( )標(biāo)記是否為T(mén)rue
wait(timeout=None)設(shè)置等待標(biāo)記為T(mén)rue的時(shí)長(zhǎng),None為無(wú)限等待。等到返回True,等不到返回False
from  threading import Thread,Event
import time

def creditor(event:Event):
    print("什么時(shí)候還我錢(qián)")
    event.wait()
    print("我已經(jīng)等了很長(zhǎng)時(shí)間了")


def debtor(event:Event,count=10):
    print("可以寬裕幾天嗎?")
    money=[]
    while True:
        print("先還你100")
        time.sleep(0.5)
        money.append(1)
        if len(money)>count:
            event.set()
            break
    print("我已經(jīng)還完你的錢(qián)了")

event=Event()
c=Thread(target=creditor,args=(event,))
d=Thread(target=debtor,args=(event,))
c.start()
d.start()

運(yùn)行結(jié)果如下所示:

python多線程之間的同步(一)

      可以看到creditor函數(shù)中因?yàn)閑vent.wait( )線程進(jìn)入等待狀態(tài),此時(shí)debtor線程進(jìn)入運(yùn)行,當(dāng)滿足條件時(shí)event.set( )將標(biāo)記設(shè)置為T(mén)rue,creditor線程開(kāi)始運(yùn)行。誰(shuí)wait就是等到flag變?yōu)門(mén)rue,或等到超時(shí)變?yōu)镕alse。不限制等待的個(gè)數(shù)。

wait的使用

from threading import Event,Thread


def Wait(event:Event,interval):
    while not event.wait(interval):
        print("waiting for you")

e=Event()
Thread(target=Wait,args=(e,3)).start()
e.wait(10)
e.set()
print("main exit")

python多線程之間的同步(一)

主線程一開(kāi)始就wait 10s,Waiting線程等待3s返回False,進(jìn)入循環(huán)打印"waiting for you",重復(fù)3次,然后主線程set了,這時(shí)候Waiting線程變?yōu)門(mén)rue,不再進(jìn)入循環(huán)。


Lock

     凡是存在資源爭(zhēng)用的地方都可以使用鎖,從而保證只有一個(gè)使用者可以完全使用這個(gè)資源

     現(xiàn)在要生產(chǎn)10個(gè)杯子,由10個(gè)工人開(kāi)始生產(chǎn)

import threading
import time 

cups=[]

def worker(count=10):
    print("我是{},我開(kāi)始生產(chǎn)了".format(threading.current_thread().name))
    flag=False
    while True:
        if len(cups)>count:
            flag=True
        time.sleep(0.05)
        if not flag:
            cups.append(1)
        if flag:
            break 
    print("finished.cups={}".format(len(cups)))
    
for _  in range(10):
    threading.Thread(target=worker,args=(1000,)).start()

運(yùn)行結(jié)果如下圖所示:

python多線程之間的同步(一)

      我們明明只需要到1000就會(huì)break,但是結(jié)果卻到了1010個(gè),這就是因?yàn)橛?0個(gè)線程,其中每個(gè)線程都在增加,但是增加后的數(shù)目,其他線程并不會(huì)知道(每個(gè)線程通過(guò)len函數(shù)拿到數(shù)量,但是剛拿到數(shù)字,其他線程就立即更新了)

      這個(gè)時(shí)候我們就需要鎖lock來(lái)實(shí)現(xiàn)了,一旦線程獲得鎖,其他試圖獲取鎖的線程將被阻塞

               名稱                               含義
acquire(blocking=True,timeout=-1)默認(rèn)阻塞,阻塞可以設(shè)置超時(shí)時(shí)間。非阻塞時(shí),timeout禁止設(shè)置。成功獲取鎖,返回True,否則返回False
release( )釋放鎖??梢詮娜魏尉€程釋放。已上鎖的鎖,會(huì)拋出RuntimeError異常

加鎖的實(shí)現(xiàn):

import  threading 
import  time 

cups=[]
lock=threading.Lock()

def worker(count=10):
    print("我是{},我開(kāi)始生產(chǎn)了".format(threading.current_thread().name))
    flag=False
    while True:
        lock.acquire()
        if len(cups)>=count:
            flag=True
        time.sleep(0.005)
        if not flag:
            cups.append(1)
        lock.release()
        if flag:
            break
    print("finished,cups={}".format(len(cups)))
        
for _ in  range(10):
    threading.Thread(target=worker,args=(1000,)).start()

運(yùn)行結(jié)果如圖所示:

python多線程之間的同步(一)

      一般來(lái)說(shuō)加鎖后還需要一些代碼實(shí)現(xiàn),在釋放鎖之前還有可能拋出異常,一旦出現(xiàn)異常,鎖無(wú)法釋放,但是當(dāng)前這個(gè)線程會(huì)因?yàn)檫@個(gè)異常而終止,這樣會(huì)產(chǎn)生死鎖,因此使用時(shí)要使用如下的方法:

     1,使用try...finally語(yǔ)句保證鎖的釋放

     2,with安全上下文管理(鎖對(duì)象支持上下文管理)


計(jì)數(shù)器類(lèi),用來(lái)加,減。

import threading
import time


class Counter:
    def __init__(self):
        self._val = 0
        self.__lock = threading.Lock()

    @property
    def value(self):
        return self._val

    def inc(self):
        try:
            self.__lock.acquire()
            self._val += 1
        finally:
            self.__lock.release()

    def dec(self):
        with self.__lock:
            self._val -= 1


def run(c: Counter, count=100):
    for _ in range(count):
        for i in range(-50, 50):
            if i < 0:
                c.dec()
            else:
                c.inc()


c = Counter()
c1 = 10
c2 = 1000


for i in range(c1):
    threading.Thread(target=run, args=(c, c2)).start()


while True:
    if threading.active_count() == 1:
        print(c.value)
        break

    啟動(dòng)了10個(gè)線程,1000次從-50到50進(jìn)行加減,最后得到0,如果沒(méi)有加鎖處理的話,得到的結(jié)果未必是自己想要的。


鎖的使用場(chǎng)景:

       鎖適用于訪問(wèn)和修改同一個(gè)資源的時(shí)候,引起資源爭(zhēng)用的情況下。使用鎖的注意事項(xiàng):

        1,少用鎖,除非有必要。多線程訪問(wèn)加鎖的資源時(shí),由于鎖的存在,實(shí)際就變成了串行。

        2,加鎖時(shí)間越短越好,不需要就立即釋放鎖。

        3,一定要避免死鎖,使用with或者try...finally。


非阻塞鎖使用

import  threading
import  time


def worker(tasks):
    for task in tasks:
        time.sleep(0.001)
        if task.lock.acquire(False):
            print("{} {} begin to start".format(threading.current_thread(),task.name))
        else:
            print("{} {} is working".format(threading.current_thread(),task.name))


class Task:
    def __init__(self,name):
        self.name=name
        self.lock=threading.Lock()

tasks=[Task('task-{}'.format(x)) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker,name="worker-{}".format(i),args=(tasks,)).start()

運(yùn)行結(jié)果如下圖所示:

python多線程之間的同步(一)

       總共開(kāi)啟了5個(gè)線程,每個(gè)線程處理10個(gè)任務(wù),因?yàn)樵趇f語(yǔ)句里面,task.lock.acquire(False),所以每個(gè)線程只有拿到鎖是True,其他的線程不會(huì)阻塞會(huì)返回False。打印"is working"。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI