1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::io;
use futures::task;
use mio::event::Evented;
use reactor::{Message, Remote, Handle, Direction};
/// A token that identifies an I/O source registered with an event loop.
///
/// Values are created by `IoToken::new`, which registers the source with the
/// event loop and stores the token and readiness cell it hands back. The
/// remaining methods use the token to schedule notifications, consume
/// readiness, and eventually drop the source.
pub struct IoToken {
// Identifier assigned to the source when it was added to the event loop;
// sent along with every `Schedule`/`DropSource` message.
token: usize,
// TODO: can we avoid this allocation? It's kind of a bummer...
// Readiness value handed back at registration time and consumed (reset to
// 0) by `take_readiness`. Presumably the event loop holds the other
// reference and writes readiness into it -- confirm against `add_source`.
readiness: Arc<AtomicUsize>,
}
impl IoToken {
    /// Registers a new I/O source with an event loop, returning the token
    /// that can be used to identify this source.
    ///
    /// When a new I/O object is created it needs to be communicated to the
    /// event loop to ensure that it's registered and ready to receive
    /// notifications. Registration is performed immediately through `handle`,
    /// and the event loop hands back the token together with the shared
    /// readiness cell that `take_readiness` reads from.
    ///
    /// The returned token is then used by each of the methods below to
    /// interact with notifications on the I/O object itself.
    ///
    /// # Errors
    ///
    /// Returns an error of kind `io::ErrorKind::Other` if the event loop that
    /// `handle` is associated with has gone away, and propagates any error
    /// reported while adding `source` to the event loop.
    pub fn new(source: &Evented, handle: &Handle) -> io::Result<IoToken> {
        // The handle only holds a weak reference; failing to upgrade it means
        // the event loop has already been dropped.
        let inner = handle
            .inner
            .upgrade()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "event loop gone"))?;
        let (ready, token) = inner.borrow_mut().add_source(source)?;
        Ok(IoToken { token: token, readiness: ready })
    }

    /// Consumes the last readiness notification registered for the source
    /// this token identifies.
    ///
    /// The stored readiness value is returned and atomically reset to `0`, so
    /// a second call without an intervening notification returns `0`.
    ///
    /// Currently sources receive readiness notifications on an edge-basis. That
    /// is, once you receive a notification that an object can be read, you
    /// won't receive any more notifications until all of that data has been
    /// read.
    ///
    /// The event loop will fill in this information and then wake up the task
    /// that asked to be notified through `schedule_read` or `schedule_write`;
    /// that task's `poll` method can then use this method to figure out what
    /// happened.
    ///
    /// > **Note**: This method should generally not be used directly, but
    /// > rather the `ReadinessStream` type should be used instead.
    // TODO: this should really return a proper newtype/enum, not a usize
    pub fn take_readiness(&self) -> usize {
        self.readiness.swap(0, Ordering::SeqCst)
    }

    /// Schedule the current future task to receive a notification when the
    /// corresponding I/O object is readable.
    ///
    /// Once an I/O object has been registered with the event loop through
    /// `IoToken::new`, this method can be used to notify the current future
    /// task when the next read notification comes in.
    ///
    /// The current task will only receive a notification **once** and to
    /// receive further notifications it will need to call `schedule_read`
    /// again.
    ///
    /// > **Note**: This method should generally not be used directly, but
    /// > rather the `ReadinessStream` type should be used instead.
    ///
    /// # Panics
    ///
    /// This function will panic if the event loop this handle is associated
    /// with has gone away, or if there is an error communicating with the event
    /// loop.
    ///
    /// This function will also panic if there is not a currently running future
    /// task.
    pub fn schedule_read(&self, handle: &Remote) {
        handle.send(Message::Schedule(self.token, task::current(), Direction::Read));
    }

    /// Schedule the current future task to receive a notification when the
    /// corresponding I/O object is writable.
    ///
    /// Once an I/O object has been registered with the event loop through
    /// `IoToken::new`, this method can be used to notify the current future
    /// task when the next write notification comes in.
    ///
    /// The current task will only receive a notification **once** and to
    /// receive further notifications it will need to call `schedule_write`
    /// again.
    ///
    /// > **Note**: This method should generally not be used directly, but
    /// > rather the `ReadinessStream` type should be used instead.
    ///
    /// # Panics
    ///
    /// This function will panic if the event loop this handle is associated
    /// with has gone away, or if there is an error communicating with the event
    /// loop.
    ///
    /// This function will also panic if there is not a currently running future
    /// task.
    pub fn schedule_write(&self, handle: &Remote) {
        handle.send(Message::Schedule(self.token, task::current(), Direction::Write));
    }

    /// Unregister all information associated with this token on an event loop,
    /// deallocating all internal resources assigned to it.
    ///
    /// This method should be called whenever a source of events is being
    /// destroyed. This will ensure that the event loop can reuse this token
    /// for another I/O object if necessary and also remove it from any poll
    /// notifications and callbacks.
    ///
    /// Note that wake callbacks may still be invoked after this method is
    /// called as it may take some time for the message to drop a source to
    /// reach the event loop. Despite this fact, this method will attempt to
    /// ensure that the callbacks are **not** invoked, so pending scheduled
    /// callbacks cannot be relied upon to get called.
    ///
    /// > **Note**: This method should generally not be used directly, but
    /// > rather the `ReadinessStream` type should be used instead.
    ///
    /// # Panics
    ///
    /// This function will panic if the event loop this handle is associated
    /// with has gone away, or if there is an error communicating with the event
    /// loop.
    pub fn drop_source(&self, handle: &Remote) {
        handle.send(Message::DropSource(self.token));
    }
}