[go: up one dir, main page]

miow/
iocp.rs

1//! Bindings to IOCP, I/O Completion Ports
2
3use crate::FALSE;
4use std::cmp;
5use std::fmt;
6use std::io;
7use std::mem;
8use std::os::windows::io::{AsRawHandle, AsRawSocket, FromRawHandle, IntoRawHandle, RawHandle};
9use std::time::Duration;
10
11use crate::handle::Handle;
12use crate::Overlapped;
13use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
14use windows_sys::Win32::System::IO::{
15    CreateIoCompletionPort, GetQueuedCompletionStatus, GetQueuedCompletionStatusEx,
16    PostQueuedCompletionStatus, OVERLAPPED, OVERLAPPED_ENTRY,
17};
18
/// A handle to a Windows I/O Completion Port.
///
/// Ports are created with [`CompletionPort::new`]; handles and sockets are
/// then associated with the port via `add_handle` / `add_socket`, and
/// completion statuses are retrieved with `get` / `get_many`.
#[derive(Debug)]
pub struct CompletionPort {
    // Wrapped port handle; every method in this module reaches the OS through
    // `handle.raw()`.
    // NOTE(review): presumably `Handle` closes the port when dropped — confirm
    // against `crate::handle::Handle`.
    handle: Handle,
}
24
/// A status message received from an I/O completion port.
///
/// These statuses can be created via the `new` or `zero` constructors and then
/// provided to a completion port, or they are read out of a completion port.
/// The fields of each status are read through its accessor methods.
///
/// The type is `#[repr(transparent)]` over `OVERLAPPED_ENTRY`, which is what
/// allows `from_entry` and `get_many` to reinterpret one as the other.
#[derive(Clone, Copy)]
#[repr(transparent)]
pub struct CompletionStatus(OVERLAPPED_ENTRY);
33
34impl fmt::Debug for CompletionStatus {
35    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36        write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
37    }
38}
39
// The wrapped `OVERLAPPED_ENTRY` stores a raw `lpOverlapped` pointer, and raw
// pointers are neither `Send` nor `Sync`, so these impls are written by hand.
// Nothing in this module dereferences that pointer: it is only stored, handed
// to the OS, and returned to the caller via `overlapped()`.
// NOTE(review): soundness relies on callers treating the pointer as an opaque
// value until they re-establish its validity themselves — confirm.
unsafe impl Send for CompletionStatus {}
unsafe impl Sync for CompletionStatus {}
42
43impl CompletionPort {
44    /// Creates a new I/O completion port with the specified concurrency value.
45    ///
46    /// The number of threads given corresponds to the level of concurrency
47    /// allowed for threads associated with this port. Consult the Windows
48    /// documentation for more information about this value.
49    pub fn new(threads: u32) -> io::Result<CompletionPort> {
50        let ret = unsafe {
51            CreateIoCompletionPort(INVALID_HANDLE_VALUE, std::ptr::null_mut(), 0, threads)
52        };
53        if ret.is_null() {
54            Err(io::Error::last_os_error())
55        } else {
56            Ok(CompletionPort {
57                handle: Handle::new(ret),
58            })
59        }
60    }
61
62    /// Associates a new `HANDLE` to this I/O completion port.
63    ///
64    /// This function will associate the given handle to this port with the
65    /// given `token` to be returned in status messages whenever it receives a
66    /// notification.
67    ///
68    /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
69    /// trait can be provided to this function, such as `std::fs::File` and
70    /// friends.
71    pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
72        self._add(token, t.as_raw_handle() as HANDLE)
73    }
74
75    /// Associates a new `SOCKET` to this I/O completion port.
76    ///
77    /// This function will associate the given socket to this port with the
78    /// given `token` to be returned in status messages whenever it receives a
79    /// notification.
80    ///
81    /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
82    /// trait can be provided to this function, such as `std::net::TcpStream`
83    /// and friends.
84    pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
85        self._add(token, t.as_raw_socket() as HANDLE)
86    }
87
88    fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
89        assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>());
90        let ret = unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token, 0) };
91        if ret.is_null() {
92            Err(io::Error::last_os_error())
93        } else {
94            debug_assert_eq!(ret, self.handle.raw());
95            Ok(())
96        }
97    }
98
99    /// Dequeue a completion status from this I/O completion port.
100    ///
101    /// This function will associate the calling thread with this completion
102    /// port and then wait for a status message to become available. The precise
103    /// semantics on when this function returns depends on the concurrency value
104    /// specified when the port was created.
105    ///
106    /// A timeout can optionally be specified to this function. If `None` is
107    /// provided this function will not time out, and otherwise it will time out
108    /// after the specified duration has passed.
109    ///
110    /// On success this will return the status message which was dequeued from
111    /// this completion port.
112    pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
113        let mut bytes = 0;
114        let mut token = 0;
115        let mut overlapped = std::ptr::null_mut();
116        let timeout = crate::dur2ms(timeout);
117        let ret = unsafe {
118            GetQueuedCompletionStatus(
119                self.handle.raw(),
120                &mut bytes,
121                &mut token,
122                &mut overlapped,
123                timeout,
124            )
125        };
126        crate::cvt(ret).map(|_| {
127            CompletionStatus(OVERLAPPED_ENTRY {
128                dwNumberOfBytesTransferred: bytes,
129                lpCompletionKey: token,
130                lpOverlapped: overlapped,
131                Internal: 0,
132            })
133        })
134    }
135
136    /// Dequeues a number of completion statuses from this I/O completion port.
137    ///
138    /// This function is the same as `get` except that it may return more than
139    /// one status. A buffer of "zero" statuses is provided (the contents are
140    /// not read) and then on success this function will return a sub-slice of
141    /// statuses which represent those which were dequeued from this port. This
142    /// function does not wait to fill up the entire list of statuses provided.
143    ///
144    /// Like with `get`, a timeout may be specified for this operation.
145    pub fn get_many<'a>(
146        &self,
147        list: &'a mut [CompletionStatus],
148        timeout: Option<Duration>,
149    ) -> io::Result<&'a mut [CompletionStatus]> {
150        debug_assert_eq!(
151            mem::size_of::<CompletionStatus>(),
152            mem::size_of::<OVERLAPPED_ENTRY>()
153        );
154        let mut removed = 0;
155        let timeout = crate::dur2ms(timeout);
156        let len = cmp::min(list.len(), u32::MAX as usize) as u32;
157        let ret = unsafe {
158            GetQueuedCompletionStatusEx(
159                self.handle.raw(),
160                list.as_ptr() as *mut _,
161                len,
162                &mut removed,
163                timeout,
164                FALSE,
165            )
166        };
167        match crate::cvt(ret) {
168            Ok(_) => Ok(&mut list[..removed as usize]),
169            Err(e) => Err(e),
170        }
171    }
172
173    /// Posts a new completion status onto this I/O completion port.
174    ///
175    /// This function will post the given status, with custom parameters, to the
176    /// port. Threads blocked in `get` or `get_many` will eventually receive
177    /// this status.
178    pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
179        let ret = unsafe {
180            PostQueuedCompletionStatus(
181                self.handle.raw(),
182                status.0.dwNumberOfBytesTransferred,
183                status.0.lpCompletionKey,
184                status.0.lpOverlapped,
185            )
186        };
187        crate::cvt(ret).map(|_| ())
188    }
189}
190
191impl AsRawHandle for CompletionPort {
192    fn as_raw_handle(&self) -> RawHandle {
193        self.handle.raw() as RawHandle
194    }
195}
196
197impl FromRawHandle for CompletionPort {
198    unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort {
199        CompletionPort {
200            handle: Handle::new(handle as HANDLE),
201        }
202    }
203}
204
205impl IntoRawHandle for CompletionPort {
206    fn into_raw_handle(self) -> RawHandle {
207        self.handle.into_raw() as RawHandle
208    }
209}
210
211impl CompletionStatus {
212    /// Creates a new completion status with the provided parameters.
213    ///
214    /// This function is useful when creating a status to send to a port with
215    /// the `post` method. The parameters are opaquely passed through and not
216    /// interpreted by the system at all.
217    pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> CompletionStatus {
218        assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>());
219        CompletionStatus(OVERLAPPED_ENTRY {
220            dwNumberOfBytesTransferred: bytes,
221            lpCompletionKey: token,
222            lpOverlapped: overlapped as *mut _,
223            Internal: 0,
224        })
225    }
226
227    /// Creates a new borrowed completion status from the borrowed
228    /// `OVERLAPPED_ENTRY` argument provided.
229    ///
230    /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
231    /// returning the wrapped structure.
232    pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
233        // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so
234        // a reference to one is guaranteed to be layout compatible with the
235        // reference to another.
236        unsafe { &*(entry as *const _ as *const _) }
237    }
238
239    /// Creates a new "zero" completion status.
240    ///
241    /// This function is useful when creating a stack buffer or vector of
242    /// completion statuses to be passed to the `get_many` function.
243    pub fn zero() -> CompletionStatus {
244        CompletionStatus::new(0, 0, std::ptr::null_mut())
245    }
246
247    /// Returns the number of bytes that were transferred for the I/O operation
248    /// associated with this completion status.
249    pub fn bytes_transferred(&self) -> u32 {
250        self.0.dwNumberOfBytesTransferred
251    }
252
253    /// Returns the completion key value associated with the file handle whose
254    /// I/O operation has completed.
255    ///
256    /// A completion key is a per-handle key that is specified when it is added
257    /// to an I/O completion port via `add_handle` or `add_socket`.
258    pub fn token(&self) -> usize {
259        self.0.lpCompletionKey
260    }
261
262    /// Returns a pointer to the `Overlapped` structure that was specified when
263    /// the I/O operation was started.
264    pub fn overlapped(&self) -> *mut OVERLAPPED {
265        self.0.lpOverlapped
266    }
267
268    /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
269    pub fn entry(&self) -> &OVERLAPPED_ENTRY {
270        &self.0
271    }
272}
273
#[cfg(test)]
mod tests {
    use crate::iocp::{CompletionPort, CompletionStatus};
    use std::mem;
    use std::time::Duration;
    use windows_sys::Win32::Foundation::*;

    /// Compile-time check that both public types can cross threads.
    /// `CompletionStatus` is included because its `Send`/`Sync` impls are
    /// written by hand.
    #[test]
    fn is_send_sync() {
        fn is_send_sync<T: Send + Sync>() {}
        is_send_sync::<CompletionPort>();
        is_send_sync::<CompletionStatus>();
    }

    /// The token must fit the entry's completion-key field without loss.
    #[test]
    fn token_right_size() {
        let status = CompletionStatus::new(0, usize::MAX, std::ptr::null_mut());
        assert_eq!(
            mem::size_of_val(&status.entry().lpCompletionKey),
            mem::size_of::<usize>()
        );
        // A full-width token survives the round trip through the entry.
        assert_eq!(status.token(), usize::MAX);
    }

    /// Waiting on an empty port with a timeout fails with `WAIT_TIMEOUT`.
    #[test]
    fn timeout() {
        let c = CompletionPort::new(1).unwrap();
        let err = c.get(Some(Duration::from_millis(1))).unwrap_err();
        assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
    }

    /// A posted status comes back from `get` with all fields intact.
    #[test]
    fn get() {
        let c = CompletionPort::new(1).unwrap();
        c.post(CompletionStatus::new(
            1,
            2,
            std::ptr::NonNull::dangling().as_ptr(),
        ))
        .unwrap();
        let s = c.get(None).unwrap();
        assert_eq!(s.bytes_transferred(), 1);
        assert_eq!(s.token(), 2);
        assert_eq!(s.overlapped(), std::ptr::NonNull::dangling().as_ptr());
    }

