您好,登錄后才能下訂單哦!
今天小編給大家分享一下python協(xié)程與asyncio庫怎么用的相關知識點,內(nèi)容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
python 中的 asyncio 庫提供了管理事件、協(xié)程、任務和線程的方法,以及編寫并發(fā)代碼的原語,即 async
和 await
。
該模塊的主要內(nèi)容:
事件循環(huán):event_loop,管理所有的事件,是一個無限循環(huán)方法,在循環(huán)過程中追蹤事件發(fā)生的順序?qū)⑺鼈兎旁陉犃兄校臻e時則調(diào)用相應的事件處理者來處理這些事件;
協(xié)程:coroutine
,子程序的泛化概念,協(xié)程可以在執(zhí)行期間暫停,等待外部的處理(I/O 操作)完成之后,再從暫停的地方繼續(xù)運行,函數(shù)定義式使用 async
關鍵字,這樣這個函數(shù)就不會立即執(zhí)行,而是返回一個協(xié)程對象;
Future
和Task
:Future
對象表示尚未完成的計算,Task
是 Future
的子類,包含了任務的各個狀態(tài),作用是在運行某個任務的同時可以并發(fā)的運行多個任務。
異步函數(shù)本質(zhì)上依舊是函數(shù),只是在執(zhí)行過程中會將執(zhí)行權交給其它協(xié)程,與普通函數(shù)定義的區(qū)別是在 def
關鍵字前增加 async
。
# 異步函數(shù) import asyncio # 異步函數(shù) async def func(x): print("異步函數(shù)") return x ** 2 ret = func(2) print(ret)
運行代碼輸入如下內(nèi)容:
sys:1: RuntimeWarning: coroutine 'func' was never awaited <coroutine object func at 0x0000000002C8C248>
函數(shù)返回一個協(xié)程對象,如果想要函數(shù)得到執(zhí)行,需要將其放到事件循環(huán) event_loop
中。
event_loop
是 asyncio
模塊的核心,它將異步函數(shù)注冊到事件循環(huán)上。 過程實現(xiàn)方式為:由 loop
在適當?shù)臅r候調(diào)用協(xié)程,這里使用的方式名為 asyncio.get_event_loop()
,然后由 run_until_complete(協(xié)程對象)
將協(xié)程注冊到事件循環(huán)中,并啟動事件循環(huán)。
import asyncio # 異步函數(shù) async def func(x): print("異步函數(shù)") return x ** 2 # 協(xié)程對象,該對象不能直接運行 coroutine1 = func(2) # 事件循環(huán)對象 loop = asyncio.get_event_loop() # 將協(xié)程對象加入到事件循環(huán)中,并執(zhí)行 ret = loop.run_until_complete(coroutine1) print(ret)
首先在 python 3.7 之前的版本中使用異步函數(shù)是安裝上述流程:
先通過 asyncio.get_event_loop()
獲取事件循環(huán)loop
對象;
然后通過不同的策略調(diào)用 loop.run_until_complete()
或者loop.run_forever()
執(zhí)行異步函數(shù)。
在 python 3.7 之后的版本,直接使用 asyncio.run()
即可,該函數(shù)總是會創(chuàng)建一個新的事件循環(huán)并在結束時進行關閉。
最新的官方文檔 都采用的是run
方法。 官方案例
import asyncio async def main(): print('hello') await asyncio.sleep(1) print('world') asyncio.run(main())
接下來在查看一個完整的案例,并且結合await
關鍵字。
import asyncio import time # 異步函數(shù)1 async def task1(x): print("任務1") await asyncio.sleep(2) print("恢復任務1") return x # 異步函數(shù)2 async def task2(x): print("任務2") await asyncio.sleep(1) print("恢復任務2") return x async def main(): start_time = time.perf_counter() ret_1 = await task1(1) ret_2 = await task2(2) print("任務1 返回的值是", ret_1) print("任務2 返回的值是", ret_2) print("運行時間", time.perf_counter() - start_time) if __name__ == '__main__': # 創(chuàng)建一個事件循環(huán) loop = asyncio.get_event_loop() # 將協(xié)程對象加入到事件循環(huán)中,并執(zhí)行 loop.run_until_complete(main())
代碼輸出如下所示:
任務1
恢復任務1
任務2
恢復任務2
任務1 返回的值是 1
任務2 返回的值是 2
運行時間 2.99929154
上述代碼創(chuàng)建了 3 個協(xié)程,其中 task1
和 task2
都放在了協(xié)程函數(shù) main
中,I/O 操作通過 asyncio.sleep(1)
進行模擬,整個函數(shù)運行時間為 2.9999 秒,接近 3 秒,依舊是串行進行,如果希望修改為并發(fā)執(zhí)行,將代碼按照下述進行修改。
import asyncio import time # 異步函數(shù)1 async def task1(x): print("任務1") await asyncio.sleep(2) print("恢復任務1") return x # 異步函數(shù)2 async def task2(x): print("任務2") await asyncio.sleep(1) print("恢復任務2") return x async def main(): start_time = time.perf_counter() ret_1,ret_2 = await asyncio.gather(task1(1),task2(2)) print("任務1 返回的值是", ret_1) print("任務2 返回的值是", ret_2) print("運行時間", time.perf_counter() - start_time) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
上述代碼最大的變化是將task1
和task2
放到了asyncio.gather()
中運行,此時代碼輸出時間明顯變短。
任務1
任務2
恢復任務2 # 任務2 由于等待時間短,先返回。
恢復任務1
任務1 返回的值是 1
任務2 返回的值是 2
運行時間 2.0005669480000003
asyncio.gather()
可以更換為asyncio.wait()
,修改代碼如下所示:
import asyncio import time # 異步函數(shù)1 async def task1(x): print("任務1") await asyncio.sleep(2) print("恢復任務1") return x # 異步函數(shù)2 async def task2(x): print("任務2") await asyncio.sleep(1) print("恢復任務2") return x async def main(): start_time = time.perf_counter() done, pending = await asyncio.wait([task1(1), task2(2)]) print(done) print(pending) print("運行時間", time.perf_counter() - start_time) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
asyncio.wait()
返回一個元組,其中包含一個已經(jīng)完成的任務集合,一個未完成任務的集合。
gather 和 wait 的區(qū)別:
gather
:需要所有任務都執(zhí)行結束,如果任意一個協(xié)程函數(shù)崩潰了,都會拋異常,不會返回結果;
wait
:可以定義函數(shù)返回的時機,可以設置為 FIRST_COMPLETED
(第一個結束的), FIRST_EXCEPTION
(第一個出現(xiàn)異常的), ALL_COMPLETED
(全部執(zhí)行完,默認的)。
done,pending = await asyncio.wait([task1(1),task2(2)],return_when=asyncio.tasks.FIRST_EXCEPTION)
由于協(xié)程對象不能直接運行,在注冊到事件循環(huán)時,是run_until_complete
方法將其包裝成一個 task
對象。該對象是對coroutine
對象的進一步封裝,它比coroutine
對象多了運行狀態(tài),例如 pending
,running
,finished
,可以利用這些狀態(tài)獲取協(xié)程對象的執(zhí)行情況。
下面顯示的將coroutine
對象封裝成task
對象,在上述代碼基礎上進行修改。
import asyncio import time # 異步函數(shù)1 async def task1(x): print("任務1") await asyncio.sleep(2) print("恢復任務1") return x # 異步函數(shù)2 async def task2(x): print("任務2") await asyncio.sleep(1) print("恢復任務2") return x async def main(): start_time = time.perf_counter() # 封裝 task 對象 coroutine1 = task1(1) task_1 = loop.create_task(coroutine1) coroutine2 = task2(2) task_2 = loop.create_task(coroutine2) ret_1, ret_2 = await asyncio.gather(task_1, task_2) print("任務1 返回的值是", ret_1) print("任務2 返回的值是", ret_2) print("運行時間", time.perf_counter() - start_time) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
由于task
對象是future
對象的子類對象,所以上述代碼也可以按照下述內(nèi)容修改:
# task_2 = loop.create_task(coroutine2) task_2 = asyncio.ensure_future(coroutine2)
下面將task
對象的各個狀態(tài)進行打印輸出。
import asyncio import time # 異步函數(shù)1 async def task1(x): print("任務1") await asyncio.sleep(2) print("恢復任務1") return x # 異步函數(shù)2 async def task2(x): print("任務2") await asyncio.sleep(1) print("恢復任務2") return x async def main(): start_time = time.perf_counter() # 封裝 task 對象 coroutine1 = task1(1) task_1 = loop.create_task(coroutine1) coroutine2 = task2(2) # task_2 = loop.create_task(coroutine2) task_2 = asyncio.ensure_future(coroutine2) # 進入 pending 狀態(tài) print(task_1) print(task_2) # 獲取任務的完成狀態(tài) print(task_1.done(), task_2.done()) # 執(zhí)行任務 await task_1 await task_2 # 再次獲取完成狀態(tài) print(task_1.done(), task_2.done()) # 獲取返回結果 print(task_1.result()) print(task_2.result()) print("運行時間", time.perf_counter() - start_time) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
await task_1
表示的是執(zhí)行該協(xié)程,執(zhí)行結束之后,task.done()
返回 True
,task.result()
獲取返回值。
當協(xié)程執(zhí)行完畢,需要獲取其返回值,剛才已經(jīng)演示了一種辦法,使用 task.result()
方法獲取,但是該方法僅當協(xié)程運行完畢時,才能獲取結果,如果協(xié)程沒有運行完畢,result()
方法會返回 asyncio.InvalidStateError
(無效狀態(tài)錯誤)。
一般編碼都采用第二種方案,通過add_done_callback()
方法綁定回調(diào)。
import asyncio import requests async def request_html(): url = 'https://www.csdn.net' res = requests.get(url) return res.status_code def callback(task): print('回調(diào):', task.result()) loop = asyncio.get_event_loop() coroutine = request_html() task = loop.create_task(coroutine) # 綁定回調(diào) task.add_done_callback(callback) print(task) print("*"*100) loop.run_until_complete(task) print(task)
上述代碼當coroutine
執(zhí)行完畢時,會調(diào)用callback
函數(shù)。
如果回調(diào)函數(shù)需要多個參數(shù),請使用functools
模塊中的偏函數(shù)(partial
)方法
建議每次編碼結束之后,都調(diào)用循環(huán)事件對象close()
方法,徹底清理loop
對象。
本節(jié)課要采集的站點由于全部都是 coser 圖片,所以地址在代碼中查看即可。
完整代碼如下所示:
import threading import asyncio import time import requests import lxml from bs4 import BeautifulSoup async def get(url): return requests.get(url) async def get_html(url): print("準備抓?。?quot;, url) res = await get(url) return res.text async def save_img(img_url): # thumbMid_5ae3e05fd3945 將小圖替換為大圖 img_url = img_url.replace('thumb','thumbMid') img_url = "http://mycoser.com/" + img_url print("圖片下載中:", img_url) res = await get(img_url) if res is not None: with open(f'./imgs/{time.time()}.jpg', 'wb') as f: f.write(res.content) return img_url,"ok" async def main(url_list): # 創(chuàng)建 5 個任務 tasks = [asyncio.ensure_future(get_html(url_list[_])) for _ in range(len(url_list))] dones, pending = await asyncio.wait(tasks) for task in dones: html = task.result() soup = BeautifulSoup(html, 'lxml') divimg_tags = soup.find_all(attrs={'class': 'workimage'}) for div in divimg_tags: ret = await save_img(div.a.img["data-original"]) print(ret) if __name__ == '__main__': urls = [f"http://mycoser.com/picture/lists/p/{page}" for page in range(1, 17)] totle_page = len(urls) // 5 if len(urls) % 5 == 0 else len(urls) // 5 + 1 # 對 urls 列表進行切片,方便采集 for page in range(0, totle_page): start_page = 0 if page == 0 else page * 5 end_page = (page + 1) * 5 # 循環(huán)事件對象 loop = asyncio.get_event_loop() loop.run_until_complete(main(urls[start_page:end_page]))
代碼說明:上述代碼中第一個要注意的是await
關鍵字后面只能跟如下內(nèi)容:
原生的協(xié)程對象;
一個包含await
方法的對象返回的一個迭代器。
所以上述代碼get_html
函數(shù)中嵌套了一個協(xié)程 get
。主函數(shù) main
里面為了運算方便,直接對 urls 進行了切片,然后通過循環(huán)進行運行。
當然上述代碼的最后兩行,可以直接修改為:
# 循環(huán)事件對象 # loop = asyncio.get_event_loop() # # loop.run_until_complete(main(urls[start_page:end_page])) asyncio.run(main(urls[start_page:end_page]))
輕松獲取一堆高清圖片:
以上就是“python協(xié)程與asyncio庫怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。