tokio runtime 基本组件

本系列文章以下面这段代码(一个 TCP echo 服务器)为样例,探究 tokio 的内部运行机制。

use std::error::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio;

fn main() -> Result<(), Box<dyn Error>> {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        //tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await.unwrap();

        loop {
            let (mut socket, _) = listener.accept().await.unwrap();

            tokio::spawn(async move {
                let mut buf = vec![0; 1024];

                // In a loop, read data from the socket and write the data back.
                loop {
                    let n = socket
                        .read(&mut buf)
                        .await
                        .expect("failed to read data from socket");

                    if n == 0 {
                        return;
                    }

                    socket
                        .write_all(&buf[0..n])
                        .await
                        .expect("failed to write data to socket");
                    println!("buf: {:?}", buf);
                }
            });
        }
    });
    Ok(())
}

先看runtime创建部分

// tokio/src/runtime/runtime.rs
/// Creates a new multi-threaded runtime.
///
/// Shorthand for `Builder::new_multi_thread().enable_all().build()`. It is
/// only compiled when the `rt-multi-thread` feature is enabled.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new() -> std::io::Result<Runtime> {
    Builder::new_multi_thread().enable_all().build()
}
  // tokio/src/runtime/builder.rs
/// Builds the runtime kind that was chosen when the builder was created.
///
/// Dispatches on `self.kind`:
/// - `Kind::CurrentThread` goes to `build_current_thread_runtime`.
/// - `Kind::MultiThread` goes to `build_threaded_runtime`. This arm is only
///   compiled with the `rt-multi-thread` feature and when not targeting
///   `tokio_wasi`.
pub fn build(&mut self) -> io::Result<Runtime> {
    match &self.kind {
        Kind::CurrentThread => self.build_current_thread_runtime(),
        #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
        Kind::MultiThread => self.build_threaded_runtime(),
    }
}
// tokio/src/runtime/builder.rs
/// Builds the multi-threaded runtime.
///
/// Steps, in order:
/// 1. Pick the worker count.
/// 2. Create the driver.
/// 3. Create the blocking pool.
/// 4. Derive two RNG seed generators.
/// 5. Create the scheduler.
/// 6. Wrap the scheduler handle in a runtime `Handle`.
/// 7. Start the workers.
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
    use crate::loom::sys::num_cpus;
    use crate::runtime::{Config, runtime::Scheduler};
    use crate::runtime::scheduler::{self, MultiThread};
  
    // Number of worker threads: the configured `worker_threads`, or
    // `num_cpus()` if none was configured.
    let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
  
    // Both the driver and its handle are handed to `MultiThread::new` below.
    let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
  
    // Create the blocking pool
    // The extra `core_threads` on top of `max_blocking_threads` appear to
    // account for the worker threads themselves: `Launch::launch` starts each
    // worker through `runtime::spawn_blocking`.
    let blocking_pool =
    blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
    let blocking_spawner = blocking_pool.spawner().clone();
  
    // Generate a rng seed for this runtime.
    // `seed_generator_1` goes into `Config` (per-core `FastRand` seeds are
    // drawn from `config.seed_generator` in `worker::create`).
    // `seed_generator_2` is passed separately and presumably ends up in the
    // scheduler `Handle`'s `seed_generator` field.
    let seed_generator_1 = self.seed_generator.next_generator();
    let seed_generator_2 = self.seed_generator.next_generator();
  
    // Returns the scheduler, its handle, and a `Launch` holding the
    // not-yet-started workers.
    let (scheduler, handle, launch) = MultiThread::new(
        core_threads,
        driver,
        driver_handle,
        blocking_spawner,
        seed_generator_2,
        Config {
            before_park: self.before_park.clone(),
            after_unpark: self.after_unpark.clone(),
            global_queue_interval: self.global_queue_interval,
            event_interval: self.event_interval,
            #[cfg(tokio_unstable)]
            unhandled_panic: self.unhandled_panic.clone(),
            disable_lifo_slot: self.disable_lifo_slot,
            seed_generator: seed_generator_1,
            metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
        },
    );
  
    // Wrap the scheduler-specific handle into the public runtime `Handle`.
    let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
  
    // Spawn the thread pool workers
    // `_enter` keeps this runtime as the current context until the function
    // returns. This is presumably needed because `launch()` calls
    // `runtime::spawn_blocking`, which must locate the current runtime.
    let _enter = handle.enter();
    launch.launch();
  
    Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}

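从这段代码可以看到:

  1. 创建了 blocking 线程池(blocking_pool)
  2. 创建了 driver 和 driver handle。猜测它们与网络 IO、时间控制器(定时器)绑定,用于后续的事件通知,之后再验证猜得对不对
  3. 生成了两个随机数种子生成器(seed generator)
  4. 通过 MultiThread::new 创建了 scheduler、scheduler handle 和 launch
  5. 把 scheduler handle 包装成 runtime handle,然后执行 handle.enter()
  6. 执行了 launch.launch()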

但仅凭这段代码,还看不出以上操作的目的是什么,以及它们之后会怎样被用到。不过最后几行代码看起来做了两件事:先把所有 worker 线程运行起来,再返回包含 runtime handle 的 Runtime。下面先详细看第五步和第六步,带着两个问题:

  1. 这些handle各自都是什么?大概有什么作用?
  2. launch.launch具体做了什么事情?

对于第一个问题:

  1. 这里有几个handle,先看看每个handle具体指什么?
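  2. 从 let (scheduler, handle, launch) = MultiThread::new(...) 可以看到第一个 handle:它是 MultiThread::new(...) 返回的多线程调度器 Handle(即下面的 multi_thread::Handle)。随后它被包进 scheduler::Handle::MultiThread 枚举,再包成 Handle { inner: ... },这就是第二个 handle:runtime handle。以下分别是 scheduler handle 结构体和 runtime handle 结构体。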
// tokio/src/runtime/scheduler/multi_thread/handle.rs
/// Handle to the multi thread scheduler
///
/// Built once in `worker::create`, shared via `Arc`, and cloned into every
/// `Worker`.
pub(crate) struct Handle {
    /// Task spawner
    /// (state shared by all workers: the per-worker `remotes`, the `inject`
    /// queue, `idle`, `owned` tasks, config and metrics; see `worker::create`).
    pub(super) shared: worker::Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,
}
//tokio/src/runtime/handle.rs

/// Handle to the runtime.
///
/// The handle is internally reference-counted and can be freely cloned. A 
/// handle can be obtained using the [`Runtime::handle`] method.
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
// When the `rt` feature is *not* enabled, this type is still defined, but not
// included in the public API.
pub struct Handle {
    // Scheduler-specific handle. For the multi-thread runtime this is
    // `scheduler::Handle::MultiThread(..)`, built in `build_threaded_runtime`.
    pub(crate) inner: scheduler::Handle,
}

对于第二个问题:
launch是在MultiThread::new(...)中通过worker::create创建的

// tokio/src/runtime/scheduler/multi_thread/worker.rs
/// Creates the shared scheduler `Handle` and a `Launch` holding `size`
/// not-yet-started workers.
///
/// Each worker gets:
/// - its own local run queue,
/// - a clone of `park`,
/// - its own `WorkerMetrics`,
/// - a `FastRand` seeded from `config.seed_generator`.
///
/// The per-worker `Remote`s and metrics are stored in `Shared`, and every
/// `Worker` holds a clone of the same `Arc<Handle>`.
pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    // remotes: one `Remote { steal, unpark }` per worker, later stored in
    // `Shared` so it is reachable through the shared `Handle`. Presumably this
    // lets other threads steal from / wake a given worker — TODO confirm where
    // `remotes` is read.
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);

    // Create the local queues
    for _ in 0..size {
        // `queue::local()` yields a `steal` half (kept in `Remote`) and the
        // `run_queue` half (owned by `Core`).
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();
        let metrics = WorkerMetrics::from_config(&config);

        // NOTE: each worker's index is fixed here: it is this core's position
        // in `cores`.
        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            lifo_enabled: !config.disable_lifo_slot,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            park: Some(park),
            metrics: MetricsBatch::new(&metrics),
            rand: FastRand::new(config.seed_generator.next_seed()),
        }));

        // NOTE: workers appear in `remotes` in the same order as in `cores`,
        // so `remotes[i]` belongs to the worker with index `i`.
        remotes.push(Remote { steal, unpark });
        worker_metrics.push(metrics);
    }

    let handle = Arc::new(Handle {
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject: Inject::new(),
            idle: Idle::new(size),
            owned: OwnedTasks::new(),
            shutdown_cores: Mutex::new(vec![]),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let mut launch = Launch(vec![]);

    // `enumerate` assigns `index` from the position in `cores`, which matches
    // the position of the same worker's `Remote` in `shared.remotes`.
    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}

/// Starts the workers
///
/// Holds one `Arc<Worker>` per core, as built by `create`, until `launch` is
/// called.
pub(crate) struct Launch(Vec<Arc<Worker>>);

impl Launch {
    /// Consumes the `Launch` and starts every worker by submitting
    /// `run(worker)` through `runtime::spawn_blocking`, so each worker runs
    /// on a blocking-pool thread.
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

/// A scheduler worker
///
/// Created in `create` (one per core) and started by `Launch::launch`.
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state
    /// (position of this worker's `Remote` in `handle.shared.remotes`).
    index: usize,

    /// Used to hand-off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
///
/// Per-worker scheduling state. It is created in `create` and stored inside
/// `Worker::core` (an `AtomicCell`), which allows it to be handed off to
/// another thread.
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task to be run
    /// next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shutdown
    is_shutdown: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Batching metrics so they can be submitted to RuntimeMetrics.
    metrics: MetricsBatch,

    /// Fast random number generator.
    rand: FastRand,
}

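这里 worker 的数量默认和 CPU 核数相同(core_threads = self.worker_threads.unwrap_or_else(num_cpus),也可以通过 builder 的 worker_threads 配置)。launch 中全是 worker,每个 worker 包含三样东西:

- scheduler handle;
- index(之后调度时用来排除自身);
- core。

从 Core 结构体的定义看,core 是每个 worker 用于管理异步任务执行的数据,存储了与任务调度相关的状态和性能指标,例如本地运行队列、LIFO slot。core 放在 AtomicCell 中,可以被移交给其他线程。

launch.launch 通过 runtime::spawn_blocking 把每个 worker 放到 blocking 线程池的线程上 run 起来,这大概也解释了 blocking 线程池的上限为什么是 max_blocking_threads + core_threads。worker 运行起来后,有任务就执行;如果当前 worker 上没有任务了,就从其他 worker 上获取。具体细节后面再看。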

基于目前看过的代码,可以清楚这几个结构体之间的关系

  • runtime handle
  • scheduler handle
  • core
  • worker
  • launch

tokio component structure