    /// Two posted statuses come back from `get_many` in order, and the unused
    /// tail of the buffer is left untouched.
    #[test]
    fn get_many() {
        let c = CompletionPort::new(1).unwrap();

        c.post(CompletionStatus::new(
            1,
            2,
            std::ptr::NonNull::dangling().as_ptr(),
        ))
        .unwrap();
        c.post(CompletionStatus::new(
            4,
            5,
            std::ptr::NonNull::dangling().as_ptr(),
        ))
        .unwrap();

        let mut s = vec![CompletionStatus::zero(); 4];
        {
            let s = c.get_many(&mut s, None).unwrap();
            assert_eq!(s.len(), 2);
            assert_eq!(s[0].bytes_transferred(), 1);
            assert_eq!(s[0].token(), 2);
            assert_eq!(s[0].overlapped(), std::ptr::NonNull::dangling().as_ptr());
            assert_eq!(s[1].bytes_transferred(), 4);
            assert_eq!(s[1].token(), 5);
            assert_eq!(s[1].overlapped(), std::ptr::NonNull::dangling().as_ptr());
        }
        // Entries past the returned sub-slice still hold their "zero" values.
        assert_eq!(s[2].bytes_transferred(), 0);
        assert_eq!(s[2].token(), 0);
        assert_eq!(s[2].overlapped(), std::ptr::null_mut());
    }
}
346}