main thread execution

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/// Drives `future` until it produces its output and returns that output.
///
/// The runtime context is entered through `self.enter()` and stays entered
/// until this call returns. The future is then handed to the `block_on` of
/// whichever scheduler variant this runtime holds.
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
    // With tracing enabled, wrap the future in a "block_on" task span that
    // carries a freshly allocated task id.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    let future = {
        let task_id = crate::runtime::task::Id::next().as_u64();
        crate::util::trace::task(future, "block_on", None, task_id)
    };

    // The guard must stay alive for the whole call, hence the named binding.
    let _enter = self.enter();

    let handle = &self.handle.inner;
    match &self.scheduler {
        Scheduler::CurrentThread(exec) => exec.block_on(handle, future),
        #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
        Scheduler::MultiThread(exec) => exec.block_on(handle, future),
    }
}

对于多线程调度器,接着执行 `MultiThread::block_on`:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
/// Blocks the calling thread until `future` has completed.
///
/// The future itself is polled on the calling thread, while every task it
/// spawns is executed on the thread pool.
///
/// # Panics
///
/// Panics with "failed to park thread" when the blocking region reports an
/// error.
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
    F: Future,
{
    let mut runtime_guard = crate::runtime::context::enter_runtime(handle, true);
    let outcome = runtime_guard.blocking.block_on(future);
    outcome.expect("failed to park thread")
}

然后执行 `BlockingRegionGuard::block_on`:

1
2
3
4
5
6
7
8
9
/// Runs `f` to completion by delegating to a freshly created
/// `CachedParkThread`.
///
/// Returns whatever `CachedParkThread::block_on` returns: the future's output
/// on success, or an `AccessError`.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, AccessError>
where
    F: std::future::Future,
{
    crate::runtime::park::CachedParkThread::new().block_on(f)
}

最终来到 `CachedParkThread::block_on`:

1
2
3
4
5
/// Blocks the current thread using a condition variable.
///
/// The struct stores no state of its own; its only field is a zero-sized
/// marker.
#[derive(Debug)]
pub(crate) struct CachedParkThread {
    // `Rc` is neither `Send` nor `Sync`, so holding a `PhantomData<Rc<()>>`
    // removes both auto traits from this type.
    // NOTE(review): presumably this keeps the handle on the thread whose
    // thread-local parker it uses -- confirm.
    _anchor: PhantomData<Rc<()>>,
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// tokio-1.28.2/src/runtime/park.rs
/// Polls `f` on the current thread until it is ready, parking the thread
/// between polls.
///
/// # Errors
///
/// Returns an `AccessError` when `self.waker()` fails.
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
    use std::task::{Context, Poll::Ready};

    // `get_unpark()` should not return a Result
    let waker = self.waker()?;
    let mut cx = Context::from_waker(&waker);

    pin!(f);

    loop {
        // Each poll is run through `coop::budget`.
        let polled = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx));
        if let Ready(output) = polled {
            return Ok(output);
        }

        // Wake any yielded tasks before parking in order to avoid blocking.
        #[cfg(feature = "rt")]
        crate::runtime::context::with_defer(|defer| defer.wake());

        // Not ready yet: sleep until the waker built above unparks us.
        self.park();
    }
}

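这里调用了 `self.waker()?`,其构造链路为:`CURRENT_PARKER` -> `UnparkThread` -> `Waker` -> `Context`。也就是先从线程本地变量 `CURRENT_PARKER` 得到 `UnparkThread`,再通过 `into_waker` 把它转换成 `Waker`,最后用这个 `Waker` 构造 `Context`。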

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// tokio-1.28.2/src/runtime/park.rs
fn unpark(&self) -> Result<UnparkThread, AccessError> {
    self.with_current(|park_thread| park_thread.unpark())
}

/// Parks the current thread through the thread-local parker's `Inner::park`.
///
/// # Panics
///
/// Panics when `with_current` returns an error.
pub(crate) fn park(&mut self) {
    let parked = self.with_current(|current| current.inner.park());
    parked.unwrap();
}

//...

/// Runs `f` with a reference to this thread's `ParkThread` and returns its
/// result.
///
/// # Errors
///
/// Returns the `AccessError` produced by `CURRENT_PARKER.try_with`.
fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
where
    F: FnOnce(&ParkThread) -> R,
{
    CURRENT_PARKER.try_with(f)
}

// Declares the `CURRENT_PARKER` static through the `tokio_thread_local!`
// macro, with `ParkThread::new()` as its initialiser expression.
// `with_current` reaches it through `try_with`.
tokio_thread_local! {
    static CURRENT_PARKER: ParkThread = ParkThread::new();
}

/// Parking handle stored in the `CURRENT_PARKER` thread-local.
#[derive(Debug)]
pub(crate) struct ParkThread {
    // Reference-counted parking state; `park` is invoked on it.
    inner: Arc<Inner>,
}

/// Unblocks a thread that was blocked by `ParkThread`.
///
/// Can be converted into a `Waker` with `into_waker`.
#[derive(Clone, Debug)]
pub(crate) struct UnparkThread {
    // Reference-counted parking state; `into_waker` moves this `Arc` into the
    // raw waker.
    inner: Arc<Inner>,
}

/// Parking state held behind an `Arc` by `ParkThread` and `UnparkThread`.
#[derive(Debug)]
struct Inner {
    // Parking state word; `park` and `unpark` move it between `EMPTY`,
    // `PARKED` and `NOTIFIED`.
    state: AtomicUsize,
    // Guards no data; it is locked around the transition to `PARKED` and the
    // condvar wait, and briefly by `unpark` before it notifies.
    mutex: Mutex<()>,
    // The parked thread waits on this; `unpark` calls `notify_one` on it.
    condvar: Condvar,
}

impl UnparkThread {
    /// Consumes the handle and turns it into a `Waker` backed by the shared
    /// `Inner`.
    pub(crate) fn into_waker(self) -> Waker {
        // SAFETY: NOTE(review): relies on the contract of
        // `unparker_to_raw_waker`, which is not documented here -- confirm.
        let raw = unsafe { unparker_to_raw_waker(self.inner) };

        // SAFETY: NOTE(review): `raw` pairs the `Inner` pointer with the
        // vtable built by `unparker_to_raw_waker`; soundness depends on those
        // vtable functions upholding the `RawWaker` contract -- confirm.
        unsafe { Waker::from_raw(raw) }
    }
}

/// Builds a `RawWaker` whose data pointer comes from `Inner::into_raw` and
/// whose vtable is made of `clone`, `wake`, `wake_by_ref` and `drop_waker`.
///
/// # Safety
///
/// NOTE(review): the caller-side contract is not spelled out in the original;
/// presumably the returned `RawWaker` must only be used with this vtable --
/// confirm.
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
    let data = Inner::into_raw(unparker);
    RawWaker::new(
        data,
        &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
    )
}

继续看上面代码中的 `let mut cx = Context::from_waker(&waker);`,`Context` 的定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// core/src/task/wake.rs
/// Context handed to a future when it is polled; it wraps a borrowed
/// [`Waker`].
pub struct Context<'a> {
    // Waker borrowed for `'a`; handed back by `Context::waker`.
    waker: &'a Waker,
    // Ensure we future-proof against variance changes by forcing
    // the lifetime to be invariant (argument-position lifetimes
    // are contravariant while return-position lifetimes are
    // covariant).
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
    // Ensure `Context` is `!Send` and `!Sync` in order to allow
    // for future `!Send` and / or `!Sync` fields.
    _marker2: PhantomData<*mut ()>,
}

impl<'a> Context<'a> {
    /// Builds a `Context` that borrows the given [`&Waker`](Waker).
    #[stable(feature = "futures_api", since = "1.36.0")]
    #[rustc_const_unstable(feature = "const_waker", issue = "102012")]
    #[must_use]
    #[inline]
    pub const fn from_waker(waker: &'a Waker) -> Self {
        Self {
            waker,
            _marker: PhantomData,
            _marker2: PhantomData,
        }
    }

    /// Hands back the [`Waker`] reference this context was created with.
    #[stable(feature = "futures_api", since = "1.36.0")]
    #[rustc_const_unstable(feature = "const_waker", issue = "102012")]
    #[must_use]
    #[inline]
    pub const fn waker(&self) -> &'a Waker {
        self.waker
    }
}

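接着进入一个 loop:在其中对传入的 future 执行 poll,如果已经完成就直接返回,否则继续执行后面的代码。`crate::runtime::context::with_defer(|defer| defer.wake());` 会对 CONTEXT 的 defer 列表中保存的 waker 调用 `wake` 方法;不过看起来只有调用 defer 的 `defer` 方法才会往这个列表里添加 waker,而目前只有 `yield_now` 方法中调用了它。`self.park()` 最终调用的是 `Inner::park()`:线程会阻塞在 `self.condvar.wait` 上,直到 `unpark` 把 state 置为 `NOTIFIED` 并调用 `condvar.notify_one()` 才会被唤醒;`unpark` 中的 `drop(self.mutex.lock())` 并不负责唤醒,它只是用来确保被 park 的线程已经进入等待状态,避免通知丢失。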

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/// Blocks the calling thread until a notification is available, then consumes
/// it.
///
/// On return `state` has been reset from `NOTIFIED` to `EMPTY`. If a
/// notification is already pending, the call returns without waiting on the
/// condvar.
///
/// # Panics
///
/// Panics if `state` is neither `EMPTY` nor `NOTIFIED` when the thread tries
/// to mark itself `PARKED`, or if `condvar.wait` returns an error.
fn park(&self) {
    // If we were previously notified then we consume this notification and
    // return quickly.
    if self
        .state
        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
        .is_ok()
    {
        return;
    }

    // Otherwise we need to coordinate going to sleep
    let mut m = self.mutex.lock();

    // Announce that we are about to sleep; from here on `unpark` will see
    // `PARKED` and take the mutex before notifying.
    match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
        Ok(_) => {}
        Err(NOTIFIED) => {
            // We must read here, even though we know it will be `NOTIFIED`.
            // This is because `unpark` may have been called again since we read
            // `NOTIFIED` in the `compare_exchange` above. We must perform an
            // acquire operation that synchronizes with that `unpark` to observe
            // any writes it made before the call to unpark. To do that we must
            // read from the write it made to `state`.
            let old = self.state.swap(EMPTY, SeqCst);
            debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");

            return;
        }
        Err(actual) => panic!("inconsistent park state; actual = {}", actual),
    }

    loop {
        // Waits on the condvar, handing the guard over and getting it back.
        m = self.condvar.wait(m).unwrap();

        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            // got a notification
            return;
        }

        // spurious wakeup, go back to sleep
    }
}

/// Records a notification and, if a thread is parked, wakes it up.
///
/// `state` is unconditionally swapped to `NOTIFIED`. The condvar is only
/// signalled when the previous value was `PARKED`.
///
/// # Panics
///
/// Panics if the previous `state` is not one of `EMPTY`, `NOTIFIED` or
/// `PARKED`.
fn unpark(&self) {
    // To ensure the unparked thread will observe any writes we made before
    // this call, we must perform a release operation that `park` can
    // synchronize with. To do that we must write `NOTIFIED` even if `state`
    // is already `NOTIFIED`. That is why this must be a swap rather than a
    // compare-and-swap that returns if it reads `NOTIFIED` on failure.
    match self.state.swap(NOTIFIED, SeqCst) {
        EMPTY => return,    // no one was waiting
        NOTIFIED => return, // already unparked
        PARKED => {}        // gotta go wake someone up
        _ => panic!("inconsistent state in unpark"),
    }

    // There is a period between when the parked thread sets `state` to
    // `PARKED` (or last checked `state` in the case of a spurious wake
    // up) and when it actually waits on `condvar`. If we were to notify
    // during this period it would be ignored and then when the parked
    // thread went to sleep it would never wake up. Fortunately, it has
    // `mutex` locked at this stage so we can acquire `mutex` to wait until
    // it is ready to receive the notification.
    //
    // Releasing `mutex` before the call to `notify_one` means that when the
    // parked thread wakes it doesn't get woken only to have to wait for us
    // to release `mutex`.
    drop(self.mutex.lock());

    self.condvar.notify_one()
}

总结一下:循环中会先 poll 传入的 future,如果已经完成就返回;否则进入 park,阻塞在条件变量(`Condvar`,注意不是信号量)上,直到 waker 触发 `unpark` 唤醒线程后,再进入下一轮 poll。