溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python中asyncio怎么用

發(fā)布時(shí)間:2021-08-25 11:19:41 來源:億速云 閱讀:166 作者:小新 欄目:開發(fā)技術(shù)

小編給大家分享一下Python中asyncio怎么用,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

asyncio介紹

熟悉c#的同學(xué)可能知道,在c#中可以很方便的使用 async 和 await 來實(shí)現(xiàn)異步編程,那么在python中應(yīng)該怎么做呢,其實(shí)python也支持異步編程,一般使用 asyncio 這個(gè)庫,下面介紹下什么是 asyncio :

asyncio 是用來編寫 并發(fā) 代碼的庫,使用 async/await 語法。 asyncio 被用作多個(gè)提供高性能 Python 異步框架的基礎(chǔ),包括網(wǎng)絡(luò)和網(wǎng)站服務(wù),數(shù)據(jù)庫連接庫,分布式任務(wù)隊(duì)列等等。 asyncio 往往是構(gòu)建 IO 密集型和高層級(jí) 結(jié)構(gòu)化 網(wǎng)絡(luò)代碼的最佳選擇。

asyncio中的基本概念

可以看見,使用asyncio庫我們也可以在python代碼中使用 async 和 await 。在 asyncio 中,有四個(gè)基本概念,分別是:

Eventloop

Eventloop 可以說是 asyncio 應(yīng)用的核心,中央總控, Eventloop 實(shí)例提供了注冊(cè)、取消、執(zhí)行任務(wù)和回調(diào) 的方法。 簡(jiǎn)單來說,就是我們可以把一些異步函數(shù)注冊(cè)到這個(gè)事件循環(huán)上,事件循環(huán)回循環(huán)執(zhí)行這些函數(shù)(每次只能執(zhí)行一個(gè)),如果當(dāng)前正在執(zhí)行的函數(shù)在等待I/O返回,那么事件循環(huán)就會(huì)暫停它的執(zhí)行去執(zhí)行其他函數(shù)。當(dāng)某個(gè)函數(shù)完成I/O后會(huì)恢復(fù),等到下次循環(huán)到它的時(shí)候就會(huì)繼續(xù)執(zhí)行。

Coroutine

協(xié)程本質(zhì)就是一個(gè)函數(shù),

import asyncio
import time
async def a():
 print('Suspending a')
 await asyncio.sleep(3)
 print('Resuming a')
async def b():
 print('Suspending b')
 await asyncio.sleep(1)
 print('Resuming b')
async def main():
 start = time.perf_counter()
 await asyncio.gather(a(), b())
 print(f'{main.__name__} Cost: {time.perf_counter() - start}')
if __name__ == '__main__':
 asyncio.run(main())

執(zhí)行上述代碼,可以看到類似這樣的輸出:

Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0023356619999997

關(guān)于協(xié)程的具體介紹,可以參考我以前的文章python中的協(xié)程 不過以前的那種寫法,需要使用裝飾器,已經(jīng)過時(shí)了。

Future

Future 是表示一個(gè)“未來”對(duì)象,類似于 javascript 中的 promise ,當(dāng)異步操作結(jié)束后會(huì)把最終結(jié)果設(shè)置到這個(gè) Future 對(duì)象上, Future 是對(duì)協(xié)程的封裝。

>>> import asyncio
>>> def fun():
...  print("inner fun")
...  return 111
... 
>>> loop = asyncio.get_event_loop()
>>> future = loop.run_in_executor(None, fun) #這里沒有使用await
inner fun
>>> future #可以看到,fun方法狀態(tài)是pending
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]>
>>> future.done() # 還沒有完成
False
>>> [m for m in dir(future) if not m.startswith('_')]
['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
>>> future.result() #這個(gè)時(shí)候如果直接調(diào)用result()方法會(huì)報(bào)錯(cuò)
Traceback (most recent call last):
 File "<input>", line 1, in <module>
asyncio.base_futures.InvalidStateError: Result is not set.
>>> async def runfun():
...  result=await future
...  print(result)
...  
>>>loop.run_until_complete(runfun()) #也可以通過 loop.run_until_complete(future) 來執(zhí)行,這里只是為了演示await
111
>>> future
<Future finished result=111>
>>> future.done()
True
>>> future.result()
111
Task

Eventloop 除了支持協(xié)程,還支持注冊(cè) Future 和 Task 2種類型的對(duì)象,而 Future 是協(xié)程的封裝, Future 對(duì)象提供了很多任務(wù)方法(如完成后的回調(diào),取消,設(shè)置任務(wù)結(jié)果等等),但是一般情況下開發(fā)者不需要操作 Future 這種底層對(duì)象,而是直接用 Future 的子類 Task 協(xié)同的調(diào)度協(xié)程來實(shí)現(xiàn)并發(fā)。那么什么是 Task 呢?下面介紹下:

一個(gè)與 Future 類似的對(duì)象,可運(yùn)行 Python 協(xié)程。非線程安全。 Task 對(duì)象被用來在事件循環(huán)中運(yùn)行協(xié)程。如果一個(gè)協(xié)程在等待一個(gè) Future 對(duì)象, Task 對(duì)象會(huì)掛起該協(xié)程的執(zhí)行并等待該 Future 對(duì)象完成。當(dāng)該 Future 對(duì)象完成被打包的協(xié)程將恢復(fù)執(zhí)行。 事件循環(huán)使用協(xié)同日程調(diào)度: 一個(gè)事件循環(huán)每次運(yùn)行一個(gè) Task 對(duì)象。而一個(gè) Task 對(duì)象會(huì)等待一個(gè) Future 對(duì)象完成,該事件循環(huán)會(huì)運(yùn)行其他 Task 、回調(diào)或執(zhí)行IO操作。

下面看看用法:

>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  
>>> task = asyncio.ensure_future(a())
>>> loop.run_until_complete(task)
Suspending a
Resuming a

asyncio中一些常見用法的區(qū)別

Asyncio.gather和asyncio.wait

我們?cè)谏厦娴拇a中用到過 asyncio.gather ,其實(shí)還有另外一種用法是 asyncio.wait ,他們都可以讓多個(gè)協(xié)程并發(fā)執(zhí)行,那么他們有什么區(qū)別呢?下面介紹下。

>>> import asyncio
>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  return 'A'
... 
... 
... async def b():
...  print('Suspending b')
...  await asyncio.sleep(1)
...  print('Resuming b')
...  return 'B'
... 
>>> async def fun1():
...  return_value_a, return_value_b = await asyncio.gather(a(), b())
...  print(return_value_a,return_value_b)
...  
>>> asyncio.run(fun1())
Suspending a
Suspending b
Resuming b
Resuming a
A B
>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()])
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending b
Suspending a
Resuming b
Resuming a
{<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>}
set()
<Task finished coro=<a() done, defined at <input>:1> result='A'>
A

根據(jù)上述代碼,我們可以看出兩者的區(qū)別:

asyncio.gather 能收集協(xié)程的結(jié)果,而且會(huì)按照輸入?yún)f(xié)程的順序保存對(duì)應(yīng)協(xié)程的執(zhí)行結(jié)果,而 asyncio.wait 的返回值有兩項(xiàng),第一項(xiàng)是完成的任務(wù)列表,第二項(xiàng)表示等待完成的任務(wù)列表。

asyncio.wait 支持接受一個(gè)參數(shù) return_when ,在默認(rèn)情況下, asyncio.wait 會(huì)等待全部任務(wù)完成 (return_when='ALL_COMPLETED') ,它還支持 FIRST_COMPLETED (第一個(gè)協(xié)程完成就返回)和 FIRST_EXCEPTION (出現(xiàn)第一個(gè)異常就返回):

>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()],return_when=asyncio.tasks.FIRST_COMPLETED)
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending a
Suspending b
Resuming b
{<Task finished coro=<b() done, defined at <input>:8> result='B'>}
{<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>}
<Task finished coro=<b() done, defined at <input>:8> result='B'>
B

一般情況下,用 asyncio.gather 就足夠了。

asyncio.create_task和loop.create_task以及asyncio.ensure_future

這三種方法都可以創(chuàng)建 Task ,從Python3.7開始可以統(tǒng)一的使用更高階的 asyncio.create_task .其實(shí) asyncio.create_task 就是用的 loop.create_task . loop.create_task 接受的參數(shù)需要是一個(gè)協(xié)程,但是 asyncio.ensure_future 除了接受協(xié)程,還可以是 Future 對(duì)象或者 awaitable 對(duì)象:

  1. 如果參數(shù)是協(xié)程,其底層使用 loop.create_task ,返回 Task 對(duì)象

  2. 如果是 Future 對(duì)象會(huì)直接返回

  3. 如果是一個(gè) awaitable 對(duì)象,會(huì) await 這個(gè)對(duì)象的 __await__ 方法,再執(zhí)行一次 ensure_future ,最后返回 Task 或者 Future 。

所以 ensure_future 方法主要就是確保這是一個(gè) Future 對(duì)象,一般情況下直接用 asyncio.create_task 就可以了。

注冊(cè)回調(diào)和執(zhí)行同步代碼

可以使用 add_done_callback 來添加成功回調(diào):

def callback(future):
 print(f'Result: {future.result()}')
def callback2(future, n):
 print(f'Result: {future.result()}, N: {n}')
async def funa():
 await asyncio.sleep(1)
 return "funa"
async def main():
 task = asyncio.create_task(funa())
 task.add_done_callback(callback)
 await task
 #這樣可以為callback傳遞參數(shù)
 task = asyncio.create_task(funa())
 task.add_done_callback(functools.partial(callback2, n=1))
 await task
if __name__ == '__main__':
 asyncio.run(main())

執(zhí)行同步代碼

如果有同步邏輯,想要用 asyncio 來實(shí)現(xiàn)并發(fā),那么需要怎么做呢?下面看看:

def a1():
 time.sleep(1)
 return "A"
async def b1():
 await asyncio.sleep(1)
 return "B"
async def main():
 loop = asyncio.get_running_loop()
 await asyncio.gather(loop.run_in_executor(None, a1), b1())
if __name__ == '__main__':
 start = time.perf_counter()
 asyncio.run(main())
 print(f'main method Cost: {time.perf_counter() - start}')
# 輸出: main method Cost: 1.0050589740000002

可以使用 run_into_executor 來將同步函數(shù)邏輯轉(zhuǎn)化成一個(gè)協(xié)程,第一個(gè)參數(shù)是要傳遞 concurrent.futures.Executor 實(shí)例的,傳遞 None 會(huì)選擇默認(rèn)的 executor 。

以上是“Python中asyncio怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI