溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python如何使用threading庫實(shí)現(xiàn)線程鎖與釋放鎖

發(fā)布時(shí)間:2022-02-28 15:19:41 來源:億速云 閱讀:219 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Python如何使用threading庫實(shí)現(xiàn)線程鎖與釋放鎖”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

控制資源訪問

前文提到threading庫在多線程時(shí),對同一資源的訪問容易導(dǎo)致破壞與丟失數(shù)據(jù)。為了保證安全的訪問一個(gè)資源對象,我們需要?jiǎng)?chuàng)建鎖。

示例如下:

import threading
import time

class AddThread():
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        print("Wait Lock")
        self.lock.acquire()
        try:
            print("Acquire Lock")
            self.value += 1
            print(self.value)
        finally:
            self.lock.release()

def worker(a):
    time.sleep(1)
    a.increment()

addThread = AddThread()
for i in range(3):
    t = threading.Thread(target=worker, args=(addThread,))
    t.start()

acquire()會(huì)通過鎖進(jìn)行阻塞其他線程執(zhí)行中間段,release()釋放鎖,可以看到,基本都是獲得鎖之后才執(zhí)行。避免了多個(gè)線程同時(shí)改變其資源對象,不會(huì)造成混亂。

判斷是否有另一個(gè)線程請求鎖

要確定是否有另一個(gè)線程請求鎖而不影響當(dāng)前的線程,可以設(shè)置acquire()的參數(shù)blocking=False。

示例如下:

import threading
import time

def worker2(lock):
    print("worker2 Wait Lock")
    while True:
        lock.acquire()
        try:
            print("Holding")
            time.sleep(0.5)
        finally:
            print("not Holding")
            lock.release()
        time.sleep(0.5)

def worker1(lock):
    print("worker1 Wait Lock")
    num_acquire = 0
    value = 0
    while num_acquire < 3:
        time.sleep(0.5)
        have_it = lock.acquire(blocking=False)
        try:
            value += 1
            print(value)
            print("Acquire Lock")
            if have_it:
                num_acquire += 1
        finally:
            print("release Lock")
            if have_it:
                lock.release()

lock = threading.Lock()
word2Thread = threading.Thread(
    target=worker2,
    name='work2',
    args=(lock,)
)
word2Thread.start()
word1Thread = threading.Thread(
    target=worker1,
    name='work1',
    args=(lock,)
)
word1Thread.start()

這里,我們需要迭代很多次,work1才能獲取3次鎖。但是嘗試了很8次。

with lock

前文,我們通過lock.acquire()與lock.release()實(shí)現(xiàn)了鎖的獲取與釋放,但其實(shí)我們Python還給我們提供了一個(gè)更簡單的語法,通過with lock來獲取與釋放鎖。

示例如下:

import threading
import time

class AddThread():
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        print("Wait Lock")
        with self.lock:
            print("lock acquire")
            self.value += 1
            print(self.value)
        print("lock release")

def worker(a):
    time.sleep(1)
    a.increment()

addThread = AddThread()
for i in range(3):
    t = threading.Thread(target=worker, args=(addThread,))
    t.start()

需要注意的是,正常的Lock對象不能請求多次,即使是由同一個(gè)線程請求也不例外。如果同一個(gè)調(diào)用鏈中的多個(gè)函數(shù)訪問一個(gè)鎖,則會(huì)發(fā)生意外。如果期望在同一個(gè)線程的不同代碼需要重新獲得鎖,那么這種情況下使用RLock。

同步線程

Condition

在實(shí)際的操作中,我們還可以使用Condition對象來同步線程。由于Condition使用了一個(gè)Lock,所以它可以綁定到一個(gè)共享資源,允許多個(gè)線程等待資源的更新。

示例如下:

import threading
import time

def consumer(cond):
    print("waitCon")
    with cond:
        cond.wait()
        print('獲取更新的資源')

def producer(cond):
    print("worker")
    with cond:
        print('更新資源')
        cond.notifyAll()

cond = threading.Condition()
t1 = threading.Thread(name='t1', target=consumer, args=(cond,))
t2 = threading.Thread(name='t2', target=consumer, args=(cond,))
t3 = threading.Thread(name='t3', target=producer, args=(cond,))
t1.start()
time.sleep(0.2)
t2.start()
time.sleep(0.2)
t3.start()

這里,我們通過producer線程處理完成之后調(diào)用notifyAll(),consumer等線程等到了它的更新,可以類比為觀察者模式。這里是,當(dāng)一個(gè)線程用完資源之后時(shí),則會(huì)自動(dòng)通知依賴它的所有線程。

屏障(barrier)

屏障是另一種線程的同步機(jī)制。barrier會(huì)建立一個(gè)控制點(diǎn),所有參與的線程會(huì)在這里阻塞,直到所有這些參與方都到達(dá)這一點(diǎn)。采用這種方法,線程可以單獨(dú)啟動(dòng)然后暫停,直到所有線程都準(zhǔn)備好了才可以繼續(xù)。

示例如下:

import threading
import time

def worker(barrier):
    print(threading.current_thread().getName(), "worker")
    worker_id = barrier.wait()
    print(threading.current_thread().getName(), worker_id)

threads = []
barrier = threading.Barrier(3)
for i in range(3):
    threads.append(
        threading.Thread(
            name="t" + str(i),
            target=worker,
            args=(barrier,)
        )
    )
for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()

從控制臺(tái)的輸出會(huì)發(fā)發(fā)現(xiàn),barrier.wait()會(huì)阻塞線程,直到所有線程被創(chuàng)建后,才同時(shí)釋放越過這個(gè)控制點(diǎn)繼續(xù)執(zhí)行。wait()的返回值指示了釋放的參與線程數(shù),可以用來限制一些線程做清理資源等動(dòng)作。

當(dāng)然屏障Barrier還有一個(gè)abort()方法,該方法可以使所有等待線程接收一個(gè)BroKenBarrierError。如果線程在wait()上被阻塞而停止處理,會(huì)產(chǎn)生這個(gè)異常,通過except可以完成清理工作。

有限資源的并發(fā)訪問

除了多線程可能訪問同一個(gè)資源之外,有時(shí)候?yàn)榱诵阅?,我們也?huì)限制多線程訪問同一個(gè)資源的數(shù)量。例如,線程池支持同時(shí)連接,但數(shù)據(jù)可能是固定的,或者一個(gè)網(wǎng)絡(luò)APP提供的并發(fā)下載數(shù)支持固定數(shù)目。這些連接就可以使用Semaphore來管理。

示例如下:

import threading
import time

class WorkerThread(threading.Thread):
    def __init__(self):
        super(WorkerThread, self).__init__()
        self.lock = threading.Lock()
        self.value = 0

    def increment(self):
        with self.lock:
            self.value += 1
            print(self.value)

def worker(s, pool):
    with s:
        print(threading.current_thread().getName())
        pool.increment()
        time.sleep(1)
        pool.increment()

pool = WorkerThread()
s = threading.Semaphore(2)
for i in range(5):
    t = threading.Thread(
        name="t" + str(i),
        target=worker,
        args=(s, pool,)
    )
    t.start()


隱藏資源

在實(shí)際的項(xiàng)目中,有些資源需要鎖定以便于多個(gè)線程使用,而另外一些資源則需要保護(hù),以使它們對并非使這些資源的所有者的線程隱藏。

local()函數(shù)會(huì)創(chuàng)建一個(gè)對象,它能夠隱藏值,使其在不同的線程中無法被看到。示例如下:

import threading
import random

def show_data(data):
    try:
        result = data.value
    except AttributeError:
        print(threading.current_thread().getName(), "No value")
    else:
        print(threading.current_thread().getName(), "value=", result)

def worker(data):
    show_data(data)
    data.value = random.randint(1, 100)
    show_data(data)

local_data = threading.local()
show_data(local_data)
local_data.value = 1000
show_data(local_data)

for i in range(2):
    t = threading.Thread(
        name="t" + str(i),
        target=worker,
        args=(local_data,)
    )
    t.start()

“Python如何使用threading庫實(shí)現(xiàn)線程鎖與釋放鎖”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI