1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
| // tokio/src/runtime/scheduler/multi_thread/worker.rs
pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
// remotes 是什么
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
// Create the local queues
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
let metrics = WorkerMetrics::from_config(&config);
// NOTE: 这里已经确定了每个worker在数组中的index
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
lifo_enabled: !config.disable_lifo_slot,
run_queue,
is_searching: false,
is_shutdown: false,
park: Some(park),
metrics: MetricsBatch::new(&metrics),
rand: FastRand::new(config.seed_generator.next_seed()),
}));
// NOTE: 这里worker在remotes中的顺序和在cores中的顺序一样
remotes.push(Remote { steal, unpark });
worker_metrics.push(metrics);
}
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
_counters: Counters,
},
driver: driver_handle,
blocking_spawner,
seed_generator,
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(handle, launch)
}
/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);
impl Launch {
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
}
}
}
/// A scheduler worker
pub(super) struct Worker {
/// Reference to scheduler's handle
handle: Arc<Handle>,
/// Index holding this worker's remote state
index: usize,
/// Used to hand-off a worker's core to another thread.
core: AtomicCell<Core>,
}
/// Core data
struct Core {
/// Used to schedule bookkeeping tasks every so often.
tick: u32,
/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task to be run
/// next (LIFO). This is an optimization for improving locality which
/// benefits message passing patterns and helps to reduce latency.
lifo_slot: Option<Notified>,
/// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
/// they go to the back of the `run_queue`.
lifo_enabled: bool,
/// The worker-local run queue.
run_queue: queue::Local<Arc<Handle>>,
/// True if the worker is currently searching for more work. Searching
/// involves attempting to steal from other workers.
is_searching: bool,
/// True if the scheduler is being shutdown
is_shutdown: bool,
/// Parker
///
/// Stored in an `Option` as the parker is added / removed to make the
/// borrow checker happy.
park: Option<Parker>,
/// Batching metrics so they can be submitted to RuntimeMetrics.
metrics: MetricsBatch,
/// Fast random number generator.
rand: FastRand,
}
|