溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python線程的示例分析

發(fā)布時(shí)間:2021-08-25 10:36:29 來(lái)源:億速云 閱讀:106 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要為大家展示了“Python線程的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Python線程的示例分析”這篇文章吧。

1. 線程基礎(chǔ)

1.1. 線程狀態(tài)

線程有5種狀態(tài),狀態(tài)轉(zhuǎn)換的過(guò)程如下圖所示:

Python線程的示例分析

thread_stat_simple

1.2. 線程同步(鎖)

多線程的優(yōu)勢(shì)在于可以同時(shí)運(yùn)行多個(gè)任務(wù)(至少感覺(jué)起來(lái)是這樣)。但是當(dāng)線程需要共享數(shù)據(jù)時(shí),可能存在數(shù)據(jù)不同步的問(wèn)題??紤]這樣一種情況:一個(gè)列表里所有元素都是0,線程"set"從后向前把所有元素改成1,而線程"print"負(fù)責(zé)從前往后讀取列表并打印。那么,可能線程"set"開(kāi)始改的時(shí)候,線程"print"便來(lái)打印列表了,輸出就成了一半0一半1,這就是數(shù)據(jù)的不同步。為了避免這種情況,引入了鎖的概念。

鎖有兩種狀態(tài)——鎖定和未鎖定。每當(dāng)一個(gè)線程比如"set"要訪問(wèn)共享數(shù)據(jù)時(shí),必須先獲得鎖定;如果已經(jīng)有別的線程比如"print"獲得鎖定了,那么就讓線程"set"暫停,也就是同步阻塞;等到線程"print"訪問(wèn)完畢,釋放鎖以后,再讓線程"set"繼續(xù)。經(jīng)過(guò)這樣的處理,打印列表時(shí)要么全部輸出0,要么全部輸出1,不會(huì)再出現(xiàn)一半0一半1的尷尬場(chǎng)面。

線程與鎖的交互如下圖所示:

Python線程的示例分析

thread_lock

1.3. 線程通信(條件變量)

然而還有另外一種尷尬的情況:列表并不是一開(kāi)始就有的;而是通過(guò)線程"create"創(chuàng)建的。如果"set"或者"print" 在"create"還沒(méi)有運(yùn)行的時(shí)候就訪問(wèn)列表,將會(huì)出現(xiàn)一個(gè)異常。使用鎖可以解決這個(gè)問(wèn)題,但是"set"和"print"將需要一個(gè)無(wú)限循環(huán)——他們不知道"create"什么時(shí)候會(huì)運(yùn)行,讓"create"在運(yùn)行后通知"set"和"print"顯然是一個(gè)更好的解決方案。于是,引入了條件變量。

條件變量允許線程比如"set"和"print"在條件不滿足的時(shí)候(列表為None時(shí))等待,等到條件滿足的時(shí)候(列表已經(jīng)創(chuàng)建)發(fā)出一個(gè)通知,告訴"set" 和"print"條件已經(jīng)有了,你們?cè)撈鸫哺苫盍耍蝗缓?quot;set"和"print"才繼續(xù)運(yùn)行。

線程與條件變量的交互如下圖所示:

Python線程的示例分析

thread_condition_wait

thread_condition_notify

1.4. 線程運(yùn)行和阻塞的狀態(tài)轉(zhuǎn)換

最后看看線程運(yùn)行和阻塞狀態(tài)的轉(zhuǎn)換。

Python線程的示例分析

thread_stat

阻塞有三種情況:

同步阻塞是指處于競(jìng)爭(zhēng)鎖定的狀態(tài),線程請(qǐng)求鎖定時(shí)將進(jìn)入這個(gè)狀態(tài),一旦成功獲得鎖定又恢復(fù)到運(yùn)行狀態(tài);

等待阻塞是指等待其他線程通知的狀態(tài),線程獲得條件鎖定后,調(diào)用“等待”將進(jìn)入這個(gè)狀態(tài),一旦其他線程發(fā)出通知,線程將進(jìn)入同步阻塞狀態(tài),再次競(jìng)爭(zhēng)條件鎖定;

而其他阻塞是指調(diào)用time.sleep()、anotherthread.join()或等待IO時(shí)的阻塞,這個(gè)狀態(tài)下線程不會(huì)釋放已獲得的鎖定。

tips: 如果能理解這些內(nèi)容,接下來(lái)的主題將是非常輕松的;并且,這些內(nèi)容在大部分流行的編程語(yǔ)言里都是一樣的。(意思就是非看懂不可 >_< 嫌作者水平低找別人的教程也要看懂)

2. thread

Python通過(guò)兩個(gè)標(biāo)準(zhǔn)庫(kù)thread和threading提供對(duì)線程的支持。thread提供了低級(jí)別的、原始的線程以及一個(gè)簡(jiǎn)單的鎖。

# encoding: UTF-8
import thread
import time
 
# 一個(gè)用于在線程中執(zhí)行的函數(shù)
def func():
  for i in range(5):
    print 'func'
    time.sleep(1)
  
  # 結(jié)束當(dāng)前線程
  # 這個(gè)方法與thread.exit_thread()等價(jià)
  thread.exit() # 當(dāng)func返回時(shí),線程同樣會(huì)結(jié)束
    
# 啟動(dòng)一個(gè)線程,線程立即開(kāi)始運(yùn)行
# 這個(gè)方法與thread.start_new_thread()等價(jià)
# 第一個(gè)參數(shù)是方法,第二個(gè)參數(shù)是方法的參數(shù)
thread.start_new(func, ()) # 方法沒(méi)有參數(shù)時(shí)需要傳入空tuple
 
# 創(chuàng)建一個(gè)鎖(LockType,不能直接實(shí)例化)
# 這個(gè)方法與thread.allocate_lock()等價(jià)
lock = thread.allocate()
 
# 判斷鎖是鎖定狀態(tài)還是釋放狀態(tài)
print lock.locked()
 
# 鎖通常用于控制對(duì)共享資源的訪問(wèn)
count = 0
 
# 獲得鎖,成功獲得鎖定后返回True
# 可選的timeout參數(shù)不填時(shí)將一直阻塞直到獲得鎖定
# 否則超時(shí)后將返回False
if lock.acquire():
  count += 1
  
  # 釋放鎖
  lock.release()
 
# thread模塊提供的線程都將在主線程結(jié)束后同時(shí)結(jié)束


time.sleep(6)

thread 模塊提供的其他方法:

thread.interrupt_main(): 在其他線程中終止主線程。

thread.get_ident(): 獲得一個(gè)代表當(dāng)前線程的魔法數(shù)字,常用于從一個(gè)字典中獲得線程相關(guān)的數(shù)據(jù)。這個(gè)數(shù)字本身沒(méi)有任何含義,并且當(dāng)線程結(jié)束后會(huì)被新線程復(fù)用。

thread還提供了一個(gè)ThreadLocal類用于管理線程相關(guān)的數(shù)據(jù),名為 thread._local,threading中引用了這個(gè)類。

由于thread提供的線程功能不多,無(wú)法在主線程結(jié)束后繼續(xù)運(yùn)行,不提供條件變量等等原因,一般不使用thread模塊,這里就不多介紹了。

3. threading

threading基于Java的線程模型設(shè)計(jì)。鎖(Lock)和條件變量(Condition)在Java中是對(duì)象的基本行為(每一個(gè)對(duì)象都自帶了鎖和條件變量),而在Python中則是獨(dú)立的對(duì)象。Python Thread提供了Java Thread的行為的子集;沒(méi)有優(yōu)先級(jí)、線程組,線程也不能被停止、暫停、恢復(fù)、中斷。Java Thread中的部分被Python實(shí)現(xiàn)了的靜態(tài)方法在threading中以模塊方法的形式提供。

threading 模塊提供的常用方法:

threading.currentThread(): 返回當(dāng)前的線程變量。

threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。

threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。

threading模塊提供的類:

Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local.

3.1. Thread

Thread是線程類,與Java類似,有兩種使用方法,直接傳入要運(yùn)行的方法或從Thread繼承并覆蓋run():

# encoding: UTF-8
import threading
 
# 方法1:將要執(zhí)行的方法作為參數(shù)傳給Thread的構(gòu)造方法
def func():
  print 'func() passed to Thread'
 
t = threading.Thread(target=func)
t.start()
 
# 方法2:從Thread繼承,并重寫(xiě)run()
class MyThread(threading.Thread):
  def run(self):
    print 'MyThread extended from Thread'
 
t = MyThread()
t.start()

構(gòu)造方法:

Thread(group=None, target=None, name=None, args=(), kwargs={})

group: 線程組,目前還沒(méi)有實(shí)現(xiàn),庫(kù)引用中提示必須是None;

target: 要執(zhí)行的方法;

name: 線程名;

args/kwargs: 要傳入方法的參數(shù)。

實(shí)例方法:

isAlive(): 返回線程是否在運(yùn)行。正在運(yùn)行指啟動(dòng)后、終止前。

get/setName(name): 獲取/設(shè)置線程名。

is/setDaemon(bool): 獲取/設(shè)置是否守護(hù)線程。初始值從創(chuàng)建該線程的線程繼承。當(dāng)沒(méi)有非守護(hù)線程仍在運(yùn)行時(shí),程序?qū)⒔K止。

start(): 啟動(dòng)線程。

join([timeout]): 阻塞當(dāng)前上下文環(huán)境的線程,直到調(diào)用此方法的線程終止或到達(dá)指定的timeout(可選參數(shù))。

一個(gè)使用join()的例子:

# encoding: UTF-8
import threading
import time
 
def context(tJoin):
  print 'in threadContext.'
  tJoin.start()
  
  # 將阻塞tContext直到threadJoin終止。
  tJoin.join()
  
  # tJoin終止后繼續(xù)執(zhí)行。
  print 'out threadContext.'
 
def join():
  print 'in threadJoin.'
  time.sleep(1)
  print 'out threadJoin.'
 
tJoin = threading.Thread(target=join)
tContext = threading.Thread(target=context, args=(tJoin,))
 
tContext.start()

運(yùn)行結(jié)果:

in threadContext. 
in threadJoin. 
out threadJoin. 
out threadContext.

3.2. Lock

Lock(指令鎖)是可用的最低級(jí)的同步指令。Lock處于鎖定狀態(tài)時(shí),不被特定的線程擁有。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個(gè)基本的方法。

可以認(rèn)為L(zhǎng)ock有一個(gè)鎖定池,當(dāng)線程請(qǐng)求鎖定時(shí),將線程至于池中,直到獲得鎖定后出池。池中的線程處于狀態(tài)圖中的同步阻塞狀態(tài)。

構(gòu)造方法:

Lock()

實(shí)例方法:

acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。

release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

# encoding: UTF-8
import threading
import time
 
data = 0
lock = threading.Lock()
 
def func():
  global data
  print '%s acquire lock...' % threading.currentThread().getName()
  
  # 調(diào)用acquire([timeout])時(shí),線程將一直阻塞,
  # 直到獲得鎖定或者直到timeout秒后(timeout參數(shù)可選)。
  # 返回是否獲得鎖。
  if lock.acquire():
    print '%s get the lock.' % threading.currentThread().getName()
    data += 1
    time.sleep(2)
    print '%s release lock...' % threading.currentThread().getName()
    
    # 調(diào)用release()將釋放鎖。
    lock.release()
 
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()

3.3. RLock

RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請(qǐng)求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念,處于鎖定狀態(tài)時(shí),RLock被某個(gè)線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時(shí)需要調(diào)用release()相同次數(shù)。

可以認(rèn)為RLock包含一個(gè)鎖定池和一個(gè)初始值為0的計(jì)數(shù)器,每次成功調(diào)用 acquire()/release(),計(jì)數(shù)器將+1/-1,為0時(shí)鎖處于未鎖定狀態(tài)。

構(gòu)造方法:

RLock()

實(shí)例方法:

acquire([timeout])/release(): 跟Lock差不多。

# encoding: UTF-8
import threading
import time
 
rlock = threading.RLock()
 
def func():
  # 第一次請(qǐng)求鎖定
  print '%s acquire lock...' % threading.currentThread().getName()
  if rlock.acquire():
    print '%s get the lock.' % threading.currentThread().getName()
    time.sleep(2)
    
    # 第二次請(qǐng)求鎖定
    print '%s acquire lock again...' % threading.currentThread().getName()
    if rlock.acquire():
      print '%s get the lock.' % threading.currentThread().getName()
      time.sleep(2)
    
    # 第一次釋放鎖
    print '%s release lock...' % threading.currentThread().getName()
    rlock.release()
    time.sleep(2)
    
    # 第二次釋放鎖
    print '%s release lock...' % threading.currentThread().getName()
    rlock.release()
 
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()

3.4. Condition

Condition(條件變量)通常與一個(gè)鎖關(guān)聯(lián)。需要在多個(gè)Contidion中共享一個(gè)鎖時(shí),可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例。

可以認(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個(gè)等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個(gè)線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。

構(gòu)造方法:

Condition([lock/rlock])

實(shí)例方法:

acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。

wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

例子是很常見(jiàn)的生產(chǎn)者/消費(fèi)者模式:

# encoding: UTF-8
import threading
import time
 
# 商品
product = None
# 條件變量
con = threading.Condition()
 
# 生產(chǎn)者方法
def produce():
  global product
  
  if con.acquire():
    while True:
      if product is None:
        print 'produce...'
        product = 'anything'
        
        # 通知消費(fèi)者,商品已經(jīng)生產(chǎn)
        con.notify()
      
      # 等待通知
      con.wait()
      time.sleep(2)
 
# 消費(fèi)者方法
def consume():
  global product
  
  if con.acquire():
    while True:
      if product is not None:
        print 'consume...'
        product = None
        
        # 通知生產(chǎn)者,商品已經(jīng)沒(méi)了
        con.notify()
      
      # 等待通知
      con.wait()
      time.sleep(2)
 
t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()

3.5. Semaphore/BoundedSemaphore

Semaphore(信號(hào)量)是計(jì)算機(jī)科學(xué)史上最古老的同步指令之一。Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,每當(dāng)調(diào)用acquire()時(shí)-1,調(diào)用release() 時(shí)+1。計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí),acquire()將阻塞線程至同步鎖定狀態(tài),直到其他線程調(diào)用release()。

基于這個(gè)特點(diǎn),Semaphore經(jīng)常用來(lái)同步一些有“訪客上限”的對(duì)象,比如連接池。

BoundedSemaphore 與Semaphore的唯一區(qū)別在于前者將在調(diào)用release()時(shí)檢查計(jì)數(shù)器的值是否超過(guò)了計(jì)數(shù)器的初始值,如果超過(guò)了將拋出一個(gè)異常。

構(gòu)造方法:

Semaphore(value=1): value是計(jì)數(shù)器的初始值。

實(shí)例方法:

acquire([timeout]): 請(qǐng)求Semaphore。如果計(jì)數(shù)器為0,將阻塞線程至同步阻塞狀態(tài);否則將計(jì)數(shù)器-1并立即返回。

release(): 釋放Semaphore,將計(jì)數(shù)器+1,如果使用BoundedSemaphore,還將進(jìn)行釋放次數(shù)檢查。release()方法不檢查線程是否已獲得 Semaphore。

# encoding: UTF-8
import threading
import time
 
# 計(jì)數(shù)器初值為2
semaphore = threading.Semaphore(2)
 
def func():
  
  # 請(qǐng)求Semaphore,成功后計(jì)數(shù)器-1;計(jì)數(shù)器為0時(shí)阻塞
  print '%s acquire semaphore...' % threading.currentThread().getName()
  if semaphore.acquire():
    
    print '%s get semaphore' % threading.currentThread().getName()
    time.sleep(4)
    
    # 釋放Semaphore,計(jì)數(shù)器+1
    print '%s release semaphore' % threading.currentThread().getName()
    semaphore.release()
 
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t4 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
t4.start()
 
time.sleep(2)
 
# 沒(méi)有獲得semaphore的主線程也可以調(diào)用release
# 若使用BoundedSemaphore,t4釋放semaphore時(shí)將拋出異常
print 'MainThread release semaphore without acquire'
semaphore.release()

3.6. Event

Event(事件)是最簡(jiǎn)單的線程通信機(jī)制之一:一個(gè)線程通知事件,其他線程等待事件。Event內(nèi)置了一個(gè)初始為False的標(biāo)志,當(dāng)調(diào)用set()時(shí)設(shè)為T(mén)rue,調(diào)用clear()時(shí)重置為 False。wait()將阻塞線程至等待阻塞狀態(tài)。

Event其實(shí)就是一個(gè)簡(jiǎn)化版的 Condition。Event沒(méi)有鎖,無(wú)法使線程進(jìn)入同步阻塞狀態(tài)。

構(gòu)造方法:

Event()

實(shí)例方法:

isSet(): 當(dāng)內(nèi)置標(biāo)志為T(mén)rue時(shí)返回True。

set(): 將標(biāo)志設(shè)為T(mén)rue,并通知所有處于等待阻塞狀態(tài)的線程恢復(fù)運(yùn)行狀態(tài)。

clear(): 將標(biāo)志設(shè)為False。

wait([timeout]): 如果標(biāo)志為T(mén)rue將立即返回,否則阻塞線程至等待阻塞狀態(tài),等待其他線程調(diào)用set()。

# encoding: UTF-8
import threading
import time
 
event = threading.Event()
 
def func():
  # 等待事件,進(jìn)入等待阻塞狀態(tài)
  print '%s wait for event...' % threading.currentThread().getName()
  event.wait()
  
  # 收到事件后進(jìn)入運(yùn)行狀態(tài)
  print '%s recv event.' % threading.currentThread().getName()
 
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()
 
time.sleep(2)
 
# 發(fā)送事件通知
print 'MainThread set event.'
event.set()

3.7. Timer

Timer(定時(shí)器)是Thread的派生類,用于在指定時(shí)間后調(diào)用一個(gè)方法。

構(gòu)造方法:

Timer(interval, function, args=[], kwargs={})

interval: 指定的時(shí)間

function: 要執(zhí)行的方法

args/kwargs: 方法的參數(shù)

實(shí)例方法:

Timer從Thread派生,沒(méi)有增加實(shí)例方法。

# encoding: UTF-8
import threading
 
def func():
  print 'hello timer!'
 
timer = threading.Timer(5, func)
timer.start()

3.8. local

local是一個(gè)小寫(xiě)字母開(kāi)頭的類,用于管理 thread-local(線程局部的)數(shù)據(jù)。對(duì)于同一個(gè)local,線程無(wú)法訪問(wèn)其他線程設(shè)置的屬性;線程設(shè)置的屬性不會(huì)被其他線程設(shè)置的同名屬性替換。

可以把local看成是一個(gè)“線程-屬性字典”的字典,local封裝了從自身使用線程作為 key檢索對(duì)應(yīng)的屬性字典、再使用屬性名作為key檢索屬性值的細(xì)節(jié)。

# encoding: UTF-8
import threading
 
local = threading.local()
local.tname = 'main'
 
def func():
  local.tname = 'notmain'
  print local.tname
 
t1 = threading.Thread(target=func)
t1.start()
t1.join()
 
print local.tname

熟練掌握Thread、Lock、Condition就可以應(yīng)對(duì)絕大多數(shù)需要使用線程的場(chǎng)合,某些情況下local也是非常有用的東西。本文的最后使用這幾個(gè)類展示線程基礎(chǔ)中提到的場(chǎng)景:

# encoding: UTF-8
import threading
 
alist = None
condition = threading.Condition()
 
def doSet():
  if condition.acquire():
    while alist is None:
      condition.wait()
    for i in range(len(alist))[::-1]:
      alist[i] = 1
    condition.release()
 
def doPrint():
  if condition.acquire():
    while alist is None:
      condition.wait()
    for i in alist:
      print i,
    print
    condition.release()
 
def doCreate():
  global alist
  if condition.acquire():
    if alist is None:
      alist = [0 for i in range(10)]
      condition.notifyAll()
    condition.release()
 
tset = threading.Thread(target=doSet,name='tset')
tprint = threading.Thread(target=doPrint,name='tprint')
tcreate = threading.Thread(target=doCreate,name='tcreate')
tset.start()
tprint.start()
tcreate.start()

以上是“Python線程的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI