您好,登錄后才能下訂單哦!
這篇文章主要介紹了python多線程并發(fā)及測試框架案例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
1、循環(huán)創(chuàng)建多個線程,并通過循環(huán)啟動執(zhí)行
import threading from datetime import * from time import sleep # 單線程執(zhí)行 def test(): print('hello world') t = threading.Thread(target=test) t.start() # 多線程執(zhí)行 def test_01(): sleep(1) x = 0 while x == 0: # 設(shè)置一個死循環(huán) print(datetime.now()) # 獲取當(dāng)前系統(tǒng)時間 def looptest(): ''' 循環(huán)20次執(zhí)行 test_o1()函數(shù) :return: ''' for i in range(20): test_01() def thd(): ''' 創(chuàng)建并執(zhí)行多個線程 需求:并發(fā)執(zhí)行50次 test_o1()函數(shù) 說明:把50的并發(fā)拆成25個線程組,每個線程再循環(huán)20次執(zhí)行 test_o1()函數(shù),這樣在啟動下一個線程的時候, 上一個線程已經(jīng)在循環(huán)了,以此類推,當(dāng)啟動第25個線程的時候,可能已經(jīng)執(zhí)行了200次的t est_o1()函數(shù), 這樣就可以大大減少并發(fā)的時間差異 :return: ''' Threads = [] for i in range(25): th = threading.Thread(target=looptest) Threads.append(th) ''' 守護(hù)線程:主線程執(zhí)行完畢之后,會等待子線程全部執(zhí)行完畢,才會關(guān)閉結(jié)束程序 必須加在start()之前,默認(rèn)為 false ''' th.setDaemon(True) for th in Threads: th.start() for th in Threads: ''' 阻塞線程:等主線程執(zhí)行完畢之后再關(guān)閉所有子線程 必須加在start()之后 可以通過join()的timeout參數(shù)來完美解決相互等待的問題,子線程告訴主線程讓其等待0.04秒, 0.04秒之內(nèi)子線程完成,主線程就繼續(xù)往下執(zhí)行,0.04秒之后如果子線程還未完成,主線程也會 繼續(xù)往下執(zhí)行,執(zhí)行完成之后關(guān)閉子線程 ''' th.join(0.04) if __name__=="__main__": print('start') thd() print('end')
2、并發(fā)測試框架
# 并發(fā)測試框架 THREAD_NUM = 1 ONE_WORKER_NUM = 1 def test(): pass # 測試代碼 def working(): global ONE_WORKER_NUM for i in range(0, ONE_WORKER_NUM): test() def t(): global THREAD_NUM Threads = [] for i in range(THREAD_NUM): t = threading.Thread(target=working,name='T'+str(i)) t.setDaemon(True) Threads.append(t) for t in Threads: t.start() for t in Threads: t.join() if __name__=="__main__": t()
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。