您好,登錄后才能下訂單哦!
先提到線程同步是個(gè)什么,概念是什么,就是線程通訊中通過(guò)使用某種技術(shù)訪問(wèn)數(shù)據(jù)時(shí),而一旦此線程訪問(wèn)到,其他線程也就不能訪問(wèn)了,直到該線程對(duì)數(shù)據(jù)完成操作才結(jié)束。
Event事件是一種實(shí)現(xiàn)方式:通過(guò)內(nèi)部的標(biāo)記看看是不是變化,也就是true or false了,
將set(),clear(),is_set(),為true,wait(timeout=None)此種設(shè)置true的時(shí)長(zhǎng),等到返回false,不等到超時(shí)返回false,無(wú)線等待為None,
來(lái)看一個(gè)wait的使用:
from threading import Event, Thread
import logging
logging.basicConfig(level=logging.INFO)
def A(event:Event, interval:int):
while not event.wait(interval): # 要么true or false
logging.info('hello')
e = Event()
Thread(target=A, args=(e, 3)).start()
e.wait(8)
e.set()
print('end--------------')
使用鎖Lock解決數(shù)據(jù)資源在爭(zhēng)搶,從而使資源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默認(rèn)阻塞,阻塞設(shè)置超時(shí)時(shí)間,非阻塞,timeout禁止使用,成功獲取鎖,返回True,否則False。
有阻塞就有釋放 ,解開(kāi)鎖,release(),從線程釋放鎖,上鎖的鎖重置為unloced未上鎖調(diào)用,拋出RuntimeError異常。
import threading
from threading import Thread, Lock
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
lock = Lock()
def worker(count=10):
logging.info("I'm working for U.")
flag = False
while True:
lock.acquire() # 獲取鎖
if len(cups) >= count:
flag = True
time.sleep(0.0001)
if not flag:
cups.append(1)
if flag:
break
logging.info('I finished. cups = {}'.format(len(cups)))
for _ in range(10):
Thread(target=worker, args=(1000,)).start()
使用鎖的過(guò)程中,總是不經(jīng)意加上鎖,出現(xiàn)死鎖的產(chǎn)生,出現(xiàn)了死鎖,如何解決呢?
使用try finally 將鎖釋放,另一種使用with上下文管理。
鎖的使用場(chǎng)景在于應(yīng)該少用鎖,還要就是如若上鎖,將鎖的使用時(shí)間縮短,避免時(shí)間太長(zhǎng)而出現(xiàn)無(wú)法釋放鎖的結(jié)果。
可重入鎖Lock,
import threading
import time
lock = threading.RLock()
print(lock.acquire())
print('------------')
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False, timeout=10)) # 異常
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {}'.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock)) # 注意觀察lock對(duì)象的信息
lock.release()
#lock.release() #多了一次
print('===============')
print()
print(lock.acquire(blocking=False)) # 1次
#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨線程了,異常
lock.release()
print('~~~~~~~~~~~~~~~~~')
print()
# 測(cè)試多線程
print(lock.acquire())
def sub(l):
print('{}: {}'.format(threading.current_thread(), l.acquire())) # 阻塞
print('{}: {}'.format(threading.current_thread(), l.acquire(False)))
print('lock in sub thread {}'.format(lock))
l.release()
print('sub 1')
l.release()
print('sub 2')
# l.release() # 多了一次
threading.Timer(2, sub, args=(lock,)).start() # 傳入同一個(gè)lock對(duì)象
print('++++++++++++++++++++++')
print()
print(lock.acquire())
lock.release()
time.sleep(5)
print('釋放主線程鎖')
lock.release()
使用構(gòu)造方法Condition(lock=None),默認(rèn)是Rloc,
具體方法為;
acquire(*args),獲取鎖
wait(self,timeout=None),等待或超時(shí)
notify(n=1),喚醒線程,沒(méi)有等待就沒(méi)有任何操作,指線程
notify_all(),喚醒所有等待的線程。
Condition主要用于生產(chǎn)者和消費(fèi)者模型中,解決匹配的問(wèn)題。
使用方式:先獲取acquire,使用完了要釋放release,避免死鎖最好使用with上下文;生產(chǎn)者和消費(fèi)者可以使用notify and notify_all。
如下例子:
from threading import Thread, Event
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是為了演示,不考慮線程安全問(wèn)題
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是為了使用方便,與邏輯無(wú)關(guān)
def produce(self, total):
for _ in range(total):
data = random.randint(0,100)
logging.info(data)
self.data = data
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
data = self.data
logging.info("recieved {}".format(data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()
這里代碼會(huì)有缺陷:優(yōu)化如下:
from threading import Thread, Event, Condition
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是為了演示,不考慮線程安全問(wèn)題
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是為了使用方便,與邏輯無(wú)關(guān)
self.cond = Condition()
def produce(self, total):
for _ in range(total):
data = random.randint(0,100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1) # 模擬產(chǎn)生數(shù)據(jù)速度
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait() # 阻塞等通知
logging.info("received {}".format(self.data))
self.event.wait(0.5) # 模擬消費(fèi)的速度
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
# 增加消費(fèi)者
for i in range(5):
c = Thread(target=d.consume, name='consumer-{}'.format(i))
c.start()
p.start()
Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),構(gòu)建barrier對(duì)象,timeout未指定的默認(rèn)值;
n_waiting ,當(dāng)前barrier等待的線程數(shù)。;
parties ,需要等待
wait(timeout=None),wait方法設(shè)置超時(shí)并超時(shí)發(fā)送,barrie處于broken狀態(tài)。
而broken的狀態(tài)方法:
broken,打開(kāi)狀態(tài),返回true;
abort(),barrier在broken狀態(tài)中,wait等待的線程會(huì)拋出BrokenBarrierError異常,直到reset恢復(fù)barrier;
reset(),恢復(fù)barrier,重新開(kāi)始攔截。
barrier不做演示:
還有semaphore信號(hào)量,每次acquire時(shí),都會(huì)減一,到0時(shí)的線程再到release后,大于0,恢復(fù)阻塞的線程。
方法:
Semaphore(value=1) 構(gòu)造方法,alue小于0,拋ValueError異常;
acquire(blocking=True, timeout=None) 獲取信號(hào)量,計(jì)數(shù)器減1,獲取成功返回True;
release() 釋放信號(hào)量,計(jì)數(shù)器加1。
使用信號(hào)量處理時(shí),需要注意release超界問(wèn)題,邊界問(wèn)題,其實(shí),在使用中,python有GIL的存在,有的多線程就變成線程安全的,注意一點(diǎn),但實(shí)際上它們并不是線程安全類型。因此我們?cè)谑褂弥幸唧w場(chǎng)景具體分析具體使用。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。