溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python異步方法如何使用

發(fā)布時(shí)間:2023-05-11 10:06:11 來源:億速云 閱讀:120 作者:zzz 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“Python異步方法如何使用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

為什么要異步編程?

要了解異步編程的動(dòng)機(jī),我們首先必須了解是什么限制了我們的代碼運(yùn)行速度。理想情況下,我們希望我們的代碼以光速運(yùn)行,立即跳過我們的代碼,沒有任何延遲。然而,由于兩個(gè)因素,實(shí)際上代碼運(yùn)行速度要慢得多:

  • CPU時(shí)間(處理器執(zhí)行指令的時(shí)間)

  • IO時(shí)間(等待網(wǎng)絡(luò)請(qǐng)求或存儲(chǔ)讀/寫的時(shí)間)

當(dāng)我們的代碼在等待 IO 時(shí),CPU 基本上是空閑的,等待某個(gè)外部設(shè)備響應(yīng)。通常,內(nèi)核會(huì)檢測(cè)到這一點(diǎn)并立即切換到執(zhí)行系統(tǒng)中的其他線程。因此,如果我們想加快處理一組 IO 密集型任務(wù),我們可以為每個(gè)任務(wù)創(chuàng)建一個(gè)線程。當(dāng)其中一個(gè)線程停止,等待 IO 時(shí),內(nèi)核將切換到另一個(gè)線程繼續(xù)處理。

這在實(shí)踐中效果很好,但有兩個(gè)缺點(diǎn):

  • 線程有開銷(尤其是在 Python 中)

  • 我們無法控制內(nèi)核何時(shí)選擇在線程之間切換

例如,如果我們想要執(zhí)行 10,000 個(gè)任務(wù),我們要么必須創(chuàng)建 10,000 個(gè)線程,這將占用大量 RAM,要么我們需要?jiǎng)?chuàng)建較少數(shù)量的工作線程并以較少的并發(fā)性執(zhí)行任務(wù)。此外,最初生成這些線程會(huì)占用 CPU 時(shí)間。

由于內(nèi)核可以隨時(shí)選擇在線程之間切換,因此我們代碼中的任何時(shí)候都可能出現(xiàn)相互競(jìng)爭(zhēng)。

引入異步

在傳統(tǒng)的基于同步線程的代碼中,內(nèi)核必須檢測(cè)線程何時(shí)是IO綁定的,并選擇在線程之間隨意切換。使用 Python 異步,程序員使用關(guān)鍵字await確認(rèn)聲明 IO 綁定的代碼行,并確認(rèn)授予執(zhí)行其他任務(wù)的權(quán)限。例如,考慮以下執(zhí)行Web請(qǐng)求的代碼:

async def request_google():
    reader, writer = await asyncio.open_connection('google.com', 80)
    writer.write(b'GET / HTTP/2\n\n')
    await writer.drain()
    response = await reader.read()
    return response.decode()

在這里,在這里,我們看到該代碼在兩個(gè)地方await。因此,在等待我們的字節(jié)被發(fā)送到服務(wù)器writer.drain())時(shí),在等待服務(wù)器用一些字節(jié)(reader.read())回復(fù)時(shí),我們知道其他代碼可能會(huì)執(zhí)行,全局變量可能會(huì)更改。然而,從函數(shù)開始到第一次等待,我們可以確保代碼逐行運(yùn)行,而不會(huì)切換到運(yùn)行程序中的其他代碼。這就是異步的美妙之處。

asyncio是一個(gè)標(biāo)準(zhǔn)庫(kù),可以讓我們用這些異步函數(shù)做一些有趣的事情。例如,如果我們想同時(shí)向Google執(zhí)行兩個(gè)請(qǐng)求,我們可以:

async def request_google_twice():
    response_1, response_2 = await asyncio.gather(request_google(), request_google())
    return response_1, response_2

當(dāng)我們調(diào)用request_google_twice()時(shí),神奇的asyncio.gather會(huì)啟動(dòng)一個(gè)函數(shù)調(diào)用,但是當(dāng)我們調(diào)用時(shí)await writer.drain(),它會(huì)開始執(zhí)行第二個(gè)函數(shù)調(diào)用,這樣兩個(gè)請(qǐng)求就會(huì)并行發(fā)生。然后,它等待第一個(gè)或第二個(gè)請(qǐng)求的writer.drain()調(diào)用完成并繼續(xù)執(zhí)行該函數(shù)。

最后,有一個(gè)重要的細(xì)節(jié)被遺漏了:asyncio.run。要從常規(guī)的 [同步] Python 函數(shù)實(shí)際調(diào)用異步函數(shù),我們將調(diào)用包裝在asyncio.run(...)

async def async_main():
    r1, r2 = await request_google_twice()
    print('Response one:', r1)
    print('Response two:', r2)
    return 12

return_val = asyncio.run(async_main())

請(qǐng)注意,如果我們只調(diào)用async_main()而不調(diào)用await ...或者 asyncio.run(...),則不會(huì)發(fā)生任何事情。這只是由異步工作方式的性質(zhì)所限制的。

那么,異步究竟是如何工作的,這些神奇的asyncio.runasyncio.gather函數(shù)有什么作用呢?閱讀下文以了解詳情。

異步是如何工作的

要了解async的魔力,我們首先需要了解一個(gè)更簡(jiǎn)單的 Python 構(gòu)造:生成器

生成器

生成器是 Python 函數(shù),它逐個(gè)返回一系列值(可迭代)。例如:

def get_numbers():
    print("|| get_numbers begin")
    print("|| get_numbers Giving 1...")
    yield 1
    print("|| get_numbers Giving 2...")
    yield 2
    print("|| get_numbers Giving 3...")
    yield 3
    print("|| get_numbers end")

print("| for begin")
for number in get_numbers():
    print(f"| Got {number}.")
print("| for end")
| for begin
|| get_numbers begin
|| get_numbers Giving 1...
| Got 1.
|| get_numbers Giving 2...
| Got 2.
|| get_numbers Giving 3...
| Got 3.
|| get_numbers end
| for end

因此,我們看到,對(duì)于for循環(huán)的每個(gè)迭代,我們?cè)谏善髦兄粓?zhí)行一次。我們可以使用Python的next()函數(shù)更明確地執(zhí)行此迭代:

In [3]: generator = get_numbers()                                                                                                                                                            

In [4]: next(generator)                                                                                                                                                                      
|| get_numbers begin
|| get_numbers Giving 1...
Out[4]: 1

In [5]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 2...
Out[5]: 2

In [6]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 3...
Out[6]: 3

In [7]: next(generator)                                                                                                                                                                      
|| get_numbers end
---------------------------------------
StopIteration       Traceback (most recent call last)
<ipython-input-154-323ce5d717bb> in <module>
----> 1 next(generator)

StopIteration:

這與異步函數(shù)的行為非常相似。正如異步函數(shù)從函數(shù)開始直到第一次等待時(shí)連續(xù)執(zhí)行代碼一樣,我們第一次調(diào)用next()時(shí),生成器將從函數(shù)頂部執(zhí)行到第一個(gè)yield 語(yǔ)句。然而,現(xiàn)在我們只是從生成器返回?cái)?shù)字。我們將使用相同的思想,但返回一些不同的東西來使用生成器創(chuàng)建類似異步的函數(shù)。

使用生成器進(jìn)行異步

讓我們使用生成器來創(chuàng)建我們自己的小型異步框架。

但是,為簡(jiǎn)單起見,讓我們將實(shí)際 IO 替換為睡眠(即。time.sleep)。讓我們考慮一個(gè)需要定期發(fā)送更新的應(yīng)用程序:

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        time.sleep(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

因此,如果我們調(diào)用send_updates(3, 1.0),它將輸出這三條消息,每條消息間隔 1 秒:

[1.0] Sending update 1/3.
[1.0] Sending update 2/3.
[1.0] Sending update 3/3.

現(xiàn)在,假設(shè)我們要同時(shí)運(yùn)行幾個(gè)不同的時(shí)間間隔。例如,send_updates(10, 1.0),send_updates(5, 2.0)send_updates(4, 3.0)。我們可以使用線程來做到這一點(diǎn),如下所示:

threads = [
    threading.Thread(target=send_updates, args=(10, 1.0)),
    threading.Thread(target=send_updates, args=(5, 2.0)),
    threading.Thread(target=send_updates, args=(4, 3.0))
]
for i in threads:
    i.start()
for i in threads:
    i.join()

這可行,在大約 12 秒內(nèi)完成,但使用具有前面提到的缺點(diǎn)的線程。讓我們使用生成器構(gòu)建相同的東西。

在演示生成器的示例中,我們返回了整數(shù)。為了獲得類似異步的行為,而不是返回任意值,我們希望返回一些描述要等待的IO的對(duì)象。在我們的例子中,我們的“IO”只是一個(gè)計(jì)時(shí)器,它將等待一段時(shí)間。因此,讓我們創(chuàng)建一個(gè)計(jì)時(shí)器對(duì)象,用于此目的:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration

現(xiàn)在,讓我們從我們的函數(shù)中產(chǎn)生這個(gè)而不是調(diào)用time.sleep

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        yield AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

現(xiàn)在,每次我們調(diào)用send_updates(...)時(shí)調(diào)用next(...),我們都會(huì)得到一個(gè)AsyncTimer對(duì)象,告訴我們直到我們應(yīng)該等待什么時(shí)候:

generator = send_updates(3, 1.5)
timer = next(generator)  # [1.5] Sending update 1/3.
print(timer.done_time - time.time())  # 1.498...

由于我們的代碼現(xiàn)在實(shí)際上并沒有調(diào)用time.sleep,我們現(xiàn)在可以同時(shí)執(zhí)行另一個(gè)send_updates調(diào)用。

所以,為了把這一切放在一起,我們需要退后一步,意識(shí)到一些事情:

  • 生成器就像部分執(zhí)行的函數(shù),等待一些 IO(計(jì)時(shí)器)。

  • 每個(gè)部分執(zhí)行的函數(shù)都有一些 IO(計(jì)時(shí)器),它在繼續(xù)執(zhí)行之前等待。

  • 因此,我們程序的當(dāng)前狀態(tài)是每個(gè)部分執(zhí)行的函數(shù)(生成器)和該函數(shù)正在等待的 IO(計(jì)時(shí)器)對(duì)的對(duì)列表

  • 現(xiàn)在,要運(yùn)行我們的程序,我們只需要等到某個(gè) IO 準(zhǔn)備就緒(即我們的一個(gè)計(jì)時(shí)器已過期),然后再向前一步執(zhí)行相應(yīng)的函數(shù),得到一個(gè)阻塞該函數(shù)的新 IO。

實(shí)現(xiàn)此邏輯為我們提供了以下信息:

# Initialize each generator with a timer of 0 so it immediately executes
generator_timer_pairs = [
    (send_updates(10, 1.0), AsyncTimer(0)),
    (send_updates(5, 2.0), AsyncTimer(0)),
    (send_updates(4, 3.0), AsyncTimer(0))
]

while generator_timer_pairs:
    pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
    generator, min_timer = pair

    # Wait until this timer is ready
    time.sleep(max(0, min_timer.done_time - time.time()))
    del generator_timer_pairs[generator_timer_pairs.index(pair)]

    try:  # Execute one more step of this function
        new_timer = next(generator)
        generator_timer_pairs.append((generator, new_timer))
    except StopIteration:  # When the function is complete
        pass

有了這個(gè),我們有了一個(gè)使用生成器的類似異步函數(shù)的工作示例。請(qǐng)注意,當(dāng)生成器完成時(shí),它會(huì)引發(fā)StopIteration,并且當(dāng)我們不再有部分執(zhí)行的函數(shù)(生成器)時(shí),我們的函數(shù)就完成了

現(xiàn)在,我們把它包裝在一個(gè)函數(shù)中,我們得到了類似于asyncio.run的東西。結(jié)合asyncio.gather運(yùn)行:

def async_run_all(*generators):
    generator_timer_pairs = [
        (generator, AsyncTimer(0))
        for generator in generators
    ]
    while generator_timer_pairs:
        pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
        generator, min_timer = pair

        time.sleep(max(0, min_timer.done_time - time.time()))
        del generator_timer_pairs[generator_timer_pairs.index(pair)]

        try:
            new_timer = next(generator)
            generator_timer_pairs.append((generator, new_timer))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)
使用 async/await 進(jìn)行異步

實(shí)現(xiàn)我們的caveman版本的asyncio的最后一步是支持Python 3.5中引入的async/await語(yǔ)法。await的行為類似于yield,只是它不是直接返回提供的值,而是返回next((...).__await__())。async函數(shù)返回“協(xié)程”,其行為類似于生成器,但需要使用.send(None)而不是next()(請(qǐng)注意,正如生成器在最初調(diào)用時(shí)不返回任何內(nèi)容一樣,異步函數(shù)在逐步執(zhí)行之前不會(huì)執(zhí)行任何操作,這解釋了我們前面提到的)。

因此,鑒于這些信息,我們只需進(jìn)行一些調(diào)整即可將我們的示例轉(zhuǎn)換為async/await。以下是最終結(jié)果:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration
    def __await__(self):
        yield self

async def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        await AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

def _wait_until_io_ready(ios):
    min_timer = min(ios, key=lambda x: x.done_time)
    time.sleep(max(0, min_timer.done_time - time.time()))
    return ios.index(min_timer)

def async_run_all(*coroutines):
    coroutine_io_pairs = [
        (coroutine, AsyncTimer(0))
        for coroutine in coroutines
    ]
    while coroutine_io_pairs:
        ios = [io for cor, io in coroutine_io_pairs]
        ready_index = _wait_until_io_ready(ios)
        coroutine, _ = coroutine_io_pairs.pop(ready_index)

        try:
            new_io = coroutine.send(None)
            coroutine_io_pairs.append((coroutine, new_io))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)

我們有了它,我們的迷你異步示例完成了,使用async/await. 現(xiàn)在,您可能已經(jīng)注意到我將 timer 重命名為 io 并將查找最小計(jì)時(shí)器的邏輯提取到一個(gè)名為_wait_until_io_ready. 這是有意將這個(gè)示例與最后一個(gè)主題聯(lián)系起來:真實(shí) IO。

在這里,我們完成了我們的小型異步示例,使用了async/await。現(xiàn)在,你可能已經(jīng)注意到我將timer重命名為io,并將用于查找最小計(jì)時(shí)器的邏輯提取到一個(gè)名為_wait_until_io_ready的函數(shù)中。這是為了將本示例與最后一個(gè)主題:真正的IO,連接起來。

真正的 IO(而不僅僅是定時(shí)器)

所以,所有這些例子都很棒,但是它們與真正的 asyncio 有什么關(guān)系,我們希望在真正 IO 上等待 TCP 套接字和文件讀/寫?嗯,美麗就在那個(gè)_wait_until_io_ready功能中。為了讓真正的 IO 正常工作,我們所要做的就是創(chuàng)建一些AsyncReadFile類似于AsyncTimer包含文件描述符的新對(duì)象。然后,AsyncReadFile我們正在等待的對(duì)象集對(duì)應(yīng)于一組文件描述符。最后,我們可以使用函數(shù) (syscall) select()等待這些文件描述符之一準(zhǔn)備好。由于 TCP/UDP 套接字是使用文件描述符實(shí)現(xiàn)的,因此這也涵蓋了網(wǎng)絡(luò)請(qǐng)求。

所以,所有這些例子都很好,但它們與真正的異步IO有什么關(guān)系呢?我們希望等待實(shí)際的IO,比如TCP套接字和文件讀/寫?好吧,其優(yōu)點(diǎn)在于_wait_until_io_ready函數(shù)。要使真正的IO工作,我們需要做的就是創(chuàng)建一些新的AsyncReadFile,類似于AsyncTimer,它包含一個(gè)文件描述符。然后,我們正在等待的一組AsyncReadFile對(duì)象對(duì)應(yīng)于一組文件描述符。最后,我們可以使用函數(shù)(syscallselect()等待這些文件描述符之一準(zhǔn)備好。由于TCP/UDP套接字是使用文件描述符實(shí)現(xiàn)的,因此這也涵蓋了網(wǎng)絡(luò)請(qǐng)求。

“Python異步方法如何使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI