溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

發(fā)布時(shí)間:2021-07-02 10:17:16 來源:億速云 閱讀:106 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要為大家展示了“Python3中的threading模塊如何管理進(jìn)程并發(fā)操作”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Python3中的threading模塊如何管理進(jìn)程并發(fā)操作”這篇文章吧。

1. threading進(jìn)程中管理并發(fā)操作

threading模塊提供了管理多個(gè)線程執(zhí)行的API,允許程序在同一個(gè)進(jìn)程空間并發(fā)的運(yùn)行多個(gè)操作。

1.1 Thread對象

要使用Thread,最簡單的方法就是用一個(gè)目標(biāo)函數(shù)實(shí)例化一個(gè)Thread對象,并調(diào)用start()讓它開始工作。

import threading
def worker():
 """thread worker function"""
 print('Worker')
threads = []
for i in range(5):
 t = threading.Thread(target=worker)
 threads.append(t)
 t.start()

輸出有5行,每一行都是"Worker"。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

如果能夠創(chuàng)建一個(gè)線程,并向它傳遞參數(shù)告訴它要完成什么工作,那么這會(huì)很有用。任何類型的對象都可以作為參數(shù)傳遞到線程。下面的例子傳遞了一個(gè)數(shù),線程將打印出這個(gè)數(shù)。

import threading
def worker(num):
 """thread worker function"""
 print('Worker: %s' % num)
threads = []
for i in range(5):
 t = threading.Thread(target=worker, args=(i,))
 threads.append(t)
 t.start()

現(xiàn)在這個(gè)整數(shù)參數(shù)會(huì)包含在各線程打印的消息中。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.2 確定當(dāng)前線程

使用參數(shù)來標(biāo)識或命名線程很麻煩,也沒有必要。每個(gè)Thread實(shí)例都有一個(gè)帶有默認(rèn)值的名,該默認(rèn)值可以在創(chuàng)建線程時(shí)改變。如果服務(wù)器進(jìn)程中有多個(gè)服務(wù)線程處理不同的操作,那么在這樣的服務(wù)器進(jìn)程中,對線程命名就很有用。

import threading
import time
def worker():
 print(threading.current_thread().getName(), 'Starting')
 time.sleep(0.2)
 print(threading.current_thread().getName(), 'Exiting')
def my_service():
 print(threading.current_thread().getName(), 'Starting')
 time.sleep(0.3)
 print(threading.current_thread().getName(), 'Exiting')
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name
w.start()
w2.start()
t.start()

調(diào)試輸出的每一行中包含有當(dāng)前線程的名。線程名列中有"Thread-1"的行對應(yīng)未命名的線程w2。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

大多數(shù)程序并不使用print來進(jìn)行調(diào)試。logging模塊支持將線程名嵌入到各個(gè)日志消息中(使用格式化代碼%(threadName)s)。通過把線程名包含在日志消息中,就能跟蹤這些消息的來源。

import logging
import threading
import time
def worker():
 logging.debug('Starting')
 time.sleep(0.2)
 logging.debug('Exiting')
def my_service():
 logging.debug('Starting')
 time.sleep(0.3)
 logging.debug('Exiting')
logging.basicConfig(
 level=logging.DEBUG,
 format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name
w.start()
w2.start()
t.start()

而且logging是線程安全的,所以來自不同線程的消息在輸出中會(huì)有所區(qū)分。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.3 守護(hù)與非守護(hù)線程

到目前為止,示例程序都在隱式地等待所有線程完成工作之后才退出。不過,程序有時(shí)會(huì)創(chuàng)建一個(gè)線程作為守護(hù)線程(daemon),這個(gè)線程可以一直運(yùn)行而不阻塞主程序退出。

如果一個(gè)服務(wù)不能很容易地中斷線程,或者即使讓線程工作到一半時(shí)中止也不會(huì)造成數(shù)據(jù)損失或破壞(例如,為一個(gè)服務(wù)監(jiān)控工具生成“心跳”的線程),那么對于這些服務(wù),使用守護(hù)線程就很有用。要標(biāo)志一個(gè)線程為守護(hù)線程,構(gòu)造線程時(shí)便要傳入daemon=True或者要調(diào)用它的setDaemon()方法并提供參數(shù)True。默認(rèn)情況下線程不作為守護(hù)線程。

import threading
import time
import logging
def daemon():
 logging.debug('Starting')
 time.sleep(0.2)
 logging.debug('Exiting')
def non_daemon():
 logging.debug('Starting')
 logging.debug('Exiting')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()

這個(gè)代碼的輸出中不包含守護(hù)線程的“Exiting“消息,因?yàn)樵趶膕leep()調(diào)用喚醒守護(hù)線程之前,所有非守護(hù)線程(包括主線程)已經(jīng)退出。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

