//! # `sigchld` [![Actions Status](https://github.com/oconnor663/sigchld.rs/workflows/tests/badge.svg)](https://github.com/oconnor663/sigchld.rs/actions) [![crates.io](https://img.shields.io/crates/v/sigchld.svg)](https://crates.io/crates/sigchld) [![docs.rs](https://docs.rs/sigchld/badge.svg)](https://docs.rs/sigchld)
//!
//! This crate is a low-level building block for child process management. Unix doesn't provide a
//! portable API for waiting for a child process to exit _with a timeout_. (Linux has `pidfd`, but
//! there's no equivalent on e.g. macOS.) The next best thing is waiting for the `SIGCHLD` signal,
//! but Unix signal handling is complicated and error-prone. This crate implements `SIGCHLD`
//! handling using [`signal_hook`] internally, for compatibility with other Rust signal handling
//! libraries. It allows any number of threads to wait for `SIGCHLD` with an optional timeout.
//!
//! Note that `SIGCHLD` indicates that _any_ child process has exited, but there's no reliable way
//! to know _which_ child it was. You generally need to [poll your child process][try_wait] in a
//! loop, and wait again if it hasn't exited yet. This is still a bit error-prone, and most
//! applications will prefer a higher-level API that does this loop internally, like
//! [`shared_child`](https://docs.rs/shared_child) or [`duct`](https://docs.rs/duct).
//!
//! This crate only supports Unix and doesn't build on Windows. Portable callers need to put this
//! crate in the `[target.'cfg(unix)'.dependencies]` section of their `Cargo.toml` and only use it
//! inside of `#[cfg(unix)]` blocks or similar.
//!
//! # Example
//!
//! ```rust
//! # fn main() -> std::io::Result<()> {
//! # use std::time::Duration;
//! let mut waiter = sigchld::Waiter::new()?;
//! // If SIGCHLD arrives after this point, the Waiter will buffer it.
//! let mut child = std::process::Command::new("sleep").arg("1").spawn()?;
//! // Block until *any* child exits. See also `wait_timeout` and `wait_deadline`.
//! waiter.wait()?;
//! // There's only one child process in this example, so we know that it exited. But in general
//! // we might not know which child woke us up, and in that case we'd need to wait and check in a
//! // loop. See the Waiter examples.
//! assert!(child.try_wait()?.is_some(), "sleep has exited");
//! # Ok(())
//! # }
//! ```
//!
//! [`signal_hook`]: https://docs.rs/signal-hook
//! [try_wait]: https://doc.rust-lang.org/std/process/struct.Child.html#method.try_wait
//! [`wait_timeout`]: https://docs.rs/shared_child/latest/shared_child/struct.SharedChild.html#method.wait_timeout
//! [`wait_deadline`]: https://docs.rs/shared_child/latest/shared_child/struct.SharedChild.html#method.wait_deadline

#[cfg(not(unix))]
compile_error!(
    "This crate is Unix-only. Put it in [target.'cfg(unix)'.dependencies] in Cargo.toml."
);

use std::io::{self, ErrorKind, Read};
use std::os::raw::c_int;
use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant};

// `os_pipe` predates `std::io::pipe`, and they have almost the exact same API. Taking the
// dependency reduces the MSRV from 1.87 to 1.63 (inherited from `libc`).
#[cfg(feature = "os_pipe")]
use os_pipe::{pipe, PipeReader};
#[cfg(not(feature = "os_pipe"))]
use std::io::{pipe, PipeReader};

// Use anyhow errors in testing, for backtraces.
#[cfg(test)]
type Result<T> = anyhow::Result<T>;
#[cfg(not(test))]
type Result<T> = io::Result<T>;

/// An object that buffers `SIGCHLD` signals so that you can wait on them reliably.
///
/// `Waiter` can't tell you _which_ process woke you up, so you usually need to wait in a loop and
/// [poll your `Child`][try_wait] each time through. One way to make sure you don't miss a signal
/// (and potentially wait forever) is to create a `Waiter` before you spawn your child process,
/// like this:
///
/// ```
/// # use std::io;
/// # fn main() -> io::Result<()> {
/// let mut waiter = sigchld::Waiter::new()?;
/// // Any SIGCHLD after this point will be buffered by the Waiter.
/// let mut child = std::process::Command::new("sleep").arg("1").spawn()?;
/// loop {
///     waiter.wait()?;
///     // *Some* child has exited. Check whether it was our child.
///     if child.try_wait()?.is_some() {
///         break;
///     }
/// }
/// // Our child has exited.
/// # Ok(())
/// # }
/// ```
///
/// If you create a `Waiter` after your child is already running, then you need to poll the child
/// at least once before you wait:
///
/// ```
/// # use std::io;
/// # fn main() -> io::Result<()> {
/// let mut child = std::process::Command::new("sleep").arg("1").spawn()?;
/// // If SIGCHLD arrives here, before the Waiter is created, we could miss it.
/// let mut waiter = sigchld::Waiter::new()?;
/// while child.try_wait()?.is_none() {
///     // Now we know the child didn't exit before we created the Waiter.
///     waiter.wait()?;
/// }
/// // Our child has exited.
/// # Ok(())
/// # }
/// ```
///
/// The following order of operations is broken. You could miss `SIGCHLD` and wait forever:
///
/// <div class="warning">
///
/// ```no_run
/// # use std::io;
/// # fn main() -> io::Result<()> {
/// let mut child = std::process::Command::new("sleep").arg("1").spawn()?;
/// // If SIGCHLD arrives now, before the Waiter is created, we could miss it.
/// let mut waiter = sigchld::Waiter::new()?;
/// // OOPS: If we missed SIGCHLD, we'll wait forever.
/// waiter.wait()?;
/// # Ok(())
/// # }
/// ```
///
/// </div>
///
/// Most applications will prefer higher-level APIs like
/// [`shared_child`](https://docs.rs/shared_child) or [`duct`](https://docs.rs/duct), where you
/// don't have to worry about this sort of mistake. This crate is intended more as a building block
/// for those APIs.
///
/// [try_wait]: https://doc.rust-lang.org/std/process/struct.Child.html#method.try_wait
#[derive(Debug)]
pub struct Waiter {
    reader: PipeReader,
    sig_id: signal_hook::SigId,
}

impl Waiter {
    /// Create a `Waiter`.
    ///
    /// Any `SIGCHLD` signals that arrive after a `Waiter` is created, but before a call to
    /// [`wait`](Self::wait), [`wait_timeout`](Self::wait_timeout), or
    /// [`wait_deadline`](Self::wait_deadline), will be buffered. In that case the next call to one
    /// of those methods will return immediately. Note that each wait clears the entire buffer, so
    /// a single wakeup could indicate that multiple signals arrived. In other words, signals can
    /// be "coalesced".
    pub fn new() -> Result<Self> {
        let (reader, writer) = pipe()?;
        set_nonblocking(&reader)?;
        set_nonblocking(&writer)?;
        let sig_id = signal_hook::low_level::pipe::register(libc::SIGCHLD, writer)?;
        Ok(Self { reader, sig_id })
    }

    /// Block the current thread until any `SIGCHLD` signal arrives.
    ///
    /// If any `SIGCHLD` signals have arrived since the `Waiter` was created, or since the last
    /// wait on it returned, this function will return immediately. This avoids a race condition
    /// where the child exits right after you call [`Child::try_wait`] but right before you call
    /// this function.
    ///
    /// This function does not reap any exited children. Child process cleanup is only done by
    /// [`Child::wait`] or [`Child::try_wait`].
    ///
    /// This function is not currently susceptible to "spurious wakeups" (i.e. returning early for
    /// no reason), but that isn't guaranteed, and future versions might be susceptible. Getting
    /// woken up early by an unrelated child process exiting (e.g. one spawned by some unknown
    /// library code running on another thread) is similar to a spurious wakeup, and you might need
    /// to be defensive and wait in a loop either way.
    ///
    /// [`Child::wait`]: https://doc.rust-lang.org/std/process/struct.Child.html#method.wait
    /// [`Child::try_wait`]: https://doc.rust-lang.org/std/process/struct.Child.html#method.try_wait
    pub fn wait(&mut self) -> Result<()> {
        let signaled = self.wait_inner(None)?;
        debug_assert!(signaled, "timeout shouldn't be possible");
        Ok(())
    }

    /// Block the current thread until either any `SIGCHLD` signal arrives or a timeout passes.
    /// Return `true` if a signal arrived before the timeout.
    ///
    /// If any `SIGCHLD` signals have arrived since the `Waiter` was created, or since the last
    /// wait on it returned, this function will return immediately. This avoids a race condition
    /// where the child exits right after you call [`Child::try_wait`] but right before you call
    /// this function.
    ///
    /// This function does not reap any exited children. Child process cleanup is only done by
    /// [`Child::wait`] or [`Child::try_wait`].
    ///
    /// This function is not currently susceptible to "spurious wakeups" (i.e. returning early for
    /// no reason), but that isn't guaranteed, and future versions might be susceptible. Getting
    /// woken up early by an unrelated child process exiting (e.g. one spawned by some unknown
    /// library code running on another thread) is similar to a spurious wakeup, and you might need
    /// to be defensive and wait in a loop either way.
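    ///
    /// A minimal sketch of enforcing a time limit on a single child, assuming a `sleep` binary on
    /// the `PATH`. The 100 ms limit and the kill-on-timeout policy are illustrative choices, not
    /// something this crate prescribes:
    ///
    /// ```rust
    /// # use std::io;
    /// # fn main() -> io::Result<()> {
    /// use std::time::Duration;
    ///
    /// let mut waiter = sigchld::Waiter::new()?;
    /// let mut child = std::process::Command::new("sleep").arg("10").spawn()?;
    /// // Wait up to 100 ms for *some* SIGCHLD to arrive.
    /// let signaled = waiter.wait_timeout(Duration::from_millis(100))?;
    /// if !signaled || child.try_wait()?.is_none() {
    ///     // Either we timed out, or a different child woke us up. This sketch gives up either
    ///     // way: kill our child and reap it.
    ///     child.kill()?;
    ///     child.wait()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Each call starts a fresh timeout, so a loop that must respect one overall time limit is
    /// usually easier to write with [`wait_deadline`](Self::wait_deadline).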
    ///
    /// [`Child::wait`]: https://doc.rust-lang.org/std/process/struct.Child.html#method.wait
    /// [`Child::try_wait`]: https://doc.rust-lang.org/std/process/struct.Child.html#method.try_wait
    pub fn wait_timeout(&mut self, timeout: Duration) -> Result<bool> {
        let deadline = Instant::now() + timeout;
        self.wait_inner(Some(deadline))
    }

    /// Block the current thread until either any `SIGCHLD` signal arrives or a deadline passes.
    /// Return `true` if a signal arrived before the deadline.
    ///
    /// If any `SIGCHLD` signals have arrived since the `Waiter` was created, or since the last
    /// wait on it returned, this function will return immediately. This avoids a race condition
    /// where the child exits right after you call [`Child::try_wait`] but right before you call
    /// this function.
    ///
    /// This function does not reap any exited children. Child process cleanup is only done by
    /// [`Child::wait`] or [`Child::try_wait`].
    ///
    /// This function is not currently susceptible to "spurious wakeups" (i.e. returning early for
    /// no reason), but that isn't guaranteed, and future versions might be susceptible. Getting
    /// woken up early by an unrelated child process exiting (e.g. one spawned by some unknown
    /// library code running on another thread) is similar to a spurious wakeup, and you might need
    /// to be defensive and wait in a loop either way.
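    ///
    /// A minimal sketch of the wait-and-poll loop under one overall deadline, assuming a `sleep`
    /// binary on the `PATH`. The five-second limit is an arbitrary choice for illustration:
    ///
    /// ```rust
    /// # use std::io;
    /// # fn main() -> io::Result<()> {
    /// use std::time::{Duration, Instant};
    ///
    /// let mut waiter = sigchld::Waiter::new()?;
    /// let mut child = std::process::Command::new("sleep").arg("1").spawn()?;
    /// // Compute the deadline once, so that unrelated wakeups don't extend the total wait.
    /// let deadline = Instant::now() + Duration::from_secs(5);
    /// let status = loop {
    ///     if let Some(status) = child.try_wait()? {
    ///         break Some(status);
    ///     }
    ///     // `false` means the deadline passed without a new SIGCHLD.
    ///     if !waiter.wait_deadline(deadline)? {
    ///         // Poll one last time before giving up.
    ///         break child.try_wait()?;
    ///     }
    /// };
    /// assert!(status.is_some(), "sleep should exit well before the deadline");
    /// # Ok(())
    /// # }
    /// ```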
    ///
    /// [`Child::wait`]: https://doc.rust-lang.org/std/process/struct.Child.html#method.wait
    /// [`Child::try_wait`]: https://doc.rust-lang.org/std/process/struct.Child.html#method.try_wait
    pub fn wait_deadline(&mut self, deadline: Instant) -> Result<bool> {
        self.wait_inner(Some(deadline))
    }

    fn wait_inner(&mut self, maybe_deadline: Option<Instant>) -> Result<bool> {
        // Loop to handle spurious wakeups from poll().
        loop {
            // Read the pipe until EWOULDBLOCK. This could take more than one read.
            let mut buf = [0u8; 1024];
            let mut signaled = false;
            loop {
                match self.reader.read(&mut buf) {
                    Ok(0) => unreachable!("this pipe should never close"),
                    Ok(_) => signaled = true,
                    Err(e) if e.kind() == ErrorKind::WouldBlock => break,
                    // EINTR shouldn't be possible for a nonblocking read.
                    #[allow(clippy::useless_conversion)]
                    Err(e) => return Err(e.into()),
                }
            }
            // If we were signaled, return true.
            if signaled {
                return Ok(true);
            }
            // If the deadline has passed, return false.
            if let Some(deadline) = maybe_deadline {
                if Instant::now() > deadline {
                    return Ok(false);
                }
            }
            // Use poll() to wait until either the deadline passes or the pipe is readable.
            let mut poll_fd = libc::pollfd {
                fd: self.reader.as_raw_fd(),
                events: libc::POLLIN,
                revents: 0,
            };
            let timeout_ms: c_int = if let Some(deadline) = maybe_deadline {
                let timeout = deadline.saturating_duration_since(Instant::now());
                // Convert to milliseconds, rounding *up*. (That way we don't repeatedly sleep for
                // 0ms when we're close to the timeout.)
                (timeout.as_nanos().saturating_add(999_999) / 1_000_000)
                    .try_into()
                    .unwrap_or(c_int::MAX)
            } else {
                -1 // infinite timeout
            };
            let poll_return_code = unsafe {
                libc::poll(
                    &mut poll_fd, // an "array" of one
                    1,            // the "array" length
                    timeout_ms,
                )
            };
            if poll_return_code < 0 {
                // EINTR is expected here, and the delivery of SIGCHLD usually causes it.
                let last_error = io::Error::last_os_error();
                if last_error.kind() != ErrorKind::Interrupted {
                    #[allow(clippy::useless_conversion)]
                    return Err(last_error.into());
                }
            }
            // Go back to the top of the loop and try to read again.
        }
    }
}

impl Drop for Waiter {
    fn drop(&mut self) {
        let existed = signal_hook::low_level::unregister(self.sig_id);
        debug_assert!(existed, "should've existed");
    }
}

// The standard library doesn't expose set_nonblocking for pipes. Do it the old-fashioned way.
fn set_nonblocking(fd: &impl AsRawFd) -> Result<()> {
    unsafe {
        let return_code = libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
        if return_code == -1 {
            #[allow(clippy::useless_conversion)]
            Err(io::Error::last_os_error().into())
        } else {
            Ok(())
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use duct::cmd;
    use std::sync::{Arc, Mutex, MutexGuard};
    use std::time::{Duration, Instant};

    // We need to make sure only one test runs at a time, because these waits are global, and
    // they'll confuse each other. Lock through `lock_no_poison` so that a panic in one test
    // doesn't poison the mutex for the others.
    //
    // XXX: These tests don't wait in a loop, because if there are bugs here that cause early
    // wakeups, I'd rather the tests fail than hide the bug. I expect these tests will "randomly"
    // fail under certain circumstances, and that's worth it to me to catch more bugs. But real
    // callers should wait in a loop so that they don't randomly fail.
    static ONE_TEST_AT_A_TIME: Mutex<()> = Mutex::new(());

    fn lock_no_poison<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
        match mutex.lock() {
            Ok(guard) => guard,
            Err(e) => e.into_inner(),
        }
    }

    #[track_caller]
    fn assert_approx_eq(dur1: Duration, dur2: Duration) {
        const CLOSE_ENOUGH: f64 = 0.1; // 10%
        let lower_bound = 1.0 - CLOSE_ENOUGH;
        let upper_bound = 1.0 + CLOSE_ENOUGH;
        let ratio = dur1.as_secs_f64() / dur2.as_secs_f64();
        assert!(
            lower_bound < ratio && ratio < upper_bound,
            "{dur1:?} and {dur2:?} are not close enough",
        );
    }

    #[test]
    fn test_wait() -> Result<()> {
        let _test_guard = lock_no_poison(&ONE_TEST_AT_A_TIME); // see comment on the lock
        let start = Instant::now();

        let mut waiter = Waiter::new()?;
        cmd!("sleep", "0.25").start()?;
        waiter.wait()?;
        let dur = Instant::now() - start;
        assert_approx_eq(Duration::from_millis(250), dur);

        Ok(())
    }

    #[test]
    fn test_wait_deadline() -> Result<()> {
        let _test_guard = lock_no_poison(&ONE_TEST_AT_A_TIME); // see comment on the lock
        let start = Instant::now();

        let timeout = Duration::from_millis(500);
        let mut waiter = Waiter::new()?;
        cmd!("sleep", "0.25").start()?;
        // This first wait should return true.
        let signaled = waiter.wait_deadline(Instant::now() + timeout)?;
        let dur = Instant::now() - start;
        assert_approx_eq(Duration::from_millis(250), dur);
        assert!(signaled);

        // This second wait should time out and return false.
        let mut waiter2 = Waiter::new()?;
        let signaled2 = waiter2.wait_deadline(Instant::now() + timeout)?;
        let dur2 = Instant::now() - start;
        assert_approx_eq(Duration::from_millis(750), dur2);
        assert!(!signaled2);

        Ok(())
    }

    #[test]
    fn test_wait_timeout() -> Result<()> {
        let _test_guard = lock_no_poison(&ONE_TEST_AT_A_TIME); // see comment on the lock
        let start = Instant::now();

        let timeout = Duration::from_millis(500);
        let mut waiter = Waiter::new()?;
        cmd!("sleep", "0.25").start()?;
        // This first wait should return true.
        let signaled = waiter.wait_timeout(timeout)?;
        let dur = Instant::now() - start;
        assert_approx_eq(Duration::from_millis(250), dur);
        assert!(signaled);

        // This second wait should time out and return false.
        let mut waiter2 = Waiter::new()?;
        let signaled2 = waiter2.wait_timeout(timeout)?;
        let dur2 = Instant::now() - start;
        assert_approx_eq(Duration::from_millis(750), dur2);
        assert!(!signaled2);

        Ok(())
    }

    #[test]
    fn test_wait_many_threads() -> Result<()> {
        let _test_guard = lock_no_poison(&ONE_TEST_AT_A_TIME); // see comment on the lock
        let start = Instant::now();

        let handle = Arc::new(cmd!("sleep", "1").start()?);
        let mut wait_threads = Vec::new();
        let mut short_timeout_threads = Vec::new();
        let mut long_timeout_threads = Vec::new();
        for _ in 0..3 {
            let handle_clone = handle.clone();
            let mut waiter = Waiter::new()?;
            wait_threads.push(std::thread::spawn(move || -> Result<Duration> {
                waiter.wait()?;
                let dur = Instant::now() - start;
                assert!(handle_clone.try_wait()?.is_some(), "should've exited");
                Ok(dur)
            }));
            let handle_clone = handle.clone();
            let mut waiter = Waiter::new()?;
            short_timeout_threads.push(std::thread::spawn(move || -> Result<bool> {
                let signaled = waiter.wait_timeout(Duration::from_millis(500))?;
                assert!(handle_clone.try_wait()?.is_none(), "shouldn't have exited");
                Ok(signaled)
            }));
            let handle_clone = handle.clone();
            let mut waiter = Waiter::new()?;
            long_timeout_threads.push(std::thread::spawn(move || -> Result<bool> {
                let signaled = waiter.wait_timeout(Duration::from_millis(1500))?;
                assert!(handle_clone.try_wait()?.is_some(), "should've exited");
                Ok(signaled)
            }));
        }
        for thread in wait_threads {
            let dur = thread.join().unwrap()?;
            assert_approx_eq(Duration::from_millis(1000), dur);
        }
        for thread in short_timeout_threads {
            assert!(!thread.join().unwrap()?, "should not be signaled");
        }
        for thread in long_timeout_threads {
            assert!(thread.join().unwrap()?, "should be signaled");
        }

        Ok(())
    }
}