Rust Async Await# 协程并不是抢占式任务模型,所以不是在任意时间点强制暂停正在运行的任务,而是让每个任务一直运行,直到它自愿放弃对cpu的控制,这样任务可以在合适时间点暂停。那么有三个问题, 1. 在rust中异步代码是怎么执行的? 2. 切换不同的任务时,被切换的任务的状态信息怎么保存? 3. 任务什么时候再次被执行?
1. 在rust中异步代码是怎么执行的?# 在rust中写异步代码时就像写同步代码一样,但是rust为了实现这一机制,会将async code block 编译为一个状态机,使用下面这个例子来说明。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use std ::future ::Future ;
use std ::pin ::Pin ;
use std ::task ::{ Context , Poll };
use std ::time ::{ Duration , Instant };
struct Delay {
when : Instant ,
}
impl Future for Delay {
type Output = & 'static str ;
fn poll ( self : Pin <& mut Self > , cx : & mut Context < '_ > )
-> Poll <& 'static str >
{
if Instant ::now () >= self . when {
println ! ( "Hello world" );
Poll ::Ready ( "done" )
} else {
// Ignore this line for now.
cx . waker (). wake_by_ref ();
Poll ::Pending
}
}
}
async fn sleep_some_millis () {
let array = [ 1 , 2 , 3 ];
let num = & array [ 2 ];
let when = Instant ::now () + Duration ::from_millis ( num );
let future = Delay { when };
let out = future . await ;
assert_eq ! ( out , "done" );
}
copy
以上是正常编写的异步代码,但是在编译器编译后,会生成类似下面的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
use std ::future ::Future ;
use std ::pin ::Pin ;
use std ::task ::{ Context , Poll };
use std ::time ::{ Duration , Instant };
enum SleepSomeMillisStateMachine {
State0 ,
State1 ,
Terminated ,
}
struct SleepSomeMillisHandler {
array : [ i32 : 3 ]
num : & i32
out_fut : Delay
state : SleepSomeMillisStateMachine
}
impl Future for SleepSomeMillisHandler {
type Output = ();
fn poll ( mut self : Pin <& mut Self > , cx : & mut Context < '_ > )
-> Poll < () >
{
use SleepSomeMillisStateMachine ::* ;
loop {
match self . state {
SleepSomeMillisStateMachine ::State0 => {
let num = & array [ 2 ];
let when = Instant ::now () +
Duration ::from_millis ( num );
self . out_fut = Delay { when };
self . state = SleepSomeMillisStateMachine ::State1 ;
}
SleepSomeMillisStateMachine ::State1 => {
match Pin ::new ( delay_future ). poll ( cx ) {
Poll ::Ready ( out ) => {
assert_eq ! ( out , "done" );
self . state = SleepSomeMillisStateMachine ::Terminated ;
return Poll ::Ready (());
}
Poll ::Pending => {
return Poll ::Pending ;
}
}
}
SleepSomeMillisStateMachine ::Terminated => {
panic ! ( "future polled after completion" )
}
}
}
}
}
copy
可以看到在实际执行时,每个task会这样被执行,如果遇到pending,则会暂时让出cpu。
2.任务状态信息怎么保存?# 可以看到上面类似编译后的代码,会生成struct SleepSomeMillisHandler, 所以task的状态信息都会存在这个struct中。同时看下std::future::Future的定义
1
2
3
4
5
6
7
8
9
use std ::pin ::Pin ;
use std ::task ::{ Context , Poll };
pub trait Future {
type Output ;
fn poll ( self : Pin <& mut Self > , cx : & mut Context )
-> Poll < Self ::Output > ;
}
copy
可以看到Future trait的poll方法,self类型是Pin<&mut Self>, Pin<&mut Self>和普通的&mut Self行为类似,但是会固定在内存的某个位置,这样做的原因是什么呢?,可以看到struct SleepSomeMillisHandler包含array和num,num是array最后一个元素的引用,这是自引用结构,但这样带来一个问题,如果array的地址变了,那么num还是指向原先的地址,这样num就变成了一个悬垂指针,因此对于自引用的结构,rust提供了pin<T>机制,可以固定T在内存中位置,使其不变
3. 任务什么时候再次被执行# 这时就要看future trait poll方法中的第二个参数Context。Context有一个waker()方法,可以返回一个Waker, Waker有wake()方法。这个方法的作用就是在任务为pending状态,但资源都准备好了,由executor或者scheduler周期性检查调用wake()方法,将任务标记为需要唤醒,等待后续执行