要等待一個(gè)守護(hù)線程完成工作,需要使用join()方法。

import threading
import time
import logging
def daemon():
 logging.debug('Starting')
 time.sleep(0.2)
 logging.debug('Exiting')
def non_daemon():
 logging.debug('Starting')
 logging.debug('Exiting')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join()
t.join()

使用join()等待守護(hù)線程退出意味著它有機(jī)會(huì)生成它的"Exiting"消息。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

默認(rèn)地,join()會(huì)無限阻塞?;蛘撸€可以傳入一個(gè)浮點(diǎn)值,表示等待線程在多長時(shí)間(秒數(shù))后變?yōu)椴换顒?dòng)。即使線程在這個(gè)時(shí)間段內(nèi)未完成,join()也會(huì)返回。

import threading
import time
import logging
def daemon():
 logging.debug('Starting')
 time.sleep(0.2)
 logging.debug('Exiting')
def non_daemon():
 logging.debug('Starting')
 logging.debug('Exiting')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()

由于傳人的超時(shí)時(shí)間小于守護(hù)線程睡眠的時(shí)間,所以join()返回之后這個(gè)線程仍是"活著"。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.4 枚舉所有線程

沒有必要為所有守護(hù)線程維護(hù)一個(gè)顯示句柄來確保它們在退出主進(jìn)程之前已經(jīng)完成。

enumerate()會(huì)返回活動(dòng) Thread實(shí)例的一個(gè)列表。這個(gè)列表也包括當(dāng)前線程,由于等待當(dāng)前線程終止(join)會(huì)引入一種死鎖情況,所以必須跳過。

import random
import threading
import time
import logging
def worker():
 """thread worker function"""
 pause = random.randint(1, 5) / 10
 logging.debug('sleeping %0.2f', pause)
 time.sleep(pause)
 logging.debug('ending')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
for i in range(3):
 t = threading.Thread(target=worker, daemon=True)
 t.start()
main_thread = threading.main_thread()
for t in threading.enumerate():
 if t is main_thread:
 continue
 logging.debug('joining %s', t.getName())
 t.join()

由于工作線程睡眠的時(shí)間量是隨機(jī)的,所以這個(gè)程序的輸出可能有變化。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.5 派生線程

開始時(shí),Thread要完成一些基本初始化,然后調(diào)用其run()方法,這會(huì)調(diào)用傳遞到構(gòu)造函數(shù)的目標(biāo)函數(shù)。要?jiǎng)?chuàng)建Thread的一個(gè)子類,需要覆蓋run()來完成所需的工作。

import threading
import logging
class MyThread(threading.Thread):
 def run(self):
 logging.debug('running')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
 t = MyThread()
 t.start()

run()的返回值將被忽略。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

由于傳遞到Thread構(gòu)造函數(shù)的args和kwargs值保存在私有變量中(這些變量名都有前綴),所以不能很容易地從子類訪問這些值。要向一個(gè)定制的線程類型傳遞參數(shù),需要重新定義構(gòu)造函數(shù),將這些值保存在子類可見的一個(gè)實(shí)例屬性中。

import threading
import logging
class MyThreadWithArgs(threading.Thread):
 def __init__(self, group=None, target=None, name=None,
  args=(), kwargs=None, *, daemon=None):
 super().__init__(group=group, target=target, name=name,
  daemon=daemon)
 self.args = args
 self.kwargs = kwargs
 def run(self):
 logging.debug('running with %s and %s',
  self.args, self.kwargs)
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
 t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
 t.start()

MyThreadwithArgs使用的API與Thread相同,不過類似于其他定制類,這個(gè)類可以輕松地修改構(gòu)造函數(shù)方法,以取得更多參數(shù)或者與線程用途更直接相關(guān)的不同參數(shù)。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.6 定時(shí)器線程

有時(shí)出于某種原因需要派生Thread,Timer就是這樣一個(gè)例子,Timer也包含在threading中。Timer在一個(gè)延遲之后開始工作,而且可以在這個(gè)延遲期間內(nèi)的任意時(shí)刻被取消。

import threading
import time
import logging
def delayed():
 logging.debug('worker running')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')
logging.debug('starting timers')
t1.start()
t2.start()
logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')

這個(gè)例子中,第二個(gè)定時(shí)器永遠(yuǎn)不會(huì)運(yùn)行,看起來第一個(gè)定時(shí)器在主程序的其余部分完成之后還會(huì)運(yùn)行。由于這不是一個(gè)守護(hù)線程,所以在主線程完成時(shí)其會(huì)隱式退出。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.7 線程間傳送信號

盡管使用多線程的目的是并發(fā)地運(yùn)行單獨(dú)的操作,但有時(shí)也需要在兩個(gè)或多個(gè)線程中同步操作。事件對象是實(shí)現(xiàn)線程間安全通信的一種簡單方法。Event管理一個(gè)內(nèi)部標(biāo)志,調(diào)用者可以用set()和clear()方法控制這個(gè)標(biāo)志。其他線程可以使用wait()暫停,直到這個(gè)標(biāo)志被設(shè)置,可有效地阻塞進(jìn)程直至允許這些線程繼續(xù)。

import logging
import threading
import time
def wait_for_event(e):
 """Wait for the event to be set before doing anything"""
 logging.debug('wait_for_event starting')
 event_is_set = e.wait()
 logging.debug('event set: %s', event_is_set)
def wait_for_event_timeout(e, t):
 """Wait t seconds and then timeout"""
 while not e.is_set():
 logging.debug('wait_for_event_timeout starting')
 event_is_set = e.wait(t)
 logging.debug('event set: %s', event_is_set)
 if event_is_set:
 logging.debug('processing event')
 else:
 logging.debug('doing other work')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
e = threading.Event()
t1 = threading.Thread(
 name='block',
 target=wait_for_event,
 args=(e,),
)
t1.start()
t2 = threading.Thread(
 name='nonblock',
 target=wait_for_event_timeout,
 args=(e, 2),
)
t2.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')

wait()方法取一個(gè)參數(shù),表示等待事件的時(shí)間(秒數(shù)),達(dá)到這個(gè)時(shí)間后就超時(shí)。它會(huì)返回一個(gè)布爾值,指示事件是否已設(shè)置,使調(diào)用者知道wait()為什么返回。可以對事件單獨(dú)地使用is_set()方法而不必?fù)?dān)心阻塞。

在這個(gè)例子中,wait_for_event_timeout()將檢查事件狀態(tài)而不會(huì)無限阻塞。wait_for_event()在wait()調(diào)用的位置阻塞,事件狀態(tài)改變之前它不會(huì)返回。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.8 控制資源訪問

除了同步線程操作,還有一點(diǎn)很重要,要能夠控制對共享資源的訪問,從而避免破壞或丟失數(shù)據(jù)。Python的內(nèi)置數(shù)據(jù)結(jié)構(gòu)(列表、字典等)是線程安全的,這是Python使用原子字節(jié)碼來管理這些數(shù)據(jù)結(jié)構(gòu)的一個(gè)副作用(更新過程中不會(huì)釋放保護(hù)Python內(nèi)部數(shù)據(jù)結(jié)構(gòu)的全局解釋器鎖GIL(Global Interpreter Lock))。Python中實(shí)現(xiàn)的其他數(shù)據(jù)結(jié)構(gòu)或更簡單的類型(如整數(shù)和浮點(diǎn)數(shù))則沒有這個(gè)保護(hù)。要保證同時(shí)安全地訪問一個(gè)對象,可以使用一個(gè)Lock對象。

import logging
import random
import threading
import time
class Counter:
 def __init__(self, start=0):
 self.lock = threading.Lock()
 self.value = start
 def increment(self):
 logging.debug('Waiting for lock')
 self.lock.acquire()
 try:
 logging.debug('Acquired lock')
 self.value = self.value + 1
 finally:
 self.lock.release()
def worker(c):
 for i in range(2):
 pause = random.random()
 logging.debug('Sleeping %0.02f', pause)
 time.sleep(pause)
 c.increment()
 logging.debug('Done')
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
counter = Counter()
for i in range(2):
 t = threading.Thread(target=worker, args=(counter,))
 t.start()
logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
 if t is not main_thread:
 t.join()
logging.debug('Counter: %d', counter.value)

在這個(gè)例子中,worker()函數(shù)使一個(gè)Counter實(shí)例遞增,這個(gè)實(shí)例管理著一個(gè)Lock,以避免兩個(gè)線程同時(shí)改變其內(nèi)部狀態(tài)。如果沒有使用Lock,就有可能丟失一次對value屬性的修改。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

要確定是否有另一個(gè)線程請求這個(gè)鎖而不影響當(dāng)前線程,可以向acquire()的blocking參數(shù)傳入False。在下一個(gè)例子中,worker()想要分別得到3次鎖,并統(tǒng)計(jì)為得到鎖而嘗試的次數(shù)。與此同時(shí),lock_holder()在占有和釋放鎖之間循環(huán),每個(gè)狀態(tài)會(huì)短暫暫停,以模擬負(fù)載情況。

import logging
import threading
import time
def lock_holder(lock):
 logging.debug('Starting')
 while True:
 lock.acquire()
 try:
 logging.debug('Holding')
 time.sleep(0.5)
 finally:
 logging.debug('Not holding')
 lock.release()
 time.sleep(0.5)
def worker(lock):
 logging.debug('Starting')
 num_tries = 0
 num_acquires = 0
 while num_acquires < 3:
 time.sleep(0.5)
 logging.debug('Trying to acquire')
 have_it = lock.acquire(0)
 try:
 num_tries += 1
 if have_it:
 logging.debug('Iteration %d: Acquired',
  num_tries)
 num_acquires += 1
 else:
 logging.debug('Iteration %d: Not acquired',
  num_tries)
 finally:
 if have_it:
 lock.release()
 logging.debug('Done after %d iterations', num_tries)
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
holder = threading.Thread(
 target=lock_holder,
 args=(lock,),
 name='LockHolder',
 daemon=True,
)
holder.start()
worker = threading.Thread(
 target=worker,
 args=(lock,),
 name='Worker',
)
worker.start()

worker()需要超過3次迭代才能得到3次鎖。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.8.1 再入鎖

正常的Lock對象不能請求多次,即使是由同一個(gè)線程請求也不例外。如果同一個(gè)調(diào)用鏈中的多個(gè)函數(shù)訪問一個(gè)鎖,則可能會(huì)產(chǎn)生我們不希望的副作用。

import threading
lock = threading.Lock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

在這里,對第二個(gè)acquire()調(diào)用給定超時(shí)值為0,以避免阻塞,因?yàn)殒i已經(jīng)被第一個(gè)調(diào)用獲得。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

如果同一個(gè)線程的不同代碼需要"重新獲得"鎖,那么在這種情況下要使用RLock。

import threading
lock = threading.RLock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

與前面的例子相比,對代碼唯一的修改就是用RLock替換Lock。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.8.2 鎖作為上下文管理器

鎖實(shí)現(xiàn)了上下文管理器API,并與with語句兼容。使用with則不再需要顯式地獲得和釋放鎖。

import threading
import logging
def worker_with(lock):
 with lock:
 logging.debug('Lock acquired via with')
def worker_no_with(lock):
 lock.acquire()
 try:
 logging.debug('Lock acquired directly')
 finally:
 lock.release()
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))
w.start()
nw.start()

函數(shù)worker_with()worker_no_with()用等價(jià)的方式管理鎖。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.9 同步線程

除了使用Event,還可以通過使用一個(gè)Condition對象來同步線程。由于Condition使用了一個(gè)Lock,所以它可以綁定到一個(gè)共享資源,允許多個(gè)線程等待資源更新。在下一個(gè)例子中,consumer()線程要等待設(shè)置了Condition才能繼續(xù)。producer()線程負(fù)責(zé)設(shè)置條件,以及通知其他線程繼續(xù)。

import logging
import threading
import time
def consumer(cond):
 """wait for the condition and use the resource"""
 logging.debug('Starting consumer thread')
 with cond:
 cond.wait()
 logging.debug('Resource is available to consumer')
def producer(cond):
 """set up the resource to be used by the consumer"""
 logging.debug('Starting producer thread')
 with cond:
 logging.debug('Making resource available')
 cond.notifyAll()
logging.basicConfig(
 level=logging.DEBUG,
 format='%(asctime)s (%(threadName)-2s) %(message)s',
)
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
  args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
  args=(condition,))
p = threading.Thread(name='p', target=producer,
  args=(condition,))
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()

這些線程使用with來獲得與Condition關(guān)聯(lián)的鎖。也可以顯式地使用acquire()和release()方法。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

屏障(barrier)是另一種線程同步機(jī)制。Barrier會(huì)建立一個(gè)控制點(diǎn),所有參與線程會(huì)在這里阻塞,直到所有這些參與“方”都到達(dá)這一點(diǎn)。采用這種方法,線程可以單獨(dú)啟動(dòng)然后暫停,直到所有線程都準(zhǔn)備好才可以繼續(xù)。

import threading
import time
def worker(barrier):
 print(threading.current_thread().name,
 'waiting for barrier with {} others'.format(
 barrier.n_waiting))
 worker_id = barrier.wait()
 print(threading.current_thread().name, 'after barrier',
 worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)
threads = [
 threading.Thread(
 name='worker-%s' % i,
 target=worker,
 args=(barrier,),
 )
 for i in range(NUM_THREADS)
]
for t in threads:
 print(t.name, 'starting')
 t.start()
 time.sleep(0.1)
for t in threads:
 t.join()

在這個(gè)例子中,Barrier被配置為會(huì)阻塞線程,直到3個(gè)線程都在等待。滿足這個(gè)條件時(shí),所有線程被同時(shí)釋放從而越過這個(gè)控制點(diǎn)。wait()的返回值指示了釋放的參與線程數(shù),可以用來限制一些線程做清理資源等動(dòng)作。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

Barrier的abort()方法會(huì)使所有等待線程接收一個(gè)BrokenBarrierError。如果線程在wait()上被阻塞而停止處理,這就允許線程完成清理工作。

import threading
import time
def worker(barrier):
 print(threading.current_thread().name,
 'waiting for barrier with {} others'.format(
 barrier.n_waiting))
 try:
 worker_id = barrier.wait()
 except threading.BrokenBarrierError:
 print(threading.current_thread().name, 'aborting')
 else:
 print(threading.current_thread().name, 'after barrier',
 worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS + 1)
threads = [
 threading.Thread(
 name='worker-%s' % i,
 target=worker,
 args=(barrier,),
 )
 for i in range(NUM_THREADS)
]
for t in threads:
 print(t.name, 'starting')
 t.start()
 time.sleep(0.1)
barrier.abort()
for t in threads:
 t.join()

這個(gè)例子將Barrier配置為多加一個(gè)線程,即需要比實(shí)際啟動(dòng)的線程再多一個(gè)參與線程,所以所有線程中的處理都會(huì)阻塞。在被阻塞的各個(gè)線程中,abort()調(diào)用會(huì)產(chǎn)生一個(gè)異常。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.10 限制資源的并發(fā)訪問

有時(shí)可能需要允許多個(gè)工作線程同時(shí)訪問一個(gè)資源,但要限制總數(shù)。例如,連接池支持同時(shí)連接,但數(shù)目可能是固定的,或者一個(gè)網(wǎng)絡(luò)應(yīng)用可能支持固定數(shù)目的并發(fā)下載。這些連接就可以使用Semaphore來管理。

import logging
import threading
import time
class ActivePool:
 def __init__(self):
 super(ActivePool, self).__init__()
 self.active = []
 self.lock = threading.Lock()
 def makeActive(self, name):
 with self.lock:
 self.active.append(name)
 logging.debug('Running: %s', self.active)
 def makeInactive(self, name):
 with self.lock:
 self.active.remove(name)
 logging.debug('Running: %s', self.active)
def worker(s, pool):
 logging.debug('Waiting to join the pool')
 with s:
 name = threading.current_thread().getName()
 pool.makeActive(name)
 time.sleep(0.1)
 pool.makeInactive(name)
logging.basicConfig(
 level=logging.DEBUG,
 format='%(asctime)s (%(threadName)-2s) %(message)s',
)
pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
 t = threading.Thread(
 target=worker,
 name=str(i),
 args=(s, pool),
 )
 t.start()

在這個(gè)例子中,ActivePool類只作為一種便利方法,用來跟蹤某個(gè)給定時(shí)刻哪些線程能夠運(yùn)行。真正的資源池會(huì)為新的活動(dòng)線程分配一個(gè)連接或另外某個(gè)值,并且當(dāng)這個(gè)線程工作完成時(shí)再回收這個(gè)值。在這里,資源池只是用來保存活動(dòng)線程的名,以顯示至少有兩個(gè)線程在并發(fā)運(yùn)行。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

1.11 線程特定的數(shù)據(jù)

有些資源需要鎖定以便多個(gè)線程使用,另外一些資源則需要保護(hù),以使它們對并非是這些資源的“所有者”的線程隱藏。local()函數(shù)會(huì)創(chuàng)建一個(gè)對象,它能夠隱藏值,使其在不同線程中無法被看到。

import random
import threading
import logging
def show_value(data):
 try:
 val = data.value
 except AttributeError:
 logging.debug('No value yet')
 else:
 logging.debug('value=%s', val)
def worker(data):
 show_value(data)
 data.value = random.randint(1, 100)
 show_value(data)
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)
for i in range(2):
 t = threading.Thread(target=worker, args=(local_data,))
 t.start()

屬性local_data.value對所有線程都不可見,除非在某個(gè)線程中設(shè)置了這個(gè)屬性,這個(gè)線程才能看到它。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

要初始化設(shè)置以使所有線程在開始時(shí)都有相同的值,可以使用一個(gè)子類,并在_init_()中設(shè)置這些屬性。

import random
import threading
import logging
def show_value(data):
 try:
 val = data.value
 except AttributeError:
 logging.debug('No value yet')
 else:
 logging.debug('value=%s', val)
def worker(data):
 show_value(data)
 data.value = random.randint(1, 100)
 show_value(data)
class MyLocal(threading.local):
 def __init__(self, value):
 super().__init__()
 logging.debug('Initializing %r', self)
 self.value = value
logging.basicConfig(
 level=logging.DEBUG,
 format='(%(threadName)-10s) %(message)s',
)
local_data = MyLocal(1000)
show_value(local_data)
for i in range(2):
 t = threading.Thread(target=worker, args=(local_data,))
 t.start()

這會(huì)在相同的對象上調(diào)用_init_()(注意id()值),每個(gè)線程中調(diào)用一次以設(shè)置默認(rèn)值。

Python3中的threading模塊如何管理進(jìn)程并發(fā)操作

以上是“Python3中的threading模塊如何管理進(jìn)程并發(fā)操作”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI