Rust Async Await

协程并不是抢占式任务模型,所以不是在任意时间点强制暂停正在运行的任务,而是让每个任务一直运行,直到它自愿放弃对cpu的控制,这样任务可以在合适时间点暂停。那么有三个问题,
1. 在rust中异步代码是怎么执行的?
2. 切换不同的任务时,被切换的任务的状态信息怎么保存?
3. 任务什么时候再次被执行?

1. 在rust中异步代码是怎么执行的?

在rust中写异步代码时就像写同步代码一样,但是rust为了实现这一机制,会将async code block 编译为一个状态机,使用下面这个例子来说明。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};


struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async fn sleep_some_millis() {
    let array = [1,2,3];
    let num = &array[2];
  
    let when = Instant::now() + Duration::from_millis(num);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

以上是正常编写的异步代码,但是在编译器编译后,会生成类似下面的代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum SleepSomeMillisStateMachine {
    State0,
    State1,
    Terminated,
}

struct SleepSomeMillisHandler {
	array: [i32: 3]
    num: &i32
    out_fut: Delay
    state: SleepSomeMillisStateMachine
}

impl Future for SleepSomeMillisHandler {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use SleepSomeMillisStateMachine::*;

        loop {
            match self.state {
                SleepSomeMillisStateMachine::State0 => {
                    let num = &array[2];
                    let when = Instant::now() +
                        Duration::from_millis(num);
                    self.out_fut = Delay { when };
                    self.state = SleepSomeMillisStateMachine::State1;
                }
                SleepSomeMillisStateMachine::State1 => {
                    match Pin::new(delay_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            self.state = SleepSomeMillisStateMachine::Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                SleepSomeMillisStateMachine::Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

可以看到在实际执行时,每个task会这样被执行,如果遇到pending,则会暂时让出cpu。

2.任务状态信息怎么保存?

可以看到上面类似编译后的代码,会生成struct SleepSomeMillisHandler, 所以task的状态信息都会存在这个struct中。同时看下std::future::Future的定义

1
2
3
4
5
6
7
8
9
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;
}

可以看到Future traitpoll方法,self类型是Pin<&mut Self>, Pin<&mut Self>和普通的&mut Self行为类似,但是会固定在内存的某个位置,这样做的原因是什么呢?,可以看到struct SleepSomeMillisHandler包含arraynumnumarray最后一个元素的引用,这是自引用结构,但这样带来一个问题,如果array的地址变了,那么num还是指向原先的地址,这样num就变成了一个悬垂指针,因此对于自引用的结构,rust提供了pin<T>机制,可以固定T在内存中位置,使其不变

3. 任务什么时候再次被执行

这时就要看future trait poll方法中的第二个参数ContextContext有一个waker()方法,可以返回一个Waker, Wakerwake()方法。这个方法的作用就是在任务为pending状态,但资源都准备好了,由executor或者scheduler周期性检查调用wake()方法,将任务标记为需要唤醒,等待后续执行