您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Python3中最常用的5種線程鎖的應(yīng)用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“Python3中最常用的5種線程鎖的應(yīng)用”吧!
前言
線程安全
鎖的作用
Lock() 同步鎖
基本介紹
使用方式
死鎖現(xiàn)象
with語句
RLock() 遞歸鎖
基本介紹
使用方式
with語句
Condition() 條件鎖
基本介紹
使用方式
with語句
Event() 事件鎖
基本介紹
使用方式
Semaphore() 信號量鎖
使用方式
with語句
鎖關(guān)系淺析
基本練習(xí)題
條件鎖的應(yīng)用
事件鎖的應(yīng)用
本章節(jié)將繼續(xù)圍繞threading模塊講解,基本上是純理論偏多。
對于日常開發(fā)者來講很少會使用到本章節(jié)的內(nèi)容,但是對框架作者等是必備知識,同時也是高頻的面試常見問題。
官方文檔
線程安全是多線程或多進程編程中的一個概念,在擁有共享數(shù)據(jù)的多條線程并行執(zhí)行的程序中,線程安全的代碼會通過同步機制保證各個線程都可以正常且正確的執(zhí)行,不會出現(xiàn)數(shù)據(jù)污染等意外情況。
線程安全的問題最主要還是由線程切換導(dǎo)致的,比如一個房間(進程)中有10顆糖(資源),除此之外還有3個小人(1個主線程、2個子線程),當(dāng)小人A吃了3顆糖后被系統(tǒng)強制進行休息時他認為還剩下7顆糖,而當(dāng)小人B工作后又吃掉了3顆糖,那么當(dāng)小人A重新上崗時會認為糖還剩下7顆,但是實際上只有4顆了。
上述例子中線程A和線程B的數(shù)據(jù)不同步,這就是線程安全問題,它可能導(dǎo)致非常嚴重的意外情況發(fā)生,我們按下面這個示例來進行說明。
下面有一個數(shù)值num初始值為0,我們開啟2條線程:
線程1對num進行一千萬次+1的操作
線程2對num進行一千萬次-1的操作
結(jié)果可能會令人咋舌,num最后并不是我們所想象的結(jié)果0:
import threading num = 0 def add(): global num for i in range(10_000_000): num += 1 def sub(): global num for i in range(10_000_000): num -= 1 if __name__ == "__main__": subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 669214 # num result : -1849179 # num result : -525674
上面這就是一個非常好的案例,想要解決這個問題就必須通過鎖來保障線程切換的時機。
需要我們值得留意的是,在Python基本數(shù)據(jù)類型中l(wèi)ist、tuple、dict本身就是屬于線程安全的,所以如果有多個線程對這3種容器做操作時,我們不必考慮線程安全問題。
鎖是Python提供給我們能夠自行操控線程切換的一種手段,使用鎖可以讓線程的切換變的有序。
一旦線程的切換變的有序后,各個線程之間對數(shù)據(jù)的訪問、修改就變的可控,所以若要保證線程安全,就必須使用鎖。
threading模塊中提供了5種最常見的鎖,下面是按照功能進行劃分:
同步鎖:lock(一次只能放行一個)
遞歸鎖:rlock(一次只能放行一個)
條件鎖:condition(一次可以放行任意個)
事件鎖:event(一次全部放行)
信號量鎖:semaphore(一次可以放行特定個)
Lock鎖的稱呼有很多,如:
同步鎖
互斥鎖
它們是什么意思呢?如下所示:
互斥指的是某一資源同一時刻僅能有一個訪問者對其進行訪問,具有唯一性和排他性,但是互斥無法限制訪問者對資源的訪問順序,即訪問是無序的
同步是指在互斥的基礎(chǔ)上(大多數(shù)情況),通過其他機制實現(xiàn)訪問者對資源的有序訪問
同步其實已經(jīng)實現(xiàn)了互斥,是互斥的一種更為復(fù)雜的實現(xiàn),因為它在互斥的基礎(chǔ)上實現(xiàn)了有序訪問的特點
下面是threading模塊與同步鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.Lock() | 返回一個同步鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.locaked() | 判斷該鎖對象是否處于上鎖狀態(tài),返回一個布爾值 |
同步鎖一次只能放行一個線程,一個被加鎖的線程在運行時不會將執(zhí)行權(quán)交出去,只有當(dāng)該線程被解鎖時才會將執(zhí)行權(quán)通過系統(tǒng)調(diào)度交由其他線程。
如下所示,使用同步鎖解決最上面的問題:
import threading num = 0 def add(): lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() def sub(): lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
這樣這個代碼就完全變成了串行的狀態(tài),對于這種計算密集型I/O業(yè)務(wù)來說,還不如直接使用串行化單線程執(zhí)行來得快,所以這個例子僅作為一個示例,不能概述鎖真正的用途。
對于同步鎖來說,一次acquire()必須對應(yīng)一次release(),不能出現(xiàn)連續(xù)重復(fù)使用多次acquire()后再重復(fù)使用多次release()的操作,這樣會引起死鎖造成程序的阻塞,完全不動了,如下所示:
import threading num = 0 def add(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 # 不執(zhí)行 global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 # 不執(zhí)行 global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num)
由于threading.Lock()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
import threading num = 0 def add(): with lock: # 自動加鎖 global num for i in range(10_000_000): num += 1 # 自動解鎖 def sub(): with lock: # 自動加鎖 global num for i in range(10_000_000): num -= 1 # 自動解鎖 if __name__ == "__main__": lock = threading.Lock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
遞歸鎖是同步鎖的一個升級版本,在同步鎖的基礎(chǔ)上可以做到連續(xù)重復(fù)使用多次acquire()后再重復(fù)使用多次release()的操作,但是一定要注意加鎖次數(shù)和解鎖次數(shù)必須一致,否則也將引發(fā)死鎖現(xiàn)象。
下面是threading模塊與遞歸鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.RLock() | 返回一個遞歸鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.locaked() | 判斷該鎖對象是否處于上鎖狀態(tài),返回一個布爾值 |
以下是遞歸鎖的簡單使用,下面這段操作如果使用同步鎖則會發(fā)生死鎖現(xiàn)象,但是遞歸鎖不會:
import threading num = 0 def add(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num += 1 lock.release() lock.release() def sub(): lock.acquire() lock.acquire() global num for i in range(10_000_000): num -= 1 lock.release() lock.release() if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
由于threading.RLock()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
import threading num = 0 def add(): with lock: # 自動加鎖 global num for i in range(10_000_000): num += 1 # 自動解鎖 def sub(): with lock: # 自動加鎖 global num for i in range(10_000_000): num -= 1 # 自動解鎖 if __name__ == "__main__": lock = threading.RLock() subThread01 = threading.Thread(target=add) subThread02 = threading.Thread(target=sub) subThread01.start() subThread02.start() subThread01.join() subThread02.join() print("num result : %s" % num) # 結(jié)果三次采集 # num result : 0 # num result : 0 # num result : 0
條件鎖是在遞歸鎖的基礎(chǔ)上增加了能夠暫停線程運行的功能。并且我們可以使用wait()與notify()來控制線程執(zhí)行的個數(shù)。
注意:條件鎖可以自由設(shè)定一次放行幾個線程。
下面是threading模塊與條件鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.Condition() | 返回一個條件鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.wait(timeout=None) | 將當(dāng)前線程設(shè)置為“等待”狀態(tài),只有該線程接到“通知”或者超時時間到期之后才會繼續(xù)運行,在“等待”狀態(tài)下的線程將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
lockObject.wait_for(predicate, timeout=None) | 將當(dāng)前線程設(shè)置為“等待”狀態(tài),只有該線程的predicate返回一個True或者超時時間到期之后才會繼續(xù)運行,在“等待”狀態(tài)下的線程將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行。注意:predicate參數(shù)應(yīng)當(dāng)傳入一個可調(diào)用對象,且返回結(jié)果為bool類型 |
lockObject.notify(n=1) | 通知一個當(dāng)前狀態(tài)為“等待”的線程繼續(xù)運行,也可以通過參數(shù)n通知多個 |
lockObject.notify_all() | 通知所有當(dāng)前狀態(tài)為“等待”的線程繼續(xù)運行 |
下面這個案例會啟動10個子線程,并且會立即將10個子線程設(shè)置為等待狀態(tài)。
然后我們可以發(fā)送一個或者多個通知,來恢復(fù)被等待的子線程繼續(xù)運行:
import threading currentRunThreadNumber = 0 maxSubThreadNumber = 10 def task(): global currentRunThreadNumber thName = threading.currentThread().name condLock.acquire() # 上鎖 print("start and wait run thread : %s" % thName) condLock.wait() # 暫停線程運行、等待喚醒 currentRunThreadNumber += 1 print("carry on run thread : %s" % thName) condLock.release() # 解鎖 if __name__ == "__main__": condLock = threading.Condition() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() while currentRunThreadNumber < maxSubThreadNumber: notifyNumber = int( input("Please enter the number of threads that need to be notified to run:")) condLock.acquire() condLock.notify(notifyNumber) # 放行 condLock.release() print("main thread run end") # 先啟動10個子線程,然后這些子線程會全部變?yōu)榈却隣顟B(tài) # start and wait run thread : Thread-1 # start and wait run thread : Thread-2 # start and wait run thread : Thread-3 # start and wait run thread : Thread-4 # start and wait run thread : Thread-5 # start and wait run thread : Thread-6 # start and wait run thread : Thread-7 # start and wait run thread : Thread-8 # start and wait run thread : Thread-9 # start and wait run thread : Thread-10 # 批量發(fā)送通知,放行特定數(shù)量的子線程繼續(xù)運行 # Please enter the number of threads that need to be notified to run:5 # 放行5個 # carry on run thread : Thread-4 # carry on run thread : Thread-3 # carry on run thread : Thread-1 # carry on run thread : Thread-2 # carry on run thread : Thread-5 # Please enter the number of threads that need to be notified to run:5 # 放行5個 # carry on run thread : Thread-8 # carry on run thread : Thread-10 # carry on run thread : Thread-6 # carry on run thread : Thread-9 # carry on run thread : Thread-7 # Please enter the number of threads that need to be notified to run:1 # main thread run end
由于threading.Condition()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
import threading currentRunThreadNumber = 0 maxSubThreadNumber = 10 def task(): global currentRunThreadNumber thName = threading.currentThread().name with condLock: print("start and wait run thread : %s" % thName) condLock.wait() # 暫停線程運行、等待喚醒 currentRunThreadNumber += 1 print("carry on run thread : %s" % thName) if __name__ == "__main__": condLock = threading.Condition() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() while currentRunThreadNumber < maxSubThreadNumber: notifyNumber = int( input("Please enter the number of threads that need to be notified to run:")) with condLock: condLock.notify(notifyNumber) # 放行 print("main thread run end")
事件鎖是基于條件鎖來做的,它與條件鎖的區(qū)別在于一次只能放行全部,不能放行任意個數(shù)量的子線程繼續(xù)運行。
我們可以將事件鎖看為紅綠燈,當(dāng)紅燈時所有子線程都暫停運行,并進入“等待”狀態(tài),當(dāng)綠燈時所有子線程都恢復(fù)“運行”。
下面是threading模塊與事件鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.Event() | 返回一個事件鎖對象 |
lockObject.clear() | 將事件鎖設(shè)為紅燈狀態(tài),即所有線程暫停運行 |
lockObject.is_set() | 用來判斷當(dāng)前事件鎖狀態(tài),紅燈為False,綠燈為True |
lockObject.set() | 將事件鎖設(shè)為綠燈狀態(tài),即所有線程恢復(fù)運行 |
lockObject.wait(timeout=None) | 將當(dāng)前線程設(shè)置為“等待”狀態(tài),只有該線程接到“綠燈通知”或者超時時間到期之后才會繼續(xù)運行,在“等待”狀態(tài)下的線程將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
事件鎖不能利用with語句來進行使用,只能按照常規(guī)方式。
如下所示,我們來模擬線程和紅綠燈的操作,紅燈停,綠燈行:
import threading maxSubThreadNumber = 3 def task(): thName = threading.currentThread().name print("start and wait run thread : %s" % thName) eventLock.wait() # 暫停運行,等待綠燈 print("green light, %s carry on run" % thName) print("red light, %s stop run" % thName) eventLock.wait() # 暫停運行,等待綠燈 print("green light, %s carry on run" % thName) print("sub thread %s run end" % thName) if __name__ == "__main__": eventLock = threading.Event() for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() eventLock.set() # 設(shè)置為綠燈 eventLock.clear() # 設(shè)置為紅燈 eventLock.set() # 設(shè)置為綠燈 # start and wait run thread : Thread-1 # start and wait run thread : Thread-2 # start and wait run thread : Thread-3 # green light, Thread-1 carry on run # red light, Thread-1 stop run # green light, Thread-1 carry on run # sub thread Thread-1 run end # green light, Thread-3 carry on run # red light, Thread-3 stop run # green light, Thread-3 carry on run # sub thread Thread-3 run end # green light, Thread-2 carry on run # red light, Thread-2 stop run # green light, Thread-2 carry on run # sub thread Thread-2 run end
基本介紹
信號量鎖也是根據(jù)條件鎖來做的,它與條件鎖和事件鎖的區(qū)別如下:
條件鎖:一次可以放行任意個處于“等待”狀態(tài)的線程
事件鎖:一次可以放行全部的處于“等待”狀態(tài)的線程
信號量鎖:通過規(guī)定,成批的放行特定個處于“上鎖”狀態(tài)的線程
下面是threading模塊與信號量鎖提供的相關(guān)方法:
方法 | 描述 |
---|---|
threading.Semaphore() | 返回一個信號量鎖對象 |
lockObject.acquire(blocking=True, timeout=1) | 上鎖,當(dāng)一個線程在執(zhí)行被上鎖代碼塊時,將不允許切換到其他線程運行,默認鎖失效時間為1秒 |
lockObject.release() | 解鎖,當(dāng)一個線程在執(zhí)行未被上鎖代碼塊時,將允許系統(tǒng)根據(jù)策略自行切換到其他線程中運行 |
以下是使用示例,你可以將它當(dāng)做一段限寬的路段,每次只能放行相同數(shù)量的線程:
import threading import time maxSubThreadNumber = 6 def task(): thName = threading.currentThread().name semaLock.acquire() print("run sub thread %s" % thName) time.sleep(3) semaLock.release() if __name__ == "__main__": # 每次只能放行2個 semaLock = threading.Semaphore(2) for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start() # run sub thread Thread-1 # run sub thread Thread-2 # run sub thread Thread-3 # run sub thread Thread-4 # run sub thread Thread-6 # run sub thread Thread-5
由于threading.Semaphore()對象中實現(xiàn)了__enter__()與__exit__()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:
import threading import time maxSubThreadNumber = 6 def task(): thName = threading.currentThread().name with semaLock: print("run sub thread %s" % thName) time.sleep(3) if __name__ == "__main__": semaLock = threading.Semaphore(2) for i in range(maxSubThreadNumber): subThreadIns = threading.Thread(target=task) subThreadIns.start()
上面5種鎖可以說都是基于同步鎖來做的,這些你都可以從源碼中找到答案。
首先來看RLock遞歸鎖,遞歸鎖的實現(xiàn)非常簡單,它的內(nèi)部會維護著一個計數(shù)器,當(dāng)計數(shù)器不為0的時候該線程不能被I/O操作和時間輪詢機制切換。但是當(dāng)計數(shù)器為0的時候便不會如此了:
def __init__(self): self._block = _allocate_lock() self._owner = None self._count = 0 # 計數(shù)器
而Condition條件鎖的內(nèi)部其實是有兩把鎖的,一把底層鎖(同步鎖)一把高級鎖(遞歸鎖)。
低層鎖的解鎖方式有兩種,使用wait()方法會暫時解開底層鎖同時加上一把高級鎖,只有當(dāng)接收到別的線程里的notfiy()后才會解開高級鎖和重新上鎖低層鎖,也就是說條件鎖底層是根據(jù)同步鎖和遞歸鎖的不斷切換來進行實現(xiàn)的:
def __init__(self, lock=None): if lock is None: lock = RLock() # 可以看到條件鎖的內(nèi)部是基于遞歸鎖,而遞歸鎖又是基于同步鎖來做的 self._lock = lock self.acquire = lock.acquire self.release = lock.release try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque()
Event事件鎖內(nèi)部是基于條件鎖來做的:
class Event: def __init__(self): self._cond = Condition(Lock()) # 實例化出了一個條件鎖。 self._flag = False def _reset_internal_locks(self): # private! called by Thread._reset_internal_locks by _after_fork() self._cond.__init__(Lock()) def is_set(self): """Return true if and only if the internal flag is true.""" return self._flag isSet = is_set
Semaphore信號量鎖內(nèi)部也是基于條件鎖來做的:
class Semaphore: def __init__(self, value=1): if value < 0: raise ValueError("semaphore initial value must be >= 0") self._cond = Condition(Lock()) # 可以看到,這里是實例化出了一個條件鎖 self._value = value
需求:一個空列表,兩個線程輪番往里面加值(一個加偶數(shù),一個加奇數(shù)),最終讓該列表中的值為 1 - 100 ,且是有序排列的。
import threading lst = [] def even(): """加偶數(shù)""" with condLock: for i in range(2, 101, 2): # 判斷當(dāng)前列表的長度處于2是否能處盡 # 如果能處盡則代表需要添加奇數(shù) # 否則就添加偶數(shù) if len(lst) % 2 != 0: # 添偶數(shù) lst.append(i) # 先添加值 condLock.notify() # 告訴另一個線程,你可以加奇數(shù)了,但是這里不會立即交出執(zhí)行權(quán) condLock.wait() # 交出執(zhí)行權(quán),并等待另一個線程通知加偶數(shù) else: # 添奇數(shù) condLock.wait() # 交出執(zhí)行權(quán),等待另一個線程通知加偶數(shù) lst.append(i) condLock.notify() condLock.notify() def odd(): """加奇數(shù)""" with condLock: for i in range(1, 101, 2): if len(lst) % 2 == 0: lst.append(i) condLock.notify() condLock.wait() condLock.notify() if __name__ == "__main__": condLock = threading.Condition() addEvenTask = threading.Thread(target=even) addOddTask = threading.Thread(target=odd) addEvenTask.start() addOddTask.start() addEvenTask.join() addOddTask.join() print(lst)
有2個任務(wù)線程來扮演李白和杜甫,如何讓他們一人一句進行對答?文本如下:
杜甫:老李啊,來喝酒!
李白:老杜啊,不喝了我喝不下了!
杜甫:老李啊,再來一壺?
杜甫:...老李?
李白:呼呼呼...睡著了..
代碼如下:
import threading def libai(): event.wait() print("李白:老杜啊,不喝了我喝不下了!") event.set() event.clear() event.wait() print("李白:呼呼呼...睡著了..") def dufu(): print("杜甫:老李啊,來喝酒!") event.set() event.clear() event.wait() print("杜甫:老李啊,再來一壺?") print("杜甫:...老李?") event.set() if __name__ == '__main__': event = threading.Event() t1 = threading.Thread(target=libai) t2 = threading.Thread(target=dufu) t1.start() t2.start() t1.join() t2.join()
到此,相信大家對“Python3中最常用的5種線程鎖的應(yīng)用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。