您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“rust異步代碼async/.await的內(nèi)部運(yùn)行機(jī)制是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“rust異步代碼async/.await的內(nèi)部運(yùn)行機(jī)制是什么”吧!
首先讓我們先創(chuàng)建一個(gè)Cargo項(xiàng)目:
~$ cargo new --bin sleepus-interruptus
如果你期望和教程使用的編譯器保持一致,可以添加一個(gè)內(nèi)容為1.39.0的rust-toolchain文件。
在繼續(xù)下面的內(nèi)容之前,先運(yùn)行cargo run
確保環(huán)境沒有問題。
我們要寫一個(gè)簡單的程序,它可以顯示10次Sleepus消息,每次間隔0.5秒;同時(shí)顯示5次Interruptus消息,每次間隔1秒。下面是相當(dāng)簡單的rust實(shí)現(xiàn)代碼:
use std::thread::{sleep}; use std::time::Duration; fn sleepus() { for i in 1..=10 { println!("Sleepus {}", i); sleep(Duration::from_millis(500)); } } fn interruptus() { for i in 1..=5 { println!("Interruptus {}", i); sleep(Duration::from_millis(1000)); } } fn main() { sleepus(); interruptus(); }
不過,上面的代碼會(huì)同步執(zhí)行兩個(gè)操作,它會(huì)先顯示完所有的Sleepus消息,然后再顯示Interruptus消息。而我們期望的是這兩種消息交織顯示,也就是說Interruptus消息可以打斷Sleepus消息的顯示。
有兩個(gè)辦法可以實(shí)現(xiàn)交織顯示的目標(biāo)。顯而易見的一個(gè)是為每個(gè)函數(shù)創(chuàng)建一個(gè)單獨(dú)的線程,然后等待線程執(zhí)行完畢。
use std::thread::{sleep, spawn}; fn main() { let sleepus = spawn(sleepus); let interruptus = spawn(interruptus); sleepus.join().unwrap(); interruptus.join().unwrap(); }
需要指出的是:
我們使用spawn(sleepus)
而不是spawn(sleepus())
來創(chuàng)建線程。后者將 立即執(zhí)行sleepus()
然后將其執(zhí)行結(jié)果傳給spawn
,這不是我們期望的- 我在主函數(shù)種使用join()
來等待子線程結(jié)束,并使用unwrap()
來處理 可以發(fā)生的故障,因?yàn)槲覒小?/p>
另一種實(shí)現(xiàn)方法是創(chuàng)建一個(gè)輔助線程,然后在主線程種調(diào)用其中一個(gè)函數(shù):
fn main() { let sleepus = spawn(sleepus); interruptus(); sleepus.join().unwrap(); }
這種方法效率更高,因?yàn)橹恍枰~外創(chuàng)建一個(gè)線程,并且也沒有什么副作用,因此我推薦使用這個(gè)方法。
不過這兩種方法都不是異步解決方案!我們使用兩個(gè)由操作系統(tǒng)管理的線程來并發(fā)執(zhí)行兩個(gè)同步任務(wù)!接下來讓我們嘗試如何在單一線程內(nèi)讓兩個(gè)任務(wù)協(xié)作執(zhí)行!
我們將從較高層次的抽象開始,然后逐步深入rust異步編程的細(xì)節(jié)?,F(xiàn)在讓我們以async風(fēng)格重寫前面的應(yīng)用。
首先在Cargo.toml中添加以下依賴:
async-std = { version = "1.2.0", features = ["attributes"] }
現(xiàn)在我們可以將應(yīng)用重寫為:
use async_std::task::{sleep, spawn}; use std::time::Duration; async fn sleepus() { for i in 1..=10 { println!("Sleepus {}", i); sleep(Duration::from_millis(500)).await; } } async fn interruptus() { for i in 1..=5 { println!("Interruptus {}", i); sleep(Duration::from_millis(1000)).await; } } #[async_std::main] async fn main() { let sleepus = spawn(sleepus()); interruptus().await; sleepus.await; }
主要的修改說明如下:
我們不再使用std::thread中的sleep和spawn函數(shù),而是采用async_std::task。- 在sleepus和interruptus函數(shù)前都加async
在調(diào)用sleep之后,我們補(bǔ)充了.await
。注意不是.await()
調(diào)用,而是一個(gè)新語法
在主函數(shù)上使用#[async_std::main]
屬性
主函數(shù)前也有async關(guān)鍵字
我們現(xiàn)在使用spawn(sleepus())
而不是spawn(sleepus)
,這表示直接調(diào)用sleepus 并將結(jié)果傳給spawn
對interruptus()的調(diào)用增加.await
對sleepus不再使用join(),而是改用.await語法
看起來有很多修改,不過實(shí)際上,我們的代碼結(jié)構(gòu)和之前的版本基本是一致的?,F(xiàn)在程序運(yùn)行和我們的期望一致:采用單一線程進(jìn)行無阻塞調(diào)用。
接下來讓我們分析上述修改到底意味著什么。
在函數(shù)定義前添加async主要做了以下3個(gè)事:
這將允許你在函數(shù)體內(nèi)使用.await語法。我們接下來會(huì)深入探討這一點(diǎn)
它修改了函數(shù)的返回類型。async fn foo() -> Bar 實(shí)際上返回的是 impl std::future::Future<Output=Bar>
它自動(dòng)將結(jié)果值封裝進(jìn)一個(gè)新的Future對象。我們下面會(huì)詳細(xì)展示這一點(diǎn)
現(xiàn)在讓我們展開說明第2點(diǎn)。在Rust的標(biāo)準(zhǔn)庫中有一個(gè)名為Future的trait,F(xiàn)uture有一個(gè)關(guān)聯(lián)類型Output。這個(gè)trait的意思是:我承諾當(dāng)我完成任務(wù)時(shí),會(huì)給你一個(gè)類型為Output的值。例如你可以想象一個(gè)異步HTTP客戶端可能會(huì)這樣實(shí)現(xiàn):
impl HttpRequest { fn perform(self) -> impl Future<Output=HttpResponse> { ... } }
在發(fā)送HTTP請求時(shí)需要一些無阻塞的I/O,我們并不希望阻塞調(diào)用線程,但是需要最終得到響應(yīng)結(jié)果。
async fn sleepus()
的結(jié)果類型隱含為()
。因此我們的Future的Output也應(yīng)該為()
。這意味著我們需要修改函數(shù)為:
fn sleepus() -> impl std::future::Future<Output=()>
不過如果只修改這里,編譯就會(huì)出現(xiàn)如下錯(cuò)誤:
error[E0728]: `await` is only allowed inside `async` functions and blocks --> src/main.rs:7:9 | 4 | fn sleepus() -> impl std::future::Future<Output=()> { | ------- this is not `async` ... 7 | sleep(Duration::from_millis(500)).await; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks error[E0277]: the trait bound `(): std::future::Future` is not satisfied --> src/main.rs:4:17 | 4 | fn sleepus() -> impl std::future::Future<Output=()> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::future::Future` is not implemented for `()` | = note: the return type of a function must have a statically known size
第一個(gè)錯(cuò)誤信息很直接:你只能在async函數(shù)或代碼塊中使用.await語法。我們還沒有接觸到異步代碼塊,不過看起來就是這樣:
async { // async noises intensify }
第二個(gè)錯(cuò)誤消息就是第一個(gè)的結(jié)果:async關(guān)鍵字要求函數(shù)返回類型是impl Future
。如果沒有這個(gè)關(guān)鍵字,我們的loop結(jié)果類型是()
,這顯然不滿足要求。
將整個(gè)函數(shù)體用一個(gè)異步代碼塊包裹起來就解決問題了:
fn sleepus() -> impl std::future::Future<Output=()> { async { for i in 1..=10 { println!("Sleepus {}", i); sleep(Duration::from_millis(500)).await; } } }
可能我們并不需要所有這些async/.await。如果我們移除sleepus的.await會(huì)怎么樣?令人吃驚的是,居然編譯通過了,雖然給出了一個(gè)警告:
warning: unused implementer of `std::future::Future` that must be used --> src/main.rs:8:13 | 8 | sleep(Duration::from_millis(500)); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_must_use)]` on by default = note: futures do nothing unless you `.await` or poll them
我們在生成一個(gè)Future值但沒有使用它。如果查看程序的輸出,你可以理解編譯器的警告是什么意思了:
Interruptus 1 Sleepus 1 Sleepus 2 Sleepus 3 Sleepus 4 Sleepus 5 Sleepus 6 Sleepus 7 Sleepus 8 Sleepus 9 Sleepus 10 Interruptus 2 Interruptus 3 Interruptus 4 Interruptus 5
我們所有的Sleepus消息輸出都沒有延遲。問題在于對sleep的調(diào)用實(shí)際上沒有讓當(dāng)前線程休息,它只是生成一個(gè)實(shí)現(xiàn)了Future的值,然后當(dāng)承諾最終實(shí)現(xiàn)時(shí),我們知道的確發(fā)生了延遲。但是由于我們簡單地忽略了Future,因此實(shí)際上沒有利用延遲。
為了理解.await語法到底做了什么,我們接下來直接使用Future值來實(shí)現(xiàn)我們的函數(shù)。首先從不用async塊開始。
如果我們丟掉async代碼塊,看起來就是這樣:
fn sleepus() -> impl std::future::Future<Output=()> { for i in 1..=10 { println!("Sleepus {}", i); sleep(Duration::from_millis(500)); } }
這樣編譯會(huì)出現(xiàn)以下錯(cuò)誤:
error[E0277]: the trait bound `(): std::future::Future` is not satisfied --> src/main.rs:4:17 | 4 | fn sleepus() -> impl std::future::Future<Output=()> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::future::Future` is not implemented for `()` |
上面錯(cuò)誤是由于for循環(huán)的結(jié)果類型為()
,它沒有實(shí)現(xiàn)Future這個(gè)trait。修復(fù)這個(gè)問題的一種辦法是在for循環(huán)后面加一句話使其返回Future的實(shí)現(xiàn)類型。我們已經(jīng)知道可以用這個(gè):sleep:
fn sleepus() -> impl std::future::Future<Output=()> { for i in 1..=10 { println!("Sleepus {}", i); sleep(Duration::from_millis(500)); } sleep(Duration::from_millis(0)) }
現(xiàn)在我們依然會(huì)看到在for循環(huán)內(nèi)存在未使用的Future值的警告信息,不過返回值那個(gè)錯(cuò)誤已經(jīng)解決掉了。這個(gè)sleep調(diào)用實(shí)際上什么也沒做,我們可以將其替換為一個(gè)真正的占位Future:
fn sleepus() -> impl std::future::Future<Output=()> { for i in 1..=10 { println!("Sleepus {}", i); sleep(Duration::from_millis(500)); } async_std::future::ready(()) }
為了打破沙鍋問到底,讓我們再深入一步,不適用async_std庫中的ready函數(shù),而是定義自己的實(shí)現(xiàn)Future的結(jié)構(gòu)。讓我們稱之為DoNothing。
use std::future::Future; struct DoNothing; fn sleepus() -> impl Future<Output=()> { for i in 1..=10 { println!("Sleepus {}", i); sleep(Duration::from_millis(500)); } DoNothing }
問題在于DoNothing還沒有提供Future實(shí)現(xiàn)。我們接下來將進(jìn)行一些編譯器驅(qū)動(dòng)的開發(fā),讓rustc告訴我們?nèi)绾涡迯?fù)這個(gè)程序。第一個(gè)錯(cuò)誤信息是:
the trait bound `DoNothing: std::future::Future` is not satisfied
因此讓我們補(bǔ)上這個(gè)trait的實(shí)現(xiàn):
impl Future for DoNothing { }
繼續(xù)報(bào)錯(cuò):
error[E0046]: not all trait items implemented, missing: `Output`, `poll` --> src/main.rs:7:1 | 7 | impl Future for DoNothing { | ^^^^^^^^^^^^^^^^^^^^^^^^^ missing `Output`, `poll` in implementation | = note: `Output` from trait: `type Output;` = note: `poll` from trait: `fn(std::pin::Pin<&mut Self>, &mut std::task::Context<'_>) -> std::task::Poll<<Self as std::future::Future>::Output>`
我們還不是真正了解Pin<&mut Self>
或者Context
,不過我們知道Output
。因?yàn)槲覀冎胺祷?code>(),現(xiàn)在讓我們照做。
use std::pin::Pin; use std::task::{Context, Poll}; impl Future for DoNothing { type Output = (); fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { unimplemented!() } }
喔!編譯通過了!當(dāng)然在運(yùn)行時(shí)它會(huì)失敗,因?yàn)槲覀兊?code>unimplemented!()調(diào)用:
thread 'async-std/executor' panicked at 'not yet implemented', src/main.rs:13:9
現(xiàn)在讓我們嘗試實(shí)現(xiàn)poll。我們需要返回一個(gè)值其類型為Poll<Self::Output>
或者 Poll<()>
。讓我們看一下Poll的定義:
pub enum Poll<T> { Ready(T), Pending, }
利用一些基本的推理,我們可以理解Ready表示“我們的Future已經(jīng)完成,這是輸出”,而Pending表示“還沒完事兒”。假設(shè)我們的DoNothing希望立即返回()
類型的輸出,可以這樣:
fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> { Poll::Ready(()) }
恭喜!你剛剛實(shí)現(xiàn)了自己的第一個(gè)Future結(jié)構(gòu)!
還記得我們說過async對函數(shù)做的第三件事嗎:自動(dòng)將結(jié)果值封裝為一個(gè)新的Future。我們接下來展示這一點(diǎn)。
首先簡化sleepus的定義:
fn sleepus() -> impl Future<Output=()> { DoNothing }
編譯和運(yùn)行正?!,F(xiàn)在切換回async風(fēng)格:
async fn sleepus() { DoNothing }
這時(shí)候會(huì)報(bào)錯(cuò):
error[E0271]: type mismatch resolving `<impl std::future::Future as std::future::Future>::Output == ()` --> src/main.rs:17:20 | 17 | async fn sleepus() { | ^ expected struct `DoNothing`, found () | = note: expected type `DoNothing` found type `()`
可以看到,當(dāng)你有了一個(gè)async函數(shù)或代碼塊,結(jié)果會(huì)自動(dòng)封裝到一個(gè)Future實(shí)現(xiàn)對象里。因此我們需要返回一個(gè)impl Future<Output=DoNothing>
?,F(xiàn)在我們的類型需要是Output=()
。
處理很簡單,只需要在DoNothing后面簡單添加.await:
async fn sleepus() { DoNothing.await }
這讓我們對.await的作用增加了一點(diǎn)直覺:它從DoNothing中提取Output值。不過,我們依然并不真正了解它是如何實(shí)現(xiàn)的。
到此,相信大家對“rust異步代碼async/.await的內(nèi)部運(yùn)行機(jī)制是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。