1use crate::FALSE;
4use std::cmp;
5use std::fmt;
6use std::io;
7use std::mem;
8use std::os::windows::io::{AsRawHandle, AsRawSocket, FromRawHandle, IntoRawHandle, RawHandle};
9use std::time::Duration;
10
11use crate::handle::Handle;
12use crate::Overlapped;
13use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
14use windows_sys::Win32::System::IO::{
15 CreateIoCompletionPort, GetQueuedCompletionStatus, GetQueuedCompletionStatusEx,
16 PostQueuedCompletionStatus, OVERLAPPED, OVERLAPPED_ENTRY,
17};
18
19#[derive(Debug)]
21pub struct CompletionPort {
22 handle: Handle,
23}
24
25#[derive(Clone, Copy)]
31#[repr(transparent)]
32pub struct CompletionStatus(OVERLAPPED_ENTRY);
33
34impl fmt::Debug for CompletionStatus {
35 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36 write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
37 }
38}
39
40unsafe impl Send for CompletionStatus {}
41unsafe impl Sync for CompletionStatus {}
42
43impl CompletionPort {
44 pub fn new(threads: u32) -> io::Result<CompletionPort> {
50 let ret = unsafe {
51 CreateIoCompletionPort(INVALID_HANDLE_VALUE, std::ptr::null_mut(), 0, threads)
52 };
53 if ret.is_null() {
54 Err(io::Error::last_os_error())
55 } else {
56 Ok(CompletionPort {
57 handle: Handle::new(ret),
58 })
59 }
60 }
61
62 pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
72 self._add(token, t.as_raw_handle() as HANDLE)
73 }
74
75 pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
85 self._add(token, t.as_raw_socket() as HANDLE)
86 }
87
88 fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
89 assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>());
90 let ret = unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token, 0) };
91 if ret.is_null() {
92 Err(io::Error::last_os_error())
93 } else {
94 debug_assert_eq!(ret, self.handle.raw());
95 Ok(())
96 }
97 }
98
99 pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
113 let mut bytes = 0;
114 let mut token = 0;
115 let mut overlapped = std::ptr::null_mut();
116 let timeout = crate::dur2ms(timeout);
117 let ret = unsafe {
118 GetQueuedCompletionStatus(
119 self.handle.raw(),
120 &mut bytes,
121 &mut token,
122 &mut overlapped,
123 timeout,
124 )
125 };
126 crate::cvt(ret).map(|_| {
127 CompletionStatus(OVERLAPPED_ENTRY {
128 dwNumberOfBytesTransferred: bytes,
129 lpCompletionKey: token,
130 lpOverlapped: overlapped,
131 Internal: 0,
132 })
133 })
134 }
135
136 pub fn get_many<'a>(
146 &self,
147 list: &'a mut [CompletionStatus],
148 timeout: Option<Duration>,
149 ) -> io::Result<&'a mut [CompletionStatus]> {
150 debug_assert_eq!(
151 mem::size_of::<CompletionStatus>(),
152 mem::size_of::<OVERLAPPED_ENTRY>()
153 );
154 let mut removed = 0;
155 let timeout = crate::dur2ms(timeout);
156 let len = cmp::min(list.len(), u32::MAX as usize) as u32;
157 let ret = unsafe {
158 GetQueuedCompletionStatusEx(
159 self.handle.raw(),
160 list.as_ptr() as *mut _,
161 len,
162 &mut removed,
163 timeout,
164 FALSE,
165 )
166 };
167 match crate::cvt(ret) {
168 Ok(_) => Ok(&mut list[..removed as usize]),
169 Err(e) => Err(e),
170 }
171 }
172
173 pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
179 let ret = unsafe {
180 PostQueuedCompletionStatus(
181 self.handle.raw(),
182 status.0.dwNumberOfBytesTransferred,
183 status.0.lpCompletionKey,
184 status.0.lpOverlapped,
185 )
186 };
187 crate::cvt(ret).map(|_| ())
188 }
189}
190
191impl AsRawHandle for CompletionPort {
192 fn as_raw_handle(&self) -> RawHandle {
193 self.handle.raw() as RawHandle
194 }
195}
196
197impl FromRawHandle for CompletionPort {
198 unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort {
199 CompletionPort {
200 handle: Handle::new(handle as HANDLE),
201 }
202 }
203}
204
205impl IntoRawHandle for CompletionPort {
206 fn into_raw_handle(self) -> RawHandle {
207 self.handle.into_raw() as RawHandle
208 }
209}
210
211impl CompletionStatus {
212 pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> CompletionStatus {
218 assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>());
219 CompletionStatus(OVERLAPPED_ENTRY {
220 dwNumberOfBytesTransferred: bytes,
221 lpCompletionKey: token,
222 lpOverlapped: overlapped as *mut _,
223 Internal: 0,
224 })
225 }
226
227 pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
233 unsafe { &*(entry as *const _ as *const _) }
237 }
238
239 pub fn zero() -> CompletionStatus {
244 CompletionStatus::new(0, 0, std::ptr::null_mut())
245 }
246
247 pub fn bytes_transferred(&self) -> u32 {
250 self.0.dwNumberOfBytesTransferred
251 }
252
253 pub fn token(&self) -> usize {
259 self.0.lpCompletionKey
260 }
261
262 pub fn overlapped(&self) -> *mut OVERLAPPED {
265 self.0.lpOverlapped
266 }
267
268 pub fn entry(&self) -> &OVERLAPPED_ENTRY {
270 &self.0
271 }
272}
273
#[cfg(test)]
mod tests {
    use crate::iocp::{CompletionPort, CompletionStatus};
    use std::mem;
    use std::time::Duration;
    use windows_sys::Win32::Foundation::*;

    /// Both public types must be usable across threads.
    #[test]
    fn is_send_sync() {
        fn is_send_sync<T: Send + Sync>() {}
        is_send_sync::<CompletionPort>();
        is_send_sync::<CompletionStatus>();
    }

    /// The token is stored in `OVERLAPPED_ENTRY::lpCompletionKey`; check the
    /// size of that actual field so a `usize` token cannot be truncated.
    #[test]
    fn token_right_size() {
        let status = CompletionStatus::zero();
        assert_eq!(
            mem::size_of_val(&status.entry().lpCompletionKey),
            mem::size_of::<usize>()
        );
    }

    /// Waiting on an empty port with a short timeout reports `WAIT_TIMEOUT`.
    #[test]
    fn timeout() {
        let c = CompletionPort::new(1).unwrap();
        let err = c.get(Some(Duration::from_millis(1))).unwrap_err();
        assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
    }

    /// A posted status is read back unchanged by `get`.
    #[test]
    fn get() {
        let c = CompletionPort::new(1).unwrap();
        c.post(CompletionStatus::new(
            1,
            2,
            std::ptr::NonNull::dangling().as_ptr(),
        ))
        .unwrap();
        let s = c.get(None).unwrap();
        assert_eq!(s.bytes_transferred(), 1);
        assert_eq!(s.token(), 2);
        assert_eq!(s.overlapped(), std::ptr::NonNull::dangling().as_ptr());
    }

    /// `get_many` returns only the filled prefix, in posting order, and leaves
    /// the rest of the buffer untouched.
    #[test]
    fn get_many() {
        let c = CompletionPort::new(1).unwrap();

        c.post(CompletionStatus::new(
            1,
            2,
            std::ptr::NonNull::dangling().as_ptr(),
        ))
        .unwrap();
        c.post(CompletionStatus::new(
            4,
            5,
            std::ptr::NonNull::dangling().as_ptr(),
        ))
        .unwrap();

        let mut s = vec![CompletionStatus::zero(); 4];
        {
            let s = c.get_many(&mut s, None).unwrap();
            assert_eq!(s.len(), 2);
            assert_eq!(s[0].bytes_transferred(), 1);
            assert_eq!(s[0].token(), 2);
            assert_eq!(s[0].overlapped(), std::ptr::NonNull::dangling().as_ptr());
            assert_eq!(s[1].bytes_transferred(), 4);
            assert_eq!(s[1].token(), 5);
            assert_eq!(s[1].overlapped(), std::ptr::NonNull::dangling().as_ptr());
        }
        // Slots past the returned prefix still hold the zeroed placeholder.
        assert_eq!(s[2].bytes_transferred(), 0);
        assert_eq!(s[2].token(), 0);
        assert_eq!(s[2].overlapped(), std::ptr::null_mut());
    }
}