您好,登錄后才能下訂單哦!
python中怎么利用threading實現(xiàn)多線程,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
threading用于提供線程相關(guān)的操作,線程是應(yīng)用程序中工作的最小單元。python當前版本的多線程庫沒有實現(xiàn)優(yōu)先級、線程組,線程也不能被停止、暫停、恢復(fù)、中斷。
threading模塊提供的類:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
threading 模塊提供的常用方法:
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結(jié)束前,不包括啟動前和終止后的線程。
threading.activeCount(): 返回正在運行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。
threading 模塊提供的常量:
threading.TIMEOUT_MAX 設(shè)置threading全局超時時間。
Thread是線程類,有兩種使用方法,直接傳入要運行的方法或從Thread繼承并覆蓋run():
# coding:utf-8 import threading import time #方法一:將要執(zhí)行的方法作為參數(shù)傳給Thread的構(gòu)造方法 def action(arg): time.sleep(1) print 'the arg is:%s\r' %arg for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.start() print 'main thread end!' #方法二:從Thread繼承,并重寫run() class MyThread(threading.Thread): def __init__(self,arg): super(MyThread, self).__init__()#注意:一定要顯式的調(diào)用父類的初始化函數(shù)。 self.arg=arg def run(self):#定義每個線程要運行的函數(shù) time.sleep(1) print 'the arg is:%s\r' % self.arg for i in xrange(4): t =MyThread(i) t.start() print 'main thread end!'
構(gòu)造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 線程組,目前還沒有實現(xiàn),庫引用中提示必須是None;
target: 要執(zhí)行的方法;
name: 線程名;
args/kwargs: 要傳入方法的參數(shù)。
實例方法:
isAlive(): 返回線程是否在運行。正在運行指啟動后、終止前。
get/setName(name): 獲取/設(shè)置線程名。
start(): 線程準備就緒,等待CPU調(diào)度
is/setDaemon(bool): 獲取/設(shè)置是后臺線程(默認前臺線程(False))。(在start之前設(shè)置)
如果是后臺線程,主線程執(zhí)行過程中,后臺線程也在進行,主線程執(zhí)行完畢后,后臺線程不論成功與否,主線程和后臺線程均停止
如果是前臺線程,主線程執(zhí)行過程中,前臺線程也在進行,主線程執(zhí)行完畢后,等待前臺線程也執(zhí)行完成后,程序停止
start(): 啟動線程。
join([timeout]): 阻塞當前上下文環(huán)境的線程,直到調(diào)用此方法的線程終止或到達指定的timeout(可選參數(shù))。
使用例子一(未設(shè)置setDeamon):
# coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName() print 'the arg is:%s\r' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.start() print 'main_thread end!'
main_thread end! sub thread start!the thread name is:Thread-2 the arg is:1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:2 the arg is:3 Process finished with exit code 0 可以看出,創(chuàng)建的4個“前臺”線程,主線程執(zhí)行過程中,前臺線程也在進行,主線程執(zhí)行完畢后,等待前臺線程也執(zhí)行完成后,程序停止
驗證了serDeamon(False)(默認)前臺線程,主線程執(zhí)行過程中,前臺線程也在進行,主線程執(zhí)行完畢后,等待前臺線程也執(zhí)行完成后,主線程停止。
使用例子二(setDeamon=True)
# coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName() print 'the arg is:%s\r' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True)#設(shè)置線程為后臺線程 t.start() print 'main_thread end!'
main_thread end! Process finished with exit code 0 可以看出,主線程執(zhí)行完畢后,后臺線程不管是成功與否,主線程均停止
驗證了serDeamon(True)后臺線程,主線程執(zhí)行過程中,后臺線程也在進行,主線程執(zhí)行完畢后,后臺線程不論成功與否,主線程均停止。
使用例子三(設(shè)置join)
#coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName() print 'the arg is:%s ' %arg time.sleep(1) thread_list = [] #線程存放列表 for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True) thread_list.append(t) for t in thread_list: t.start() for t in thread_list: t.join()
sub thread start!the thread name is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 設(shè)置join之后,主線程等待子線程全部執(zhí)行完成后或者子線程超時后,主線程才結(jié)束
驗證了 join()阻塞當前上下文環(huán)境的線程,直到調(diào)用此方法的線程終止或到達指定的timeout,即使設(shè)置了setDeamon(True)主線程依然要等待子線程結(jié)束。
使用例子四(join不妥當?shù)挠梅?,使多線程編程順序執(zhí)行)
#coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName() print 'the arg is:%s ' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True) t.start() t.join() print 'main_thread end!'
sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 可以看出此時,程序只能順序執(zhí)行,每個線程都被上一個線程的join阻塞,使得“多線程”失去了多線程意義。
Lock、Rlock類
由于線程之間隨機調(diào)度:某線程可能在執(zhí)行n條后,CPU接著執(zhí)行其他線程。為了多個線程同時操作一個內(nèi)存中的資源時不產(chǎn)生混亂,我們使用鎖。
Lock(指令鎖)是可用的最低級的同步指令。Lock處于鎖定狀態(tài)時,不被特定的線程擁有。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個基本的方法。
可以認為Lock有一個鎖定池,當線程請求鎖定時,將線程至于池中,直到獲得鎖定后出池。池中的線程處于狀態(tài)圖中的同步阻塞狀態(tài)。
RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,處于鎖定狀態(tài)時,RLock被某個線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時需要調(diào)用release()相同次數(shù)。
可以認為RLock包含一個鎖定池和一個初始值為0的計數(shù)器,每次成功調(diào)用 acquire()/release(),計數(shù)器將+1/-1,為0時鎖處于未鎖定狀態(tài)。
簡言之:Lock屬于全局,Rlock屬于線程。
構(gòu)造方法:
Lock(),Rlock(),推薦使用Rlock()
實例方法:
acquire([timeout]): 嘗試獲得鎖定。使線程進入同步阻塞狀態(tài)。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
例子一(未使用鎖):
#coding:utf-8 import threading import time gl_num = 0 def show(arg): global gl_num time.sleep(1) gl_num +=1 print gl_num for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'
main thread stop 12 3 4 568 9 910 Process finished with exit code 0 多次運行可能產(chǎn)生混亂。這種場景就是適合使用鎖的場景。
例子二(使用鎖):
# coding:utf-8 import threading import time gl_num = 0 lock = threading.RLock() # 調(diào)用acquire([timeout])時,線程將一直阻塞, # 直到獲得鎖定或者直到timeout秒后(timeout參數(shù)可選)。 # 返回是否獲得鎖。 def Func(): lock.acquire() global gl_num gl_num += 1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()
1 2 3 4 5 6 7 8 9 10 Process finished with exit code 0 可以看出,全局變量在在每次被調(diào)用時都要獲得鎖,才能操作,因此保證了共享數(shù)據(jù)的安全性
Lock對比Rlock
#coding:utf-8 import threading lock = threading.Lock() #Lock對象 lock.acquire() lock.acquire() #產(chǎn)生了死鎖。 lock.release() lock.release() print lock.acquire() import threading rLock = threading.RLock() #RLock對象 rLock.acquire() rLock.acquire() #在同一線程內(nèi),程序不會堵塞。 rLock.release() rLock.release()
Condition類
Condition(條件變量)通常與一個鎖關(guān)聯(lián)。需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock實例給構(gòu)造方法,否則它將自己生成一個RLock實例。
可以認為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處于等待阻塞狀態(tài),直到另一個線程調(diào)用notify()/notifyAll()通知;得到通知后線程進入鎖定池等待鎖定。
構(gòu)造方法:
Condition([lock/rlock])
實例方法:
acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。
wait([timeout]): 調(diào)用這個方法將使線程進入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
notify(): 調(diào)用這個方法將從等待池挑選一個線程并通知,收到通知的線程將自動調(diào)用acquire()嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
notifyAll(): 調(diào)用這個方法將通知等待池中所有的線程,這些線程都將進入鎖定池嘗試獲得鎖定。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
例子一:生產(chǎn)者消費者模型
# encoding: UTF-8 import threading import time # 商品 product = None # 條件變量 con = threading.Condition() # 生產(chǎn)者方法 def produce(): global product if con.acquire(): while True: if product is None: print 'produce...' product = 'anything' # 通知消費者,商品已經(jīng)生產(chǎn) con.notify() # 等待通知 con.wait() time.sleep(2) # 消費者方法 def consume(): global product if con.acquire(): while True: if product is not None: print 'consume...' product = None # 通知生產(chǎn)者,商品已經(jīng)沒了 con.notify() # 等待通知 con.wait() time.sleep(2) t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
produce... consume... produce... consume... produce... consume... produce... consume... produce... consume... Process finished with exit code -1 程序不斷循環(huán)運行下去。重復(fù)生產(chǎn)消費過程。
例子二:生產(chǎn)者消費者模型
import threading import time condition = threading.Condition() products = 0 class Producer(threading.Thread): def run(self): global products while True: if condition.acquire(): if products < 10: products += 1; print "Producer(%s):deliver one, now products:%s" %(self.name, products) condition.notify()#不釋放鎖定,因此需要下面一句 condition.release() else: print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products) condition.wait();#自動釋放鎖定 time.sleep(2) class Consumer(threading.Thread): def run(self): global products while True: if condition.acquire(): if products > 1: products -= 1 print "Consumer(%s):consume one, now products:%s" %(self.name, products) condition.notify() condition.release() else: print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products) condition.wait(); time.sleep(2) if __name__ == "__main__": for p in range(0, 2): p = Producer() p.start() for c in range(0, 3): c = Consumer() c.start()
例子三:
import threading alist = None condition = threading.Condition() def doSet(): if condition.acquire(): while alist is None: condition.wait() for i in range(len(alist))[::-1]: alist[i] = 1 condition.release() def doPrint(): if condition.acquire(): while alist is None: condition.wait() for i in alist: print i, print condition.release() def doCreate(): global alist if condition.acquire(): if alist is None: alist = [0 for i in range(10)] condition.notifyAll() condition.release() tset = threading.Thread(target=doSet,name='tset') tprint = threading.Thread(target=doPrint,name='tprint') tcreate = threading.Thread(target=doCreate,name='tcreate') tset.start() tprint.start() tcreate.start()
Event類
Event(事件)是最簡單的線程通信機制之一:一個線程通知事件,其他線程等待事件。Event內(nèi)置了一個初始為False的標志,當調(diào)用set()時設(shè)為True,調(diào)用clear()時重置為 False。wait()將阻塞線程至等待阻塞狀態(tài)。
Event其實就是一個簡化版的 Condition。Event沒有鎖,無法使線程進入同步阻塞狀態(tài)。
構(gòu)造方法:
Event()
實例方法:
isSet(): 當內(nèi)置標志為True時返回True。
set(): 將標志設(shè)為True,并通知所有處于等待阻塞狀態(tài)的線程恢復(fù)運行狀態(tài)。
clear(): 將標志設(shè)為False。
wait([timeout]): 如果標志為True將立即返回,否則阻塞線程至等待阻塞狀態(tài),等待其他線程調(diào)用set()。
例子一
# encoding: UTF-8 import threading import time event = threading.Event() def func(): # 等待事件,進入等待阻塞狀態(tài) print '%s wait for event...' % threading.currentThread().getName() event.wait() # 收到事件后進入運行狀態(tài) print '%s recv event.' % threading.currentThread().getName() t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) # 發(fā)送事件通知 print 'MainThread set event.' event.set()
Thread-1 wait for event... Thread-2 wait for event... #2秒后。。。 MainThread set event. Thread-1 recv event. Thread-2 recv event. Process finished with exit code 0
timer類
Timer(定時器)是Thread的派生類,用于在指定時間后調(diào)用一個方法。
構(gòu)造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的時間
function: 要執(zhí)行的方法
args/kwargs: 方法的參數(shù)
實例方法:
Timer從Thread派生,沒有增加實例方法。
例子一:
# encoding: UTF-8 import threading def func(): print 'hello timer!' timer = threading.Timer(5, func) timer.start()
線程延遲5秒后執(zhí)行。
local是一個小寫字母開頭的類,用于管理 thread-local(線程局部的)數(shù)據(jù)。對于同一個local,線程無法訪問其他線程設(shè)置的屬性;線程設(shè)置的屬性不會被其他線程設(shè)置的同名屬性替換。
可以把local看成是一個“線程-屬性字典”的字典,local封裝了從自身使用線程作為 key檢索對應(yīng)的屬性字典、再使用屬性名作為key檢索屬性值的細節(jié)。
# encoding: UTF-8 import threading local = threading.local() local.tname = 'main' def func(): local.tname = 'notmain' print local.tname t1 = threading.Thread(target=func) t1.start() t1.join() print local.tname
notmain main
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。