溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

python解決線程同步方案有哪些

發(fā)布時(shí)間:2020-05-28 11:19:47 來(lái)源:網(wǎng)絡(luò) 閱讀:217 作者:可愛(ài)的汁汁 欄目:編程語(yǔ)言
 先提到線程同步是個(gè)什么,概念是什么,就是線程通訊中通過(guò)使用某種技術(shù)訪問(wèn)數(shù)據(jù)時(shí),而一旦此線程訪問(wèn)到,其他線程也就不能訪問(wèn)了,直到該線程對(duì)數(shù)據(jù)完成操作才結(jié)束。
     Event事件是一種實(shí)現(xiàn)方式:通過(guò)內(nèi)部的標(biāo)記看看是不是變化,也就是true or false了,
     將set(),clear(),is_set(),為true,wait(timeout=None)此種設(shè)置true的時(shí)長(zhǎng),等到返回false,不等到超時(shí)返回false,無(wú)線等待為None,

     來(lái)看一個(gè)wait的使用:
from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def A(event:Event, interval:int):
    while not event.wait(interval): # 要么true  or false
        logging.info('hello')

e = Event()
Thread(target=A, args=(e, 3)).start()

e.wait(8)
e.set()
print('end--------------')

使用鎖Lock解決數(shù)據(jù)資源在爭(zhēng)搶,從而使資源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默認(rèn)阻塞,阻塞設(shè)置超時(shí)時(shí)間,非阻塞,timeout禁止使用,成功獲取鎖,返回True,否則False。
有阻塞就有釋放 ,解開(kāi)鎖,release(),從線程釋放鎖,上鎖的鎖重置為unloced未上鎖調(diào)用,拋出RuntimeError異常。

import threading
from threading import Thread, Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'

logging.basicConfig(format=FORMAT, level=logging.INFO)


cups = []
lock = Lock()

def worker(count=10):
    logging.info("I'm working for U.")
    flag = False
    while True:
    lock.acquire() # 獲取鎖

    if len(cups) >= count:
        flag = True

    time.sleep(0.0001) 
    if not flag:
        cups.append(1)

    if flag:
        break

logging.info('I finished. cups = {}'.format(len(cups)))

for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

使用鎖的過(guò)程中,總是不經(jīng)意加上鎖,出現(xiàn)死鎖的產(chǎn)生,出現(xiàn)了死鎖,如何解決呢?
使用try finally 將鎖釋放,另一種使用with上下文管理。
鎖的使用場(chǎng)景在于應(yīng)該少用鎖,還要就是如若上鎖,將鎖的使用時(shí)間縮短,避免時(shí)間太長(zhǎng)而出現(xiàn)無(wú)法釋放鎖的結(jié)果。

可重入鎖Lock,


import threading
import time

lock = threading.RLock()
print(lock.acquire())
print('------------')
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False, timeout=10)) # 異常
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {}'.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock)) # 注意觀察lock對(duì)象的信息
lock.release()
#lock.release() #多了一次
print('===============')
print()

print(lock.acquire(blocking=False)) # 1次
#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨線程了,異常
lock.release()
print('~~~~~~~~~~~~~~~~~')
print()

# 測(cè)試多線程
print(lock.acquire())

def sub(l):
    print('{}: {}'.format(threading.current_thread(), l.acquire())) # 阻塞
    print('{}: {}'.format(threading.current_thread(), l.acquire(False)))
    print('lock in sub thread {}'.format(lock))
    l.release()
    print('sub 1')
    l.release()
    print('sub 2')
    # l.release() # 多了一次

threading.Timer(2, sub, args=(lock,)).start() # 傳入同一個(gè)lock對(duì)象
print('++++++++++++++++++++++')
print()

print(lock.acquire())

lock.release()
time.sleep(5)
print('釋放主線程鎖')
lock.release()

使用構(gòu)造方法Condition(lock=None),默認(rèn)是Rloc,
具體方法為;
acquire(*args),獲取鎖
wait(self,timeout=None),等待或超時(shí)
notify(n=1),喚醒線程,沒(méi)有等待就沒(méi)有任何操作,指線程
notify_all(),喚醒所有等待的線程。
Condition主要用于生產(chǎn)者和消費(fèi)者模型中,解決匹配的問(wèn)題。
使用方式:先獲取acquire,使用完了要釋放release,避免死鎖最好使用with上下文;生產(chǎn)者和消費(fèi)者可以使用notify and notify_all。

如下例子:

from threading import Thread, Event
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是為了演示,不考慮線程安全問(wèn)題

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是為了使用方便,與邏輯無(wú)關(guān)

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
            self.event.set()

    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info("recieved {}".format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

這里代碼會(huì)有缺陷:優(yōu)化如下:

from threading import Thread, Event, Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是為了演示,不考慮線程安全問(wèn)題

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是為了使用方便,與邏輯無(wú)關(guān)
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
                self.event.wait(1) # 模擬產(chǎn)生數(shù)據(jù)速度
                self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait() # 阻塞等通知
                logging.info("received {}".format(self.data))
            self.event.wait(0.5) # 模擬消費(fèi)的速度

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
# 增加消費(fèi)者

for i in range(5):
    c = Thread(target=d.consume, name='consumer-{}'.format(i))
    c.start()
p.start()

Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),構(gòu)建barrier對(duì)象,timeout未指定的默認(rèn)值;
n_waiting ,當(dāng)前barrier等待的線程數(shù)。;
parties ,需要等待
wait(timeout=None),wait方法設(shè)置超時(shí)并超時(shí)發(fā)送,barrie處于broken狀態(tài)。
而broken的狀態(tài)方法:
broken,打開(kāi)狀態(tài),返回true;
abort(),barrier在broken狀態(tài)中,wait等待的線程會(huì)拋出BrokenBarrierError異常,直到reset恢復(fù)barrier;
reset(),恢復(fù)barrier,重新開(kāi)始攔截。
barrier不做演示:

還有semaphore信號(hào)量,每次acquire時(shí),都會(huì)減一,到0時(shí)的線程再到release后,大于0,恢復(fù)阻塞的線程。
方法:
Semaphore(value=1) 構(gòu)造方法,alue小于0,拋ValueError異常;
acquire(blocking=True, timeout=None) 獲取信號(hào)量,計(jì)數(shù)器減1,獲取成功返回True;
release() 釋放信號(hào)量,計(jì)數(shù)器加1。
使用信號(hào)量處理時(shí),需要注意release超界問(wèn)題,邊界問(wèn)題,其實(shí),在使用中,python有GIL的存在,有的多線程就變成線程安全的,注意一點(diǎn),但實(shí)際上它們并不是線程安全類型。因此我們?cè)谑褂弥幸唧w場(chǎng)景具體分析具體使用。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI