您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“Python線程的示例分析”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Python線程的示例分析”這篇文章吧。
1. 線程基礎(chǔ)
1.1. 線程狀態(tài)
線程有5種狀態(tài),狀態(tài)轉(zhuǎn)換的過(guò)程如下圖所示:
thread_stat_simple
1.2. 線程同步(鎖)
多線程的優(yōu)勢(shì)在于可以同時(shí)運(yùn)行多個(gè)任務(wù)(至少感覺(jué)起來(lái)是這樣)。但是當(dāng)線程需要共享數(shù)據(jù)時(shí),可能存在數(shù)據(jù)不同步的問(wèn)題??紤]這樣一種情況:一個(gè)列表里所有元素都是0,線程"set"從后向前把所有元素改成1,而線程"print"負(fù)責(zé)從前往后讀取列表并打印。那么,可能線程"set"開(kāi)始改的時(shí)候,線程"print"便來(lái)打印列表了,輸出就成了一半0一半1,這就是數(shù)據(jù)的不同步。為了避免這種情況,引入了鎖的概念。
鎖有兩種狀態(tài)——鎖定和未鎖定。每當(dāng)一個(gè)線程比如"set"要訪問(wèn)共享數(shù)據(jù)時(shí),必須先獲得鎖定;如果已經(jīng)有別的線程比如"print"獲得鎖定了,那么就讓線程"set"暫停,也就是同步阻塞;等到線程"print"訪問(wèn)完畢,釋放鎖以后,再讓線程"set"繼續(xù)。經(jīng)過(guò)這樣的處理,打印列表時(shí)要么全部輸出0,要么全部輸出1,不會(huì)再出現(xiàn)一半0一半1的尷尬場(chǎng)面。
線程與鎖的交互如下圖所示:
thread_lock
1.3. 線程通信(條件變量)
然而還有另外一種尷尬的情況:列表并不是一開(kāi)始就有的;而是通過(guò)線程"create"創(chuàng)建的。如果"set"或者"print" 在"create"還沒(méi)有運(yùn)行的時(shí)候就訪問(wèn)列表,將會(huì)出現(xiàn)一個(gè)異常。使用鎖可以解決這個(gè)問(wèn)題,但是"set"和"print"將需要一個(gè)無(wú)限循環(huán)——他們不知道"create"什么時(shí)候會(huì)運(yùn)行,讓"create"在運(yùn)行后通知"set"和"print"顯然是一個(gè)更好的解決方案。于是,引入了條件變量。
條件變量允許線程比如"set"和"print"在條件不滿足的時(shí)候(列表為None時(shí))等待,等到條件滿足的時(shí)候(列表已經(jīng)創(chuàng)建)發(fā)出一個(gè)通知,告訴"set" 和"print"條件已經(jīng)有了,你們?cè)撈鸫哺苫盍耍蝗缓?quot;set"和"print"才繼續(xù)運(yùn)行。
線程與條件變量的交互如下圖所示:
thread_condition_wait
thread_condition_notify
1.4. 線程運(yùn)行和阻塞的狀態(tài)轉(zhuǎn)換
最后看看線程運(yùn)行和阻塞狀態(tài)的轉(zhuǎn)換。
thread_stat
阻塞有三種情況:
同步阻塞是指處于競(jìng)爭(zhēng)鎖定的狀態(tài),線程請(qǐng)求鎖定時(shí)將進(jìn)入這個(gè)狀態(tài),一旦成功獲得鎖定又恢復(fù)到運(yùn)行狀態(tài);
等待阻塞是指等待其他線程通知的狀態(tài),線程獲得條件鎖定后,調(diào)用“等待”將進(jìn)入這個(gè)狀態(tài),一旦其他線程發(fā)出通知,線程將進(jìn)入同步阻塞狀態(tài),再次競(jìng)爭(zhēng)條件鎖定;
而其他阻塞是指調(diào)用time.sleep()、anotherthread.join()或等待IO時(shí)的阻塞,這個(gè)狀態(tài)下線程不會(huì)釋放已獲得的鎖定。
tips: 如果能理解這些內(nèi)容,接下來(lái)的主題將是非常輕松的;并且,這些內(nèi)容在大部分流行的編程語(yǔ)言里都是一樣的。(意思就是非看懂不可 >_< 嫌作者水平低找別人的教程也要看懂)
2. thread
Python通過(guò)兩個(gè)標(biāo)準(zhǔn)庫(kù)thread和threading提供對(duì)線程的支持。thread提供了低級(jí)別的、原始的線程以及一個(gè)簡(jiǎn)單的鎖。
# encoding: UTF-8 import thread import time # 一個(gè)用于在線程中執(zhí)行的函數(shù) def func(): for i in range(5): print 'func' time.sleep(1) # 結(jié)束當(dāng)前線程 # 這個(gè)方法與thread.exit_thread()等價(jià) thread.exit() # 當(dāng)func返回時(shí),線程同樣會(huì)結(jié)束 # 啟動(dòng)一個(gè)線程,線程立即開(kāi)始運(yùn)行 # 這個(gè)方法與thread.start_new_thread()等價(jià) # 第一個(gè)參數(shù)是方法,第二個(gè)參數(shù)是方法的參數(shù) thread.start_new(func, ()) # 方法沒(méi)有參數(shù)時(shí)需要傳入空tuple # 創(chuàng)建一個(gè)鎖(LockType,不能直接實(shí)例化) # 這個(gè)方法與thread.allocate_lock()等價(jià) lock = thread.allocate() # 判斷鎖是鎖定狀態(tài)還是釋放狀態(tài) print lock.locked() # 鎖通常用于控制對(duì)共享資源的訪問(wèn) count = 0 # 獲得鎖,成功獲得鎖定后返回True # 可選的timeout參數(shù)不填時(shí)將一直阻塞直到獲得鎖定 # 否則超時(shí)后將返回False if lock.acquire(): count += 1 # 釋放鎖 lock.release() # thread模塊提供的線程都將在主線程結(jié)束后同時(shí)結(jié)束 time.sleep(6)
thread 模塊提供的其他方法:
thread.interrupt_main(): 在其他線程中終止主線程。
thread.get_ident(): 獲得一個(gè)代表當(dāng)前線程的魔法數(shù)字,常用于從一個(gè)字典中獲得線程相關(guān)的數(shù)據(jù)。這個(gè)數(shù)字本身沒(méi)有任何含義,并且當(dāng)線程結(jié)束后會(huì)被新線程復(fù)用。
thread還提供了一個(gè)ThreadLocal類用于管理線程相關(guān)的數(shù)據(jù),名為 thread._local,threading中引用了這個(gè)類。
由于thread提供的線程功能不多,無(wú)法在主線程結(jié)束后繼續(xù)運(yùn)行,不提供條件變量等等原因,一般不使用thread模塊,這里就不多介紹了。
3. threading
threading基于Java的線程模型設(shè)計(jì)。鎖(Lock)和條件變量(Condition)在Java中是對(duì)象的基本行為(每一個(gè)對(duì)象都自帶了鎖和條件變量),而在Python中則是獨(dú)立的對(duì)象。Python Thread提供了Java Thread的行為的子集;沒(méi)有優(yōu)先級(jí)、線程組,線程也不能被停止、暫停、恢復(fù)、中斷。Java Thread中的部分被Python實(shí)現(xiàn)了的靜態(tài)方法在threading中以模塊方法的形式提供。
threading 模塊提供的常用方法:
threading.currentThread(): 返回當(dāng)前的線程變量。
threading.enumerate(): 返回一個(gè)包含正在運(yùn)行的線程的list。正在運(yùn)行指線程啟動(dòng)后、結(jié)束前,不包括啟動(dòng)前和終止后的線程。
threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。
threading模塊提供的類:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local.
3.1. Thread
Thread是線程類,與Java類似,有兩種使用方法,直接傳入要運(yùn)行的方法或從Thread繼承并覆蓋run():
# encoding: UTF-8 import threading # 方法1:將要執(zhí)行的方法作為參數(shù)傳給Thread的構(gòu)造方法 def func(): print 'func() passed to Thread' t = threading.Thread(target=func) t.start() # 方法2:從Thread繼承,并重寫(xiě)run() class MyThread(threading.Thread): def run(self): print 'MyThread extended from Thread' t = MyThread() t.start()
構(gòu)造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 線程組,目前還沒(méi)有實(shí)現(xiàn),庫(kù)引用中提示必須是None;
target: 要執(zhí)行的方法;
name: 線程名;
args/kwargs: 要傳入方法的參數(shù)。
實(shí)例方法:
isAlive(): 返回線程是否在運(yùn)行。正在運(yùn)行指啟動(dòng)后、終止前。
get/setName(name): 獲取/設(shè)置線程名。
is/setDaemon(bool): 獲取/設(shè)置是否守護(hù)線程。初始值從創(chuàng)建該線程的線程繼承。當(dāng)沒(méi)有非守護(hù)線程仍在運(yùn)行時(shí),程序?qū)⒔K止。
start(): 啟動(dòng)線程。
join([timeout]): 阻塞當(dāng)前上下文環(huán)境的線程,直到調(diào)用此方法的線程終止或到達(dá)指定的timeout(可選參數(shù))。
一個(gè)使用join()的例子:
# encoding: UTF-8 import threading import time def context(tJoin): print 'in threadContext.' tJoin.start() # 將阻塞tContext直到threadJoin終止。 tJoin.join() # tJoin終止后繼續(xù)執(zhí)行。 print 'out threadContext.' def join(): print 'in threadJoin.' time.sleep(1) print 'out threadJoin.' tJoin = threading.Thread(target=join) tContext = threading.Thread(target=context, args=(tJoin,)) tContext.start()
運(yùn)行結(jié)果:
in threadContext. in threadJoin. out threadJoin. out threadContext.
3.2. Lock
Lock(指令鎖)是可用的最低級(jí)的同步指令。Lock處于鎖定狀態(tài)時(shí),不被特定的線程擁有。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個(gè)基本的方法。
可以認(rèn)為L(zhǎng)ock有一個(gè)鎖定池,當(dāng)線程請(qǐng)求鎖定時(shí),將線程至于池中,直到獲得鎖定后出池。池中的線程處于狀態(tài)圖中的同步阻塞狀態(tài)。
構(gòu)造方法:
Lock()
實(shí)例方法:
acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
# encoding: UTF-8 import threading import time data = 0 lock = threading.Lock() def func(): global data print '%s acquire lock...' % threading.currentThread().getName() # 調(diào)用acquire([timeout])時(shí),線程將一直阻塞, # 直到獲得鎖定或者直到timeout秒后(timeout參數(shù)可選)。 # 返回是否獲得鎖。 if lock.acquire(): print '%s get the lock.' % threading.currentThread().getName() data += 1 time.sleep(2) print '%s release lock...' % threading.currentThread().getName() # 調(diào)用release()將釋放鎖。 lock.release() t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t3 = threading.Thread(target=func) t1.start() t2.start() t3.start()
3.3. RLock
RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請(qǐng)求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念,處于鎖定狀態(tài)時(shí),RLock被某個(gè)線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時(shí)需要調(diào)用release()相同次數(shù)。
可以認(rèn)為RLock包含一個(gè)鎖定池和一個(gè)初始值為0的計(jì)數(shù)器,每次成功調(diào)用 acquire()/release(),計(jì)數(shù)器將+1/-1,為0時(shí)鎖處于未鎖定狀態(tài)。
構(gòu)造方法:
RLock()
實(shí)例方法:
acquire([timeout])/release(): 跟Lock差不多。
# encoding: UTF-8 import threading import time rlock = threading.RLock() def func(): # 第一次請(qǐng)求鎖定 print '%s acquire lock...' % threading.currentThread().getName() if rlock.acquire(): print '%s get the lock.' % threading.currentThread().getName() time.sleep(2) # 第二次請(qǐng)求鎖定 print '%s acquire lock again...' % threading.currentThread().getName() if rlock.acquire(): print '%s get the lock.' % threading.currentThread().getName() time.sleep(2) # 第一次釋放鎖 print '%s release lock...' % threading.currentThread().getName() rlock.release() time.sleep(2) # 第二次釋放鎖 print '%s release lock...' % threading.currentThread().getName() rlock.release() t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t3 = threading.Thread(target=func) t1.start() t2.start() t3.start()
3.4. Condition
Condition(條件變量)通常與一個(gè)鎖關(guān)聯(lián)。需要在多個(gè)Contidion中共享一個(gè)鎖時(shí),可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例。
可以認(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個(gè)等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個(gè)線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。
構(gòu)造方法:
Condition([lock/rlock])
實(shí)例方法:
acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。
wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
例子是很常見(jiàn)的生產(chǎn)者/消費(fèi)者模式:
# encoding: UTF-8 import threading import time # 商品 product = None # 條件變量 con = threading.Condition() # 生產(chǎn)者方法 def produce(): global product if con.acquire(): while True: if product is None: print 'produce...' product = 'anything' # 通知消費(fèi)者,商品已經(jīng)生產(chǎn) con.notify() # 等待通知 con.wait() time.sleep(2) # 消費(fèi)者方法 def consume(): global product if con.acquire(): while True: if product is not None: print 'consume...' product = None # 通知生產(chǎn)者,商品已經(jīng)沒(méi)了 con.notify() # 等待通知 con.wait() time.sleep(2) t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
3.5. Semaphore/BoundedSemaphore
Semaphore(信號(hào)量)是計(jì)算機(jī)科學(xué)史上最古老的同步指令之一。Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,每當(dāng)調(diào)用acquire()時(shí)-1,調(diào)用release() 時(shí)+1。計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí),acquire()將阻塞線程至同步鎖定狀態(tài),直到其他線程調(diào)用release()。
基于這個(gè)特點(diǎn),Semaphore經(jīng)常用來(lái)同步一些有“訪客上限”的對(duì)象,比如連接池。
BoundedSemaphore 與Semaphore的唯一區(qū)別在于前者將在調(diào)用release()時(shí)檢查計(jì)數(shù)器的值是否超過(guò)了計(jì)數(shù)器的初始值,如果超過(guò)了將拋出一個(gè)異常。
構(gòu)造方法:
Semaphore(value=1): value是計(jì)數(shù)器的初始值。
實(shí)例方法:
acquire([timeout]): 請(qǐng)求Semaphore。如果計(jì)數(shù)器為0,將阻塞線程至同步阻塞狀態(tài);否則將計(jì)數(shù)器-1并立即返回。
release(): 釋放Semaphore,將計(jì)數(shù)器+1,如果使用BoundedSemaphore,還將進(jìn)行釋放次數(shù)檢查。release()方法不檢查線程是否已獲得 Semaphore。
# encoding: UTF-8 import threading import time # 計(jì)數(shù)器初值為2 semaphore = threading.Semaphore(2) def func(): # 請(qǐng)求Semaphore,成功后計(jì)數(shù)器-1;計(jì)數(shù)器為0時(shí)阻塞 print '%s acquire semaphore...' % threading.currentThread().getName() if semaphore.acquire(): print '%s get semaphore' % threading.currentThread().getName() time.sleep(4) # 釋放Semaphore,計(jì)數(shù)器+1 print '%s release semaphore' % threading.currentThread().getName() semaphore.release() t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t3 = threading.Thread(target=func) t4 = threading.Thread(target=func) t1.start() t2.start() t3.start() t4.start() time.sleep(2) # 沒(méi)有獲得semaphore的主線程也可以調(diào)用release # 若使用BoundedSemaphore,t4釋放semaphore時(shí)將拋出異常 print 'MainThread release semaphore without acquire' semaphore.release()
3.6. Event
Event(事件)是最簡(jiǎn)單的線程通信機(jī)制之一:一個(gè)線程通知事件,其他線程等待事件。Event內(nèi)置了一個(gè)初始為False的標(biāo)志,當(dāng)調(diào)用set()時(shí)設(shè)為T(mén)rue,調(diào)用clear()時(shí)重置為 False。wait()將阻塞線程至等待阻塞狀態(tài)。
Event其實(shí)就是一個(gè)簡(jiǎn)化版的 Condition。Event沒(méi)有鎖,無(wú)法使線程進(jìn)入同步阻塞狀態(tài)。
構(gòu)造方法:
Event()
實(shí)例方法:
isSet(): 當(dāng)內(nèi)置標(biāo)志為T(mén)rue時(shí)返回True。
set(): 將標(biāo)志設(shè)為T(mén)rue,并通知所有處于等待阻塞狀態(tài)的線程恢復(fù)運(yùn)行狀態(tài)。
clear(): 將標(biāo)志設(shè)為False。
wait([timeout]): 如果標(biāo)志為T(mén)rue將立即返回,否則阻塞線程至等待阻塞狀態(tài),等待其他線程調(diào)用set()。
# encoding: UTF-8 import threading import time event = threading.Event() def func(): # 等待事件,進(jìn)入等待阻塞狀態(tài) print '%s wait for event...' % threading.currentThread().getName() event.wait() # 收到事件后進(jìn)入運(yùn)行狀態(tài) print '%s recv event.' % threading.currentThread().getName() t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) # 發(fā)送事件通知 print 'MainThread set event.' event.set()
3.7. Timer
Timer(定時(shí)器)是Thread的派生類,用于在指定時(shí)間后調(diào)用一個(gè)方法。
構(gòu)造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的時(shí)間
function: 要執(zhí)行的方法
args/kwargs: 方法的參數(shù)
實(shí)例方法:
Timer從Thread派生,沒(méi)有增加實(shí)例方法。
# encoding: UTF-8 import threading def func(): print 'hello timer!' timer = threading.Timer(5, func) timer.start()
3.8. local
local是一個(gè)小寫(xiě)字母開(kāi)頭的類,用于管理 thread-local(線程局部的)數(shù)據(jù)。對(duì)于同一個(gè)local,線程無(wú)法訪問(wèn)其他線程設(shè)置的屬性;線程設(shè)置的屬性不會(huì)被其他線程設(shè)置的同名屬性替換。
可以把local看成是一個(gè)“線程-屬性字典”的字典,local封裝了從自身使用線程作為 key檢索對(duì)應(yīng)的屬性字典、再使用屬性名作為key檢索屬性值的細(xì)節(jié)。
# encoding: UTF-8 import threading local = threading.local() local.tname = 'main' def func(): local.tname = 'notmain' print local.tname t1 = threading.Thread(target=func) t1.start() t1.join() print local.tname
熟練掌握Thread、Lock、Condition就可以應(yīng)對(duì)絕大多數(shù)需要使用線程的場(chǎng)合,某些情況下local也是非常有用的東西。本文的最后使用這幾個(gè)類展示線程基礎(chǔ)中提到的場(chǎng)景:
# encoding: UTF-8 import threading alist = None condition = threading.Condition() def doSet(): if condition.acquire(): while alist is None: condition.wait() for i in range(len(alist))[::-1]: alist[i] = 1 condition.release() def doPrint(): if condition.acquire(): while alist is None: condition.wait() for i in alist: print i, print condition.release() def doCreate(): global alist if condition.acquire(): if alist is None: alist = [0 for i in range(10)] condition.notifyAll() condition.release() tset = threading.Thread(target=doSet,name='tset') tprint = threading.Thread(target=doPrint,name='tprint') tcreate = threading.Thread(target=doCreate,name='tcreate') tset.start() tprint.start() tcreate.start()
以上是“Python線程的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。