/// Blocks the current thread waiting for the future to complete.
///
/// The future will execute on the current thread, but all spawned tasks
/// will be executed on the thread pool.
pub(crate)fnblock_on<F>(&self,handle: &scheduler::Handle,future: F)-> F::OutputwhereF: Future,{letmutenter=crate::runtime::context::enter_runtime(handle,true);enter.blocking.block_on(future).expect("failed to park thread")}
// tokio-1.28.2/src/runtime/park.rs
pub(crate)fnblock_on<F: Future>(&mutself,f: F)-> Result<F::Output,AccessError>{usestd::task::Context;usestd::task::Poll::Ready;// `get_unpark()` should not return a Result
letwaker=self.waker()?;letmutcx=Context::from_waker(&waker);pin!(f);loop{ifletReady(v)=crate::runtime::coop::budget(||f.as_mut().poll(&mutcx)){returnOk(v);}// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]crate::runtime::context::with_defer(|defer|defer.wake());self.park();}}
// tokio-1.28.2/src/runtime/park.rs
fnunpark(&self)-> Result<UnparkThread,AccessError>{self.with_current(|park_thread|park_thread.unpark())}pub(crate)fnpark(&mutself){self.with_current(|park_thread|park_thread.inner.park()).unwrap();}//...
/// Gets a reference to the `ParkThread` handle for this thread.
fnwith_current<F,R>(&self,f: F)-> Result<R,AccessError>whereF: FnOnce(&ParkThread)-> R,{CURRENT_PARKER.try_with(|inner|f(inner))}tokio_thread_local!{staticCURRENT_PARKER: ParkThread=ParkThread::new();}#[derive(Debug)]pub(crate)structParkThread{inner: Arc<Inner>,}/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]pub(crate)structUnparkThread{inner: Arc<Inner>,}#[derive(Debug)]structInner{state: AtomicUsize,mutex: Mutex<()>,condvar: Condvar,}implUnparkThread{pub(crate)fninto_waker(self)-> Waker{unsafe{letraw=unparker_to_raw_waker(self.inner);Waker::from_raw(raw)}}}unsafefnunparker_to_raw_waker(unparker: Arc<Inner>)-> RawWaker{RawWaker::new(Inner::into_raw(unparker),&RawWakerVTable::new(clone,wake,wake_by_ref,drop_waker),)}
/// core/src/task/wake.rs
pubstructContext<'a>{waker: &'aWaker,// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
// covariant).
_marker: PhantomData<fn(&'a())-> &'a()>,// Ensure `Context` is `!Send` and `!Sync` in order to allow
// for future `!Send` and / or `!Sync` fields.
_marker2: PhantomData<*mut()>,}impl<'a>Context<'a>{/// Create a new `Context` from a [`&Waker`](Waker).
#[stable(feature = "futures_api", since = "1.36.0")]#[rustc_const_unstable(feature = "const_waker", issue = "102012")]#[must_use]#[inline]pubconstfnfrom_waker(waker: &'aWaker)-> Self{Context{waker,_marker: PhantomData,_marker2: PhantomData}}/// Returns a reference to the [`Waker`] for the current task.
#[stable(feature = "futures_api", since = "1.36.0")]#[rustc_const_unstable(feature = "const_waker", issue = "102012")]#[must_use]#[inline]pubconstfnwaker(&self)-> &'aWaker{&self.waker}}
fnpark(&self){// If we were previously notified then we consume this notification and
// return quickly.
ifself.state.compare_exchange(NOTIFIED,EMPTY,SeqCst,SeqCst).is_ok(){return;}// Otherwise we need to coordinate going to sleep
letmutm=self.mutex.lock();matchself.state.compare_exchange(EMPTY,PARKED,SeqCst,SeqCst){Ok(_)=>{}Err(NOTIFIED)=>{// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
letold=self.state.swap(EMPTY,SeqCst);debug_assert_eq!(old,NOTIFIED,"park state changed unexpectedly");return;}Err(actual)=>panic!("inconsistent park state; actual = {}",actual),}loop{m=self.condvar.wait(m).unwrap();ifself.state.compare_exchange(NOTIFIED,EMPTY,SeqCst,SeqCst).is_ok(){// got a notification
return;}// spurious wakeup, go back to sleep
}}fnunpark(&self){// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
matchself.state.swap(NOTIFIED,SeqCst){EMPTY=>return,// no one was waiting
NOTIFIED=>return,// already unparked
PARKED=>{}// gotta go wake someone up
_=>panic!("inconsistent state in unpark"),}// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());self.condvar.notify_one()}