溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Python?Asyncio庫(kù)之a(chǎn)syncio.task常用函數(shù)有哪些

發(fā)布時(shí)間:2023-03-01 16:37:21 來(lái)源:億速云 閱讀:118 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹“Python Asyncio庫(kù)之a(chǎn)syncio.task常用函數(shù)有哪些”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“Python Asyncio庫(kù)之a(chǎn)syncio.task常用函數(shù)有哪些”文章能幫助大家解決問(wèn)題。

0.基礎(chǔ)

在《Python Asyncio調(diào)度原理》中介紹了Asyncio的兩種調(diào)度基本單位,HandlerTimeHandler,他們只能被loop.call_xx函數(shù)調(diào)用,開(kāi)發(fā)者從表面上不知道他們的存在,他們和loop.call_xx屬于事件循環(huán)的基礎(chǔ)功能,但是這些操作都屬于單一操作,需要開(kāi)發(fā)者自己編寫(xiě)代碼把他們的操作給串聯(lián)起來(lái)。 而在《Python的可等待對(duì)象在Asyncio的作用》中介紹了協(xié)程鏈的發(fā)起者asyncio.Task能通過(guò)loop.call_soon跟事件循環(huán)進(jìn)行交互,并串聯(lián)整個(gè)協(xié)程鏈中可等待對(duì)象以及安排可等待對(duì)象的運(yùn)行。 不過(guò)對(duì)于loop.call_atloop.call_later仍需要開(kāi)發(fā)者通過(guò)asyncio.Future來(lái)把Timehandler的執(zhí)行結(jié)果與asyncio.Task給串聯(lián)起來(lái),比如休眠一秒的代碼實(shí)現(xiàn):

import asyncio


async def main():
    loop = asyncio.get_event_loop()
    f = asyncio.Future()

    def _on_complete():
        f.set_result(True)

    loop.call_later(1, _on_complete)
    return await f


if __name__ == "__main__":
    import time
    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)

這段代碼中asyncio.Future執(zhí)行的是類(lèi)似容器的功能,自己本身會(huì)接受各種狀態(tài),并把自己的狀態(tài)同步給管理當(dāng)前協(xié)程鏈的asyncio.Task,使asyncio.Task能管理其他類(lèi)型的操作。 在asyncio.tasks模塊中的所有功能函數(shù)的原理也差不多,他們接受的參數(shù)基本是都是可等待對(duì)象,然后通過(guò)asyncio.Futurte作為容器來(lái)同步調(diào)用端和可等待對(duì)象間的狀態(tài),也可以通過(guò)其他的一些方法把asyncio.Task的狀態(tài)同步給可等待對(duì)象。

1.休眠--asyncio.sleep

asyncio.sleep是一個(gè)常用的方法,開(kāi)發(fā)者通過(guò)它可以很方便的讓協(xié)程休眠設(shè)定的時(shí)間,它本身也非常簡(jiǎn)單,它的源碼如下:

@types.coroutine
def __sleep0():
    yield


async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

通過(guò)源碼可以發(fā)現(xiàn)當(dāng)設(shè)置的休眠時(shí)間等于小于0的時(shí)候,sleep只執(zhí)行了yield,并不會(huì)執(zhí)行其他邏輯,而在值大于0時(shí)會(huì)創(chuàng)建一個(gè)Future對(duì)象,接著就一直等待,直到Future對(duì)象被loop.call_later控制結(jié)束時(shí)才返回結(jié)果值。

需要注意的是,當(dāng)asyncio.sleep在值為0時(shí),sleep執(zhí)行yield可以讓Task.__step感知而讓出控制權(quán),這是最小的讓出當(dāng)前協(xié)程控制權(quán)的方法,所以我們?cè)诰帉?xiě)涉及到CPU比較多的時(shí)候或者消耗時(shí)間較長(zhǎng)的函數(shù)時(shí)可以通過(guò)asyncio.sleep(0)來(lái)主動(dòng)讓出控制權(quán),如下:

import asyncio

async def demo() -> None:
    for index, i in enumerate(range(10000)):
        if index % 100 == 0:
            await asyncio.sleep(0)
        ...  # 假設(shè)這里的代碼占用過(guò)多的CPU時(shí)間

在這個(gè)例子中每循環(huán)100次就讓出控制權(quán),以減少對(duì)其他協(xié)程的影響。

2.屏蔽取消--asyncio.shield

asyncio.shield可以保護(hù)一個(gè)可等待對(duì)象被取消,或者說(shuō)是防止協(xié)程鏈上的取消傳播到被asyncio.shield托管的可等待對(duì)象,但是調(diào)用可等待對(duì)象的cancel方法仍然可以取消可等待對(duì)象的運(yùn)行,如下例子:

import asyncio


async def sub(f):
    await asyncio.shield(f)


async def main():
    f1 = asyncio.Future()
    f2 = asyncio.Future()
    sub1 = asyncio.create_task(sub(f1))
    sub2 = asyncio.create_task(sub(f2))
    f1.cancel()
    sub2.cancel()
    await asyncio.sleep(0)  # 確保已經(jīng)取消完成
    print("f1 future run success:", f1.done())
    print("f2 future run success:", f2.done())
    print("sub1 future run result:", sub1.done())
    print("sub2 future run result:", sub2.done())

asyncio.run(main())

# >>> future run success: True
# >>> future run success: False
# >>> sub1 future run result: True
# >>> sub2 future run result: True

其中f1, f2都在main函數(shù)中創(chuàng)建, 然后同時(shí)被sub函數(shù)包裹,并通過(guò)asyncio.create_task在后臺(tái)異步運(yùn)行并分別返回sub1sub2兩個(gè)Future對(duì)應(yīng)著sub函數(shù)的執(zhí)行情況。 接著分別取消f1sub2的執(zhí)行,并把f1,f2,sub1,sub2是否為done打印出來(lái),可以發(fā)現(xiàn)f1,sub1,sub2的狀態(tài)都為done(被取消也認(rèn)為是done),而f2則還在運(yùn)行中。

在文章《Python的可等待對(duì)象在Asyncio的作用》中說(shuō)過(guò),一條協(xié)程鏈?zhǔn)怯?code>asyncio.Task牽頭組成的,后續(xù)的所有成功和異常都會(huì)在這條鏈上傳播,而取消本質(zhì)上就是一種異常,所以也可以在協(xié)程鏈上傳播。 而shield為了杜絕運(yùn)行的可等待對(duì)象收到協(xié)程鏈的異常傳播又能讓協(xié)程鏈知道可等待對(duì)象的執(zhí)行結(jié)果,會(huì)先讓可等待對(duì)象在另外一條協(xié)程鏈運(yùn)行,然后創(chuàng)建一個(gè)容器接到原來(lái)鏈上,并在可等待對(duì)象執(zhí)行完成的時(shí)候把結(jié)果告訴容器,由容器把結(jié)果傳播到原有的協(xié)程鏈上,對(duì)應(yīng)的源碼如下:

def shield(arg):
    # 如果是Coro,則需要包裝成future
    inner = _ensure_future(arg)
    if inner.done():
        # 如果已經(jīng)完成,就不需要被處理了
        return inner
    loop = futures._get_loop(inner)
    # 創(chuàng)建一個(gè)future容器
    outer = loop.create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # 如果容器已經(jīng)被取消,而自己沒(méi)被取消且已經(jīng)完成,則手動(dòng)獲取下結(jié)果,方便被回收
                inner.exception()
            return

        if inner.cancelled():
            # 如果自己被取消,則把取消通過(guò)容器傳播到協(xié)程鏈上
            outer.cancel()
        else:
            # 自己已經(jīng)完成且容器未完成,把自己的結(jié)果或者異常通過(guò)替身傳播到協(xié)程鏈上
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())


    def _outer_done_callback(outer):
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    # 添加回調(diào),在執(zhí)行成功或被取消時(shí)通知對(duì)方
    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer

通過(guò)源碼可以發(fā)現(xiàn)shield被調(diào)用的時(shí)候(假設(shè)驅(qū)動(dòng)調(diào)用shieldTask名為main.Task),會(huì)先通過(guò)_ensure_future輔助函數(shù)創(chuàng)建一個(gè)Task(other.Task)在后臺(tái)異步運(yùn)行可等待對(duì)象,驅(qū)動(dòng)可等待對(duì)象的運(yùn)行,由于是新的Task驅(qū)動(dòng)著可等待對(duì)象的執(zhí)行,所以main.Task的任何狀態(tài)不會(huì)傳播到當(dāng)前的可等待對(duì)象。 接著創(chuàng)建一個(gè)Future容器,并在other.TaskFuture容器掛上完成的回調(diào)使他們?cè)谕瓿傻臅r(shí)候都能通知到對(duì)方,最后返回Future容器給main.Task,使main.Task能夠間接的知道可等待對(duì)象的運(yùn)行結(jié)果,如下圖:

Python?Asyncio庫(kù)之a(chǎn)syncio.task常用函數(shù)有哪些

不過(guò)Future容器完成的回調(diào)只是把托管可等待對(duì)象的other.Task回調(diào)給移除了,導(dǎo)致main.Task的狀態(tài)不會(huì)同步到other.Task中(圖中Future通知可等待對(duì)象aws的通道是不通的),進(jìn)而不會(huì)影響到托管的可等待對(duì)象。 而other.Task完成的回調(diào)會(huì)把任何狀態(tài)同步到Future中,進(jìn)而影響到main.Task。

3.超時(shí)--asyncio.wait_for

asyncio.wait_for可以托管可等待對(duì)象,直到可等待對(duì)象完成,不過(guò)可等待對(duì)象在設(shè)定的時(shí)間內(nèi)還沒(méi)執(zhí)行完成時(shí)會(huì)被直接取消執(zhí)行并拋出asyncio.TimeoutError異常。 它的運(yùn)行原理綜合了上面的asyncio.shieldasyncio.sleep,它一樣會(huì)為可等待對(duì)象創(chuàng)建一個(gè)Future容器,并在容器上掛了一個(gè)超時(shí)的回調(diào)和可等待對(duì)象執(zhí)行結(jié)束的回調(diào),接著就等待容器執(zhí)行結(jié)束。 不過(guò)在了解asyncio.wait_for之前,先了解他用到的兩個(gè)輔助函數(shù)_cancel_and_wait_release_waiter,他們的源碼如下:

def _release_waiter(waiter, *args):
    if not waiter.done():
        waiter.set_result(None)

async def _cancel_and_wait(fut, loop):
    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        await waiter
    finally:
        fut.remove_done_callback(cb)

可以看出源碼比較簡(jiǎn)單,他們的作用都是為了確??傻却龑?duì)象能完全執(zhí)行結(jié)束才返回,其中_release_waiter是確??傻却龑?duì)象一定被設(shè)置為執(zhí)行結(jié)束,而_cancel_and_wait是為了確保能等到可等待對(duì)象被取消且完整結(jié)束時(shí)才返回。

可等待對(duì)象的cancel方法可以認(rèn)為是異步的,調(diào)用后需要等事件循環(huán)再次調(diào)用可等待對(duì)象時(shí),可等待對(duì)象才會(huì)被取消。而_cancel_and_wait通過(guò)一個(gè)容器來(lái)規(guī)避這個(gè)問(wèn)題,使取消這個(gè)操作變?yōu)橥降模@個(gè)方法在某些開(kāi)發(fā)場(chǎng)景經(jīng)常被使用,如果不是私有API就更好了。

接下來(lái)就可以通過(guò)wait_for的源碼了解他的執(zhí)行邏輯了,源碼如下:

async def wait_for(fut, timeout):
    loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # 當(dāng)超時(shí)的值小于等于0時(shí)就意味著想馬上得到結(jié)果
        
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            # 如果執(zhí)行完成就返回可等待對(duì)象的數(shù)據(jù)
            return fut.result()
        # 取消可等待對(duì)象并等待
        await _cancel_and_wait(fut, loop=loop)
        # 如果被_cancel_and_wait取消,那么會(huì)拋出CancelledError異常,這時(shí)候把它轉(zhuǎn)為超時(shí)異常
        try:
            return fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc

    # 初始化一個(gè)Future,只有在超時(shí)和完成時(shí)才會(huì)變?yōu)閐one
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        try:
            await waiter
        except exceptions.CancelledError:
            # 此時(shí)是asyncio.Task被取消,并把取消傳播到waiter
            if fut.done():
                return fut.result()
            else:
                # 如果任務(wù)被取消了,那么需要確保任務(wù)沒(méi)有被執(zhí)行才返回
                fut.remove_done_callback(cb)
                await _cancel_and_wait(fut, loop=loop)
                raise
        # 計(jì)時(shí)結(jié)束或者是執(zhí)行完畢的情況
        if fut.done():
            # 執(zhí)行完畢,返回對(duì)應(yīng)的值
            return fut.result()
        else:
            # 計(jì)時(shí)結(jié)束,清理資源,并拋出異常
            fut.remove_done_callback(cb)
            # 如果任務(wù)被取消了,那么需要確保任務(wù)沒(méi)有被執(zhí)行才返回
            await _cancel_and_wait(fut, loop=loop)
            # 如果被_cancel_and_wait取消,那么會(huì)拋出CancelledError異常,這時(shí)候把它轉(zhuǎn)為超時(shí)異常
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    finally:
        timeout_handle.cancel()

wait_for的源碼為了兼容各種情況,代碼復(fù)雜度比較高,同時(shí)超時(shí)參數(shù)小于等于0跟大于0的邏輯是一樣的,分開(kāi)寫(xiě)只是為了避免在小于等于0時(shí)創(chuàng)建了一些額外的對(duì)象,在精簡(jiǎn)了一些asyncio.Task傳播異常給waiter的邏輯后,wait_for的執(zhí)行邏輯如下圖:

Python?Asyncio庫(kù)之a(chǎn)syncio.task常用函數(shù)有哪些

fut為可等待對(duì)象,timeout為超時(shí)時(shí)間

可以看到wait_for的主要邏輯是先創(chuàng)建一個(gè)名為waiter的容器,接著通過(guò)loop.call_later指定在多少時(shí)間后釋放容器,然后再通過(guò)ensure_future使另一個(gè)asyncio.Task來(lái)托管可等待對(duì)象,并安排執(zhí)行完畢的時(shí)候釋放容器,再等待waiter容器的執(zhí)行直到被釋放。當(dāng)容器被釋放的時(shí)候再判斷可等待對(duì)象是否執(zhí)行完畢,如果執(zhí)行完畢了就直接返回,否則拋出超時(shí)異常。

4.簡(jiǎn)單的等待--wait

asyncio.wait用于等待一批可等待對(duì)象,當(dāng)有一個(gè)可等待對(duì)象執(zhí)行完成或者出現(xiàn)異常的時(shí)候才會(huì)返回?cái)?shù)據(jù)(具體還是要看return_when指定的條件,默認(rèn)為所有等待對(duì)象結(jié)束或取消時(shí)才返回),需要注意的是wait雖然支持timeout參數(shù),但是在超時(shí)的試試不會(huì)取消可等待對(duì)象,也不會(huì)拋出超時(shí)的異常,只會(huì)把完成的可等待對(duì)象放在完成的集合,把未完成的可等待對(duì)象放在未完成的集合并返回,如下代碼:

import asyncio


async def main():
    return await asyncio.wait(
        {asyncio.create_task(asyncio.sleep(1))},
        timeout=0.5
    )


if __name__ == "__main__":
    asyncio.run(main())

這段代碼可以正常的運(yùn)作,不會(huì)拋出超時(shí)錯(cuò),不過(guò)還要注意的是在后續(xù)版本中asyncio.wait只支持Task對(duì)象,如果想要傳入的是coroFuture對(duì)象,則需要開(kāi)發(fā)者自己手動(dòng)轉(zhuǎn)換。 wait的邏輯與wait_for類(lèi)似,源碼如下:

async def _wait(fs, timeout, return_when, loop):
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        # 定義一個(gè)time handler,在timeout秒后通過(guò)`_release_waiter`完成.
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # 每個(gè)可等待對(duì)象執(zhí)行完成的回調(diào)
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and
             (not f.cancelled() and f.exception() is not None)
        ):
            # 如果所有任務(wù)執(zhí)行完成,或者是第一個(gè)完成或者是第一個(gè)拋出異常時(shí),
            # 意味著執(zhí)行完成,需要取消time handler,并標(biāo)記為完成
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)
    # 為每個(gè)可等待對(duì)象添加回調(diào)
    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        # 等待替身執(zhí)行完成
        await waiter
    finally:
        # 取消time handler并移除回調(diào)(因?yàn)閏ancel是異步的)
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    # 處理并返回done和pending,其中done代表完成,pending代表執(zhí)行中。
    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending

可以看到wait_for的復(fù)雜度沒(méi)有wait高,而且可以看到asyncio.wait是等waiter這個(gè)容器執(zhí)行完并移除可等待對(duì)象上面的_on_completion回調(diào)后才把可等待對(duì)象按照是否完成區(qū)分到donepending兩個(gè)集合,這樣的準(zhǔn)確度比在_on_completion高一些,但是如果開(kāi)發(fā)者在處理集合時(shí)觸發(fā)一些異步操作也可能導(dǎo)致pending集合中的部分可等待對(duì)象變?yōu)橥瓿傻?,如下代碼:

import asyncio


async def main():
    f_list = [asyncio.Future() for _ in range(10)]
    done, pending = await asyncio.wait(f_list, timeout=1)
    print(len(done), len(pending))
    print([i for i in pending if i.done()])
    f_list[1].set_result(True)
    print([i for i in pending if i.done()])


if __name__ == "__main__":
    asyncio.run(main())
# >>> 0 10
# >>> []
# >>> [<Future finished result=True>]

通過(guò)輸出可以發(fā)現(xiàn),在asyncio.wait執(zhí)行完畢后,pending中的完成的元素只有0個(gè),而在后續(xù)強(qiáng)制為其中的一個(gè)Future設(shè)置數(shù)據(jù)后,pending中完成的元素有1個(gè)了。

5.迭代可等待對(duì)象的完成--asyncio.as_completed

asyncio.wait的機(jī)制是只要被觸發(fā)就會(huì)返回,其他尚未完成的可等待對(duì)象需要開(kāi)發(fā)者自己在處理,而asyncio.as_completed可以確保每個(gè)可等待對(duì)象完成返回?cái)?shù)據(jù)或者超時(shí)時(shí)拋出異常,使用方法如下:

import asyncio


async def sub(i):
    await asyncio.sleep(i)
    return i


async def main():
    for f in asyncio.as_completed([sub(i) for i in range(5)], timeout=3):
        print(await f)


if __name__ == "__main__":
    asyncio.run(main())
# >>> 0
# >>> 1
# >>> 2
# >>> Traceback (most recent call last):
#       File "/home/so1n/github/demo_project/demo.py", line 18, in <module>
#         asyncio.run(main())
#       File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
#         return loop.run_until_complete(main)
#       File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
#         return future.result()
#       File "/home/so1n/github/demo_project/demo.py", line 14, in main
#         print(await f)
#       File "/usr/lib/python3.7/asyncio/tasks.py", line 532, in _wait_for_one
#         raise futures.TimeoutError
#     concurrent.futures._base.TimeoutError

該程序并發(fā)執(zhí)行5個(gè)協(xié)程,其中執(zhí)行最久的時(shí)間是5秒,而as_completed設(shè)置的超時(shí)為3秒。通過(guò)輸出可以發(fā)現(xiàn),每當(dāng)一個(gè)可等待對(duì)象執(zhí)行結(jié)束時(shí)就會(huì)把數(shù)據(jù)拋出來(lái),當(dāng)超時(shí)則會(huì)拋出超時(shí)錯(cuò)誤。為了能達(dá)每有一個(gè)可等待對(duì)象就返回一次數(shù)據(jù)的效果,as_completed通過(guò)一個(gè)隊(duì)列來(lái)維護(hù)數(shù)據(jù)的返回,它的源碼如下:

def as_completed(fs, *, timeout=None):
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue()

    loop = events._get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        # 超時(shí)時(shí)調(diào)用,需要注意的是,失敗時(shí)結(jié)果為空,所以要推送一個(gè)空的數(shù)據(jù)到隊(duì)列中
        # 在消費(fèi)者發(fā)現(xiàn)元素為空時(shí)拋出錯(cuò)誤
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        # 如果成功,就把Future推送到隊(duì)列中,消費(fèi)者可以通過(guò)Future獲取到結(jié)果
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # 如果元素為空,則證明已經(jīng)超時(shí)了,要拋出異常
            raise exceptions.TimeoutError
        return f.result()

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # 通過(guò)生成器語(yǔ)法返回協(xié)程函數(shù),該協(xié)程函數(shù)可以獲取最近完成的可等待對(duì)象的結(jié)果
    for _ in range(len(todo)):
        yield _wait_for_one()

通過(guò)源碼可以發(fā)現(xiàn)可等待對(duì)象就像生產(chǎn)者一樣,執(zhí)行結(jié)束的時(shí)候就會(huì)把結(jié)果投遞給隊(duì)列,同時(shí)as_completed會(huì)迭代跟可等待對(duì)象的數(shù)量一樣的_wait_for_one協(xié)程函數(shù),供開(kāi)發(fā)者消費(fèi)數(shù)據(jù)。不過(guò)需要注意的是as_completed在超時(shí)的時(shí)候,并不會(huì)取消尚未完成的可等待對(duì)象,他們會(huì)變?yōu)椴豢煽氐臓顟B(tài),在某些時(shí)候會(huì)造成內(nèi)存溢出,如下示例代碼:

import asyncio
import random


async def sub():
    # 一半的幾率會(huì)被set一個(gè)值并返回,一半的幾率會(huì)卡死
    f = asyncio.Future()
    if random.choice([0, 1]) == 0:
        f.set_result(None)
    return await f


async def main():
    try:
        for f in asyncio.as_completed([sub() for i in range(5)], timeout=1):
            print(await f)
    except asyncio.TimeoutError:
        # 忽略超時(shí)
        pass
    # 統(tǒng)計(jì)未完成的sub任務(wù)
    cnt = 0
    for i in asyncio.all_tasks():
        if i._coro.__name__ == sub.__name__:
            cnt += 1
    print("runing task by name sub:", cnt)


if __name__ == "__main__":
    asyncio.run(main())
# >>> None
# >>> None
# >>> None
# >>> runing task by name sub: 2

通過(guò)結(jié)果(由于采用隨機(jī),結(jié)果可能不一樣)可以發(fā)現(xiàn),sub成功執(zhí)行完成的數(shù)量有3個(gè)(輸出None),而在as_completed觸發(fā)超時(shí)后仍有兩個(gè)sub在執(zhí)行中,這時(shí)的兩個(gè)sub成為無(wú)人管理的可等待對(duì)象,除非開(kāi)發(fā)者通過(guò)asyncio.all_tasks去找到他并清理掉,否則這幾個(gè)可等待對(duì)象會(huì)一直伴隨著程序運(yùn)行,這很容易造成內(nèi)存溢出。

關(guān)于“Python Asyncio庫(kù)之a(chǎn)syncio.task常用函數(shù)有哪些”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI