1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
use {Builder};
use futures::task::Task;
use slab::Slab;
use std::{cmp, mem, usize};
use std::time::{Instant, Duration};
pub struct Wheel {
// Actual timer wheel itself.
//
// Each slot represents a fixed duration of time, and this wheel also
// behaves like a ring buffer. All timeouts scheduled will correspond to one
// slot and therefore each slot has a linked list of timeouts scheduled in
// it. Right now linked lists are done through indices into the `slab`
// below.
//
// Each slot also contains the next timeout associated with it (the minimum
// of the entire linked list).
wheel: Vec<Slot>,
// A slab containing all the timeout entries themselves. This is the memory
// backing the "linked lists" in the wheel above. Each entry has a prev/next
// pointer (indices in this array) along with the data associated with the
// timeout and the time the timeout will fire.
slab: Slab<Entry, Token>,
// The instant that this timer was created, through which all other timeout
// computations are relative to.
start: Instant,
// State used during `poll`. The `cur_wheel_tick` field is the current tick
// we've poll'd to. That is, all events from `cur_wheel_tick` to the
// actual current tick in time still need to be processed.
//
// The `cur_slab_idx` variable is basically just an iterator over the linked
// list associated with a wheel slot. This will get incremented as we move
// forward in `poll`
cur_wheel_tick: u64,
// The next timeout to tick
cur_slab_idx: Token,
// Max capacity of the slab
max_capacity: usize,
// The duration of each tick in ms
tick_ms: u64,
// Mask to convert the current tick to a wheel slot
mask: usize,
}
#[derive(Clone)]
struct Slot {
head: Token,
next_timeout: Option<Instant>,
}
enum Entry {
Reserved,
Timeout(Timeout),
}
struct Timeout {
task: Task,
when: Instant,
wheel_idx: usize,
prev: Token,
next: Token,
}
/// Represents a slot in the timer
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Token(pub usize);
const EMPTY: Token = Token(usize::MAX);
impl Wheel {
/// Creates a new timer wheel with the given configuration settings.
pub fn new(builder: &Builder) -> Wheel {
let num_slots = builder.get_num_slots();
let mask = num_slots - 1;
// Check that the number of slots requested is, in fact, a power of two
assert!(num_slots & mask == 0, "num_slots must be a power of two");
Wheel {
wheel: vec![Slot { head: EMPTY, next_timeout: None }; num_slots],
slab: Slab::with_capacity(builder.get_initial_capacity()),
start: Instant::now(),
cur_wheel_tick: 0,
cur_slab_idx: EMPTY,
max_capacity: builder.get_max_capacity(),
tick_ms: millis(builder.get_tick_duration()),
mask: mask,
}
}
pub fn available(&self) -> usize {
self.slab.available()
}
/// Reserve a slot in the timer
pub fn reserve(&mut self) -> Option<Token> {
// Ensure that there is enough space to reserve a new token.
//
// TODO: Ensure max capacity is not exceeded
if self.slab.vacant_entry().is_none() {
let amt = self.slab.len();
let amt = cmp::min(amt, self.max_capacity - amt);
if amt == 0 {
// Reached max capacity
return None;
}
self.slab.reserve_exact(amt);
}
// Reserve the slot
self.slab.insert(Entry::Reserved).ok()
}
pub fn release(&mut self, token: Token) {
self.slab.remove(token);
}
pub fn set_timeout(&mut self, token: Token, mut at: Instant, task: Task) {
// First up, figure out where we're gonna go in the wheel. Note that if
// we're being scheduled on or before the current wheel tick we just
// make sure to defer ourselves to the next tick.
let mut tick = self.time_to_ticks(at);
if tick <= self.cur_wheel_tick {
tick = self.cur_wheel_tick + 1;
}
let wheel_idx = self.ticks_to_wheel_idx(tick);
let actual_tick = self.start +
Duration::from_millis(self.tick_ms) * (tick as u32);
at = actual_tick;
// Insert ourselves at the head of the linked list in the wheel.
let slot = &mut self.wheel[wheel_idx];
let prev_head = mem::replace(&mut slot.head, token);
{
self.slab[token] = Entry::Timeout(Timeout {
task: task,
when: at,
wheel_idx: wheel_idx,
prev: EMPTY,
next: prev_head,
});
}
if prev_head != EMPTY {
match self.slab[prev_head] {
Entry::Timeout(ref mut v) => v.prev = slot.head,
_ => panic!("unexpected state"),
}
}
// Update the wheel slot's next timeout field.
if at <= slot.next_timeout.unwrap_or(at) {
slot.next_timeout = Some(at);
}
}
/// Queries this timer to see if any timeouts are ready to fire.
///
/// This function will advance the internal wheel to the time specified by
/// `at`, returning any timeout which has happened up to that point. This
/// method should be called in a loop until it returns `None` to ensure that
/// all timeouts are processed.
///
/// # Panics
///
/// This method will panic if `at` is before the instant that this timer
/// wheel was created.
pub fn poll(&mut self, at: Instant) -> Option<Task> {
let wheel_tick = self.time_to_ticks(at);
// Advance forward in time to the `wheel_tick` specified.
//
// TODO: don't visit slots in the wheel more than once
while self.cur_wheel_tick <= wheel_tick {
let head = self.cur_slab_idx;
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
// If the current slot has no entries or we're done iterating go to
// the next tick.
if head == EMPTY {
if head == self.wheel[idx].head {
self.wheel[idx].next_timeout = None;
}
self.cur_wheel_tick += 1;
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
self.cur_slab_idx = self.wheel[idx].head;
continue
}
// If we're starting to iterate over a slot, clear its timeout as
// we're probably going to remove entries. As we skip over each
// element of this slot we'll restore the `next_timeout` field if
// necessary.
if head == self.wheel[idx].head {
self.wheel[idx].next_timeout = None;
}
// Otherwise, continue iterating over the linked list in the wheel
// slot we're on and remove anything which has expired.
let head_timeout = {
let timeout = self.slab[head].timeout();
self.cur_slab_idx = timeout.next;
timeout.when
};
if self.time_to_ticks(head_timeout) <= self.time_to_ticks(at) {
let task = match self.remove_slab(head) {
Some(Entry::Timeout(v)) => {
Some(v.task)
}
_ => None,
};
return task;
} else {
let next = self.wheel[idx].next_timeout.unwrap_or(head_timeout);
if head_timeout <= next {
self.wheel[idx].next_timeout = Some(head_timeout);
}
}
}
None
}
/// Returns the instant in time that corresponds to the next timeout
/// scheduled in this wheel.
pub fn next_timeout(&self) -> Option<Instant> {
// TODO: can this be optimized to not look at the whole array?
let mut min = None;
for a in self.wheel.iter().filter_map(|s| s.next_timeout.as_ref()) {
if let Some(b) = min {
if b < a {
continue
}
}
min = Some(a);
}
min.map(|t| *t)
}
pub fn move_timeout(&mut self, token: Token, when: Instant, task: Task) {
match self.slab.get_mut(token) {
Some(&mut Entry::Timeout(ref mut e)) if e.when == when => {
e.task = task;
}
_ => {}
}
}
/// Cancels the specified timeout.
///
/// For timeouts previously registered via `insert` they can be passed back
/// to this method to cancel the associated timeout, retrieving the value
/// inserted if the timeout has not already fired.
///
/// This method completes in O(1) time.
///
/// # Panics
///
/// This method may panic if `timeout` wasn't created by this timer wheel.
pub fn cancel(&mut self, token: Token, when: Instant) {
match self.slab.get(token) {
Some(&Entry::Timeout(ref e)) if e.when == when => {}
_ => return,
}
self.remove_slab(token);
}
fn remove_slab(&mut self, slab_idx: Token) -> Option<Entry> {
let mut entry = match self.slab.remove(slab_idx) {
Some(e) => e,
None => return None,
};
if let Entry::Timeout(ref mut entry) = entry {
// Remove the node from the linked list
if entry.prev == EMPTY {
self.wheel[entry.wheel_idx].head = entry.next;
} else {
self.slab[entry.prev].timeout_mut().next = entry.next;
}
if entry.next != EMPTY {
self.slab[entry.next].timeout_mut().prev = entry.prev;
}
if self.cur_slab_idx == slab_idx {
self.cur_slab_idx = entry.next;
}
}
return Some(entry)
}
fn time_to_ticks(&self, time: Instant) -> u64 {
let dur = time - self.start;
let ms = dur.subsec_nanos() as u64 / 1_000_000;
let ms = dur.as_secs()
.checked_mul(1_000)
.and_then(|m| m.checked_add(ms))
.expect("overflow scheduling timeout");
ms / self.tick_ms
}
fn ticks_to_wheel_idx(&self, ticks: u64) -> usize {
(ticks as usize) & self.mask
}
}
impl Entry {
fn timeout(&self) -> &Timeout {
match *self {
Entry::Timeout(ref v) => v,
_ => panic!("unexpected state"),
}
}
fn timeout_mut(&mut self) -> &mut Timeout {
match *self {
Entry::Timeout(ref mut v) => v,
_ => panic!("unexpected state"),
}
}
}
impl From<usize> for Token {
fn from(src: usize) -> Token {
Token(src)
}
}
impl From<Token> for usize {
fn from(src: Token) -> usize {
src.0
}
}
pub fn millis(duration: Duration) -> u64 {
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;
// Convert a `Duration` to milliseconds, rounding up and saturating at
// `u64::MAX`.
//
// The saturating is fine because `u64::MAX` milliseconds are still many
// million years.
let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
duration.as_secs().saturating_mul(MILLIS_PER_SEC).saturating_add(millis as u64)
}