溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Python中的asyncio庫(kù)-函數(shù)的回調(diào)與調(diào)度

發(fā)布時(shí)間:2020-08-24 14:44:48 來(lái)源:億速云 閱讀:506 作者:Leah 欄目:編程語(yǔ)言

這篇文章運(yùn)用簡(jiǎn)單易懂的例子給大家介紹Python中的asyncio庫(kù)-函數(shù)的回調(diào)與調(diào)度,代碼非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

在大部分的高級(jí)語(yǔ)言中都有回調(diào)函數(shù),這里我們看下asyncio中的的函數(shù)回調(diào)。

成功回調(diào)

可以給Task(Future)添加回調(diào)函數(shù),等Task完成后就會(huì)自動(dòng)調(diào)用這個(gè)(些)回調(diào):

async def a():
    await asyncio.sleep(1)
    return 'A'
In : loop = asyncio.get_event_loop()
In : task = loop.create_task(a())
In : def callback(future):
...:     print(f'Result: {future.result()}')
...:
In : task.add_done_callback(callback)
In : await task
Result: A
Out: 'A'

可以看到在任務(wù)完成后執(zhí)行了callback函數(shù)。我這里順便解釋一個(gè)問(wèn)題,不知道有沒(méi)有人注意到。

為什么之前一直推薦大家用asyncio.create_task,但是很多例子卻用了loop.create_task?

這是因?yàn)樵贗Python里面支持方便的使用await執(zhí)行協(xié)程,但如果直接用asyncio.create_task會(huì)報(bào)「no running event loop」:

In : asyncio.create_task(a())
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-2-2a742a8da161> in <module>
----> 1 asyncio.create_task(a())
/usr/local/lib/python3.7/asyncio/tasks.py in create_task(coro)
    322     Return a Task object.
    323     """
--> 324     loop = events.get_running_loop()
    325     return loop.create_task(coro)
    326
RuntimeError: no running event loop

Eventloop是在單進(jìn)程里面的單線程中的,在IPython里面await的時(shí)候會(huì)把協(xié)程注冊(cè)到一個(gè)線程的Eventloop上,但是REPL環(huán)境是另外一個(gè)線程,不是一個(gè)線程,所以會(huì)提示這個(gè)錯(cuò)誤,即便asyncio.events._set_running_loop(loop)設(shè)置了loop,任務(wù)可以創(chuàng)建倒是不能await:因?yàn)閠ask是在線程X的Eventloop上注冊(cè)的,但是await時(shí)卻到線程Y的Eventloop上去執(zhí)行。這部分是C實(shí)現(xiàn)的,可以看延伸閱讀鏈接1。

所以現(xiàn)在你就會(huì)看到很多l(xiāng)oop.create_task的代碼片段,別擔(dān)心,在代碼項(xiàng)目里面都是用asyncio.create_task的,如果你非常想要在IPython里面使用asyncio.create_task也不是沒(méi)有辦法,可以這樣做:

In : loop = asyncio.get_event_loop()
In : def loop_runner(coro):
...:     asyncio.events._set_running_loop(None)
...:     loop.run_until_complete(coro)
...:     asyncio.events._set_running_loop(loop)
...:
In : %autoawait loop_runner
In : asyncio.events._set_running_loop(loop)
In : task = asyncio.create_task(a())
In : await task
Out: 'A'

這樣就可以啦。我解釋下為什么:

IPython里面能運(yùn)行await是由于loop_runner函數(shù),這個(gè)函數(shù)能運(yùn)行協(xié)程(延伸閱讀鏈接2),默認(rèn)的效果大概是asyncio.get_event_loop().run_until_complete(coro)。為了讓asyncio.create_task正常運(yùn)行我定義了新的loop_runner

通過(guò)autoawait這個(gè)magic函數(shù)就可以重新設(shè)置loop_runner

上面的報(bào)錯(cuò)是「no running event loop」,所以通過(guò)events._set_running_loop(loop)設(shè)置一個(gè)正在運(yùn)行的loop,但是在默認(rèn)的loop_runner中也無(wú)法運(yùn)行,會(huì)報(bào)「Cannot run the event loop while another loop is running」,所以重置await里面那個(gè)running的loop,運(yùn)行結(jié)束再設(shè)置回去。

如果你覺(jué)得有必要,可以在IPython配置文件中設(shè)置這個(gè)loop_runner到c.InteractiveShell.loop_runner上~

好,我們說(shuō)回來(lái),add_done_callback方法也是支持參數(shù)的,但是需要用到functools.partial:

def callback2(future, n):
    print(f'Result: {future.result()}, N: {n}')
In : task = loop.create_task(a())
In : task.add_done_callback(partial(callback2, n=1))
In : await task
Result: A, N: 1
Out: 'A'

調(diào)度回調(diào)

asyncio提供了3個(gè)按需回調(diào)的方法,都在Eventloop對(duì)象上,而且也支持參數(shù):

call_soon

在下一次事件循環(huán)中被回調(diào),回調(diào)是按其注冊(cè)順序被調(diào)用的:

def mark_done(future, result):
    print(f'Set to: {result}')
    future.set_result(result)
async def b1():
    loop = asyncio.get_event_loop()
    fut = asyncio.Future()
    loop.call_soon(mark_done, fut, 'the result')
    loop.call_soon(partial(print, 'Hello', flush=True))
    loop.call_soon(partial(print, 'Greeting', flush=True))
    print(f'Done: {fut.done()}')
    await asyncio.sleep(0)
    print(f'Done: {fut.done()}, Result: {fut.result()}')
In : await b1()
Done: False
Set to: the result
Hello
Greeting
Done: True, Result: the result

這個(gè)例子輸出的比較復(fù)雜,我挨個(gè)分析:

call_soon可以用來(lái)設(shè)置任務(wù)的結(jié)果: 在mark_done里面設(shè)置

通過(guò)2個(gè)print可以感受到call_soon支持參數(shù)。

最重要的就是輸出部分了,首先f(wàn)ut.done()的結(jié)果是False,因?yàn)檫€沒(méi)到下個(gè)事件循環(huán),sleep(0)就可以切到下次循環(huán),這樣就會(huì)調(diào)用三個(gè)call_soon回調(diào),最后再看fut.done()的結(jié)果就是True,而且fut.result()可以拿到之前在mark_done設(shè)置的值了

call_later

安排回調(diào)在給定的時(shí)間(單位秒)后執(zhí)行:

async def b2():
    loop = asyncio.get_event_loop()
    fut = asyncio.Future()
    loop.call_later(2, mark_done, fut, 'the result')
    loop.call_later(1, partial(print, 'Hello'))
    loop.call_later(1, partial(print, 'Greeting'))
    print(f'Done: {fut.done()}')
    await asyncio.sleep(2)
    print(f'Done: {fut.done()}, Result: {fut.result()}')
In : await b2()
Done: False
Hello
Greeting
Set to: the result
Done: True, Result: the result

這次要注意3個(gè)回調(diào)的延遲時(shí)間時(shí)間要<=sleep的,要不然還沒(méi)來(lái)及的回調(diào)程序就結(jié)束了

call_at

安排回調(diào)在給定的時(shí)間執(zhí)行,注意這個(gè)時(shí)間要基于 loop.time() 獲取當(dāng)前時(shí)間:

async def b3():
    loop = asyncio.get_event_loop()
    now = loop.time()
    fut = asyncio.Future()
    loop.call_at(now + 2, mark_done, fut, 'the result')
    loop.call_at(now + 1, partial(print, 'Hello', flush=True))
    loop.call_at(now + 1, partial(print, 'Greeting', flush=True))
    print(f'Done: {fut.done()}')
    await asyncio.sleep(2)
    print(f'Done: {fut.done()}, Result: {fut.result()}')
In : await b3()
Done: False
Hello
Greeting
Set to: the result
Done: True, Result: the result

關(guān)于Python中的asyncio庫(kù)-函數(shù)的回調(diào)與調(diào)度就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI