spawn/
handle.rs

1//! The Spawn Handle is produced after consuming a Spawner via `spawn()`. It
2//! mediates access to the child's input, output, error (As long as the
3//! Spawner was configured to hook such descriptors), as well as mediating
4//! signal handling and teardown.
5
6use log::warn;
7use nix::{
8    errno::Errno,
9    sys::{
10        signal::{Signal, kill, killpg, raise},
11        wait::{WaitPidFlag, WaitStatus, waitpid},
12    },
13    unistd::Pid,
14};
15use parking_lot::{Condvar, Mutex, MutexGuard};
16use signal_hook::{consts::signal, iterator::Signals};
17use std::{
18    collections::VecDeque,
19    fs::File,
20    io::{self, Read, Write},
21    os::fd::OwnedFd,
22    sync::Arc,
23    thread::{self, JoinHandle, sleep},
24    time::Duration,
25};
26use thiserror::Error;
27
28/// Errors related to a ProcessHandle
29#[derive(Debug, Error)]
30pub enum Error {
31    /// Errors related to communicating with the process, such as
32    /// when waiting, killing, or sending a signal fails.
33    #[error("Communication error: {0}")]
34    Comm(Errno),
35
36    /// Errors when a Handle's descriptor functions are called, but
37    /// the Spawner made no such descriptors.
38    #[error("No such file was created.")]
39    NoFile,
40
41    /// Errors when no associate has the provided name.
42    #[error("No such associate found: {0}")]
43    NoAssociate(String),
44
45    /// Errors when the Child fails; returned when the Handle's readers
46    /// get strange output from the child.
47    #[error("Error in child process")]
48    Child,
49
50    /// Error when a Handle tries to write to a child standard input, but
51    /// the child no longer exist.
52    #[error("Failed to write to child")]
53    Input,
54
55    /// The parent received a termination signal
56    #[error("Failed to communicate with child.")]
57    Signal,
58
59    /// Error trying to write to standard input.
60    #[error("I/O error: {0}")]
61    Io(#[from] io::Error),
62
63    /// Timeout error
64    #[error("Timeout")]
65    Timeout,
66
67    /// User switching errors.
68    #[error("Failed to switch user: {0}")]
69    User(Errno),
70
71    /// User switching errors.
72    #[error("System Error: {0}")]
73    Errno(#[from] Errno),
74}
75
76/// The shared state between StreamHandle and Worker Thread.
77struct InnerBuffer {
78    /// The current contents from the pipe
79    buffer: VecDeque<u8>,
80
81    /// Whether the Thread is still working.
82    finished: bool,
83}
84
85/// The shared state between thread and handle.
86struct SharedBuffer {
87    state: Mutex<InnerBuffer>,
88    condvar: Condvar,
89}
90
91/// A handle on a process' Output or Error streams.
92/// The Handle can either be used asynchronously to read content as it is filled by the child,
93/// or synchronously by calling `read_all`, which will wait until the child terminates, then
94/// collect all output. For async, you can use `read_line`, or `read` for an exact byte count.
95///
96/// Content pulled with async functions are removed from the handle--it will not be present in `read_all`.
97/// Therefore, you likely want to either use this handle in one of the two modes.
98///
99/// ## Examples
100///
101/// Synchronous.
102/// ```rust
103/// use std::os::fd::{OwnedFd, FromRawFd};
104/// let mut handle = spawn::Stream::new(unsafe {OwnedFd::from_raw_fd(1)});
105/// handle.read_all().unwrap();
106/// ```
107///
108/// Asynchronous.
109/// ```rust
110/// use std::os::fd::{OwnedFd, FromRawFd};
111/// let mut handle = spawn::Stream::new(unsafe {OwnedFd::from_raw_fd(1)});
112/// while let Some(line) = handle.read_line() {
113///     println!("{line}");
114/// }
115/// ```
116pub struct Stream {
117    /// The shared buffer.
118    shared: Arc<SharedBuffer>,
119
120    /// The worker.
121    thread: Option<JoinHandle<()>>,
122}
123
124impl Stream {
125    /// Construct a new StreamHandle from an OwnedFd connected to the child.
126    pub fn new(owned_fd: OwnedFd) -> Self {
127        let mut file = File::from(owned_fd);
128        let shared = Arc::new(SharedBuffer {
129            state: Mutex::new(InnerBuffer {
130                buffer: VecDeque::new(),
131                finished: false,
132            }),
133            condvar: Condvar::new(),
134        });
135
136        let thread_shared = Arc::clone(&shared);
137
138        // Spawn the worker thread.
139        let handle = thread::spawn(move || {
140            let _ = (|| -> io::Result<()> {
141                let mut buf = [0u8; 4096];
142                loop {
143                    let n = file.read(&mut buf)?;
144                    if n == 0 {
145                        break;
146                    }
147                    let mut state = thread_shared.state.lock();
148                    state.buffer.extend(&buf[..n]);
149                    thread_shared.condvar.notify_all();
150                }
151                Ok(())
152            })();
153
154            let mut state = thread_shared.state.lock();
155            state.finished = true;
156            thread_shared.condvar.notify_all();
157        });
158
159        Stream {
160            shared,
161            thread: Some(handle),
162        }
163    }
164
165    /// Drain the current contents of the buffer.
166    fn drain(&self, state: &mut MutexGuard<InnerBuffer>, upto: Option<usize>) -> Vec<u8> {
167        match upto {
168            Some(n) => {
169                if n > state.buffer.len() {
170                    state.buffer.drain(..).collect()
171                } else {
172                    state.buffer.drain(..=n).collect()
173                }
174            }
175            None => state.buffer.drain(..).collect(),
176        }
177    }
178
179    /// Read a line from the stream.
180    /// This function is blocking, and will wait until a full line has been
181    /// written to the stream. The line will then be removed from the Handle.
182    pub fn read_line(&self) -> Option<String> {
183        let mut state = self.shared.state.lock();
184        loop {
185            if let Some(pos) = state.buffer.iter().position(|&b| b == b'\n') {
186                let line = String::from_utf8_lossy(&self.drain(&mut state, Some(pos))).into_owned();
187                return Some(line);
188            }
189
190            if state.finished {
191                if !state.buffer.is_empty() {
192                    let rest = String::from_utf8_lossy(&self.drain(&mut state, None)).into_owned();
193                    return Some(rest);
194                } else {
195                    return None;
196                }
197            }
198            self.shared.condvar.wait(&mut state);
199        }
200    }
201
202    /// Read the exact amount of bytes specified, or else throw an error.
203    /// This function is blocking.
204    pub fn read_bytes(&self, bytes: Option<usize>) -> Result<Vec<u8>, Error> {
205        let mut state = self.shared.state.lock();
206        let mut res = self.drain(&mut state, bytes);
207        while res.is_empty() {
208            self.shared.condvar.wait(&mut state);
209            res = self.drain(&mut state, bytes);
210        }
211        Ok(res)
212    }
213
214    /// Read everything currently in the pipe, blocking.
215    pub fn read_blocking(&mut self) -> Result<String, Error> {
216        self.wait()?;
217        let mut state = self.shared.state.lock();
218        Ok(String::from_utf8_lossy(&self.drain(&mut state, None)).into_owned())
219    }
220
221    /// Read everything currently in the pipe. Not blocking.
222    pub fn read_all(&mut self) -> Result<String, Error> {
223        let mut state = self.shared.state.lock();
224        Ok(String::from_utf8_lossy(&self.drain(&mut state, None)).into_owned())
225    }
226
227    /// Join the worker thread, waiting until the subprocess closes their side of the pipe.
228    pub fn wait(&mut self) -> Result<(), Error> {
229        if let Some(handle) = self.thread.take() {
230            match handle.join() {
231                Ok(_) => Ok(()),
232                Err(_) => Err(Error::Child),
233            }
234        } else {
235            Ok(())
236        }
237    }
238}
239impl Drop for Stream {
240    fn drop(&mut self) {
241        if let Some(handle) = self.thread.take() {
242            let _ = handle.join();
243        }
244    }
245}
246
247/// A handle to a child process created via `Spawner::spawn()`
248/// If input/output/error redirection were setup in the Spawner,
249/// you can use their related functions to access them.
250///
251/// Additionally, if there are other associated handles (Such as an auxiliary
252/// task to the one launched by the handle), you can delegate them as associates
253/// and allow the caller to manage their lifetimes. This allows you to only manage
254/// a single handle, with all its associates being cleanup when it does.
255///
256/// You should never construct a Handle yourself, it should always be returned through
257/// a Spawner.
258pub struct Handle {
259    /// The name of the spawned binary.
260    name: String,
261
262    /// The child PID. Once wait has been called, it is set to None
263    child: Option<Pid>,
264
265    /// The exit code, if the child has exited.
266    exit: i32,
267
268    /// A list of other Pids that the Handle should be responsible for,
269    /// attached to the main child.
270    associated: Vec<Handle>,
271
272    /// The child's standard input.
273    stdin: Option<File>,
274
275    /// The child's standard output.
276    stdout: Option<Stream>,
277
278    /// The child's standard error.
279    stderr: Option<Stream>,
280
281    #[cfg(feature = "user")]
282    mode: user::Mode,
283}
284impl Handle {
285    /// Construct a new `Handle` from a Child PID and pipes
286    pub fn new(
287        name: String,
288        pid: Pid,
289
290        #[cfg(feature = "user")] mode: user::Mode,
291
292        stdin: Option<OwnedFd>,
293        stdout: Option<OwnedFd>,
294        stderr: Option<OwnedFd>,
295        associates: Vec<Handle>,
296    ) -> Self {
297        Self {
298            name,
299            child: Some(pid),
300            exit: -1,
301            stdin: stdin.map(File::from),
302            stdout: stdout.map(Stream::new),
303            stderr: stderr.map(Stream::new),
304            associated: associates,
305
306            #[cfg(feature = "user")]
307            mode,
308        }
309    }
310
311    /// Get the name of the handle.
312    pub fn name(&self) -> &str {
313        &self.name
314    }
315
316    /// Get the pid of the child.
317    pub fn pid(&self) -> &Option<Pid> {
318        &self.child
319    }
320
321    /// Wait for the child to exit, with a timeout in case of no activity.
322    ///
323    /// Note that this function uses a signal handler to ensure it does not
324    /// hang the process, as well as efficiently wait the timeout. You cannot
325    /// use this function in multi-threaded environments.
326    pub fn wait_timeout(mut self, timeout: Duration) -> Result<i32, Error> {
327        if let Some(pid) = self.alive()? {
328            let mut signals = Signals::new([
329                signal::SIGTERM,
330                signal::SIGINT,
331                signal::SIGCHLD,
332                signal::SIGALRM,
333            ])?;
334
335            let _ = thread::spawn(move || {
336                sleep(timeout);
337                let _ = raise(Signal::SIGALRM);
338            });
339
340            'outer: loop {
341                for signal in signals.wait() {
342                    match signal {
343                        signal::SIGCHLD => match waitpid(pid, None) {
344                            Ok(status) => {
345                                self.child = None;
346                                if let WaitStatus::Exited(_, code) = status {
347                                    self.exit = code;
348                                    break 'outer;
349                                }
350                            }
351                            Err(Errno::ECHILD) => {
352                                self.child = None;
353                                self.exit = -1;
354                                break 'outer;
355                            }
356                            Err(e) => return Err(Error::Comm(e)),
357                        },
358                        signal::SIGALRM => return Err(Error::Timeout),
359                        _ => return Err(Error::Signal),
360                    }
361                }
362            }
363
364            // Collect the error code and return
365            self.wait()
366        } else {
367            Ok(self.exit)
368        }
369    }
370
371    /// Wait for the child to exit.
372    ///
373    /// Note that this function uses a signal handler to ensure it does not
374    /// hang the process. You cannot use this function in multi-threaded environments.
375    pub fn wait_and(&mut self) -> Result<i32, Error> {
376        if let Some(pid) = self.alive()? {
377            let mut signals = Signals::new([signal::SIGTERM, signal::SIGINT, signal::SIGCHLD])?;
378            'outer: loop {
379                for signal in signals.wait() {
380                    match signal {
381                        signal::SIGCHLD => match waitpid(pid, None) {
382                            Ok(status) => {
383                                self.child = None;
384                                if let WaitStatus::Exited(_, code) = status {
385                                    self.exit = code;
386                                    break 'outer;
387                                }
388                            }
389                            Err(Errno::ECHILD) => {
390                                self.child = None;
391                                self.exit = -1;
392                                break 'outer;
393                            }
394                            Err(e) => return Err(Error::Comm(e)),
395                        },
396                        _ => return Err(Error::Signal),
397                    }
398                }
399            }
400        }
401        Ok(self.exit)
402    }
403
404    /// Consume the handle and return the exit code of the process.
405    pub fn wait(mut self) -> Result<i32, Error> {
406        self.wait_and()
407    }
408
409    /// Wait for the child without signal handlers.
410    ///
411    /// This function is a thread-safe version of wait, but
412    /// means that signals will not be caught.
413    pub fn wait_blocking(&mut self) -> Result<i32, Error> {
414        if let Some(pid) = self.alive()? {
415            'outer: loop {
416                match waitpid(pid, None) {
417                    Ok(status) => {
418                        self.child = None;
419                        if let WaitStatus::Exited(_, code) = status {
420                            self.exit = code;
421                            break 'outer;
422                        }
423                    }
424                    Err(e) => return Err(Error::Comm(e)),
425                }
426            }
427        }
428        Ok(self.exit)
429    }
430
431    /// Check if the process is still alive, non-blocking.
432    pub fn alive(&mut self) -> Result<Option<Pid>, Error> {
433        if let Some(pid) = self.child {
434            loop {
435                match waitpid(pid, Some(WaitPidFlag::WNOHANG)) {
436                    Ok(WaitStatus::StillAlive) => break Ok(Some(pid)),
437                    Ok(WaitStatus::Exited(_, exit)) => {
438                        self.child.take();
439                        self.exit = exit;
440                        break Ok(None);
441                    }
442                    Ok(WaitStatus::Signaled(_, _, _)) => {
443                        self.child.take();
444                        self.exit = -1;
445                        break Ok(None);
446                    }
447                    Ok(_) => continue,
448                    Err(Errno::ECHILD) => {
449                        self.child = None;
450                        self.exit = -1;
451                        break Ok(None);
452                    }
453                    Err(e) => break Err(Error::Comm(e)),
454                }
455            }
456        } else {
457            Ok(None)
458        }
459    }
460
461    /// Terminate the process with a SIGTERM request, but
462    /// do not consume the Handle.
463    pub fn terminate(&mut self) -> Result<(), Error> {
464        if let Some(pid) = self.alive()? {
465            match self.signal(Signal::SIGTERM) {
466                Ok(_) => {
467                    let _ = waitpid(pid, None);
468                }
469                Err(e) => return Err(e),
470            }
471        }
472        Ok(())
473    }
474
475    /// Send a signal to the child.
476    pub fn signal(&mut self, sig: Signal) -> Result<(), Error> {
477        if let Some(pid) = self.alive()? {
478            #[cfg(feature = "user")]
479            let result = {
480                let mode = self.mode;
481                user::run_as!(mode, kill(pid, sig)).map_err(Error::User)?
482            };
483
484            #[cfg(not(feature = "user"))]
485            let result = kill(pid, sig);
486
487            match result {
488                Ok(_) => Ok(()),
489                Err(Errno::ESRCH) => {
490                    self.child = None;
491                    Ok(())
492                }
493                Err(e) => Err(Error::Comm(e)),
494            }
495        } else {
496            Ok(())
497        }
498    }
499
500    /// Send the signal to the child, and all associated handles.
501    pub fn signal_group(&mut self, sig: Signal) -> Result<(), Error> {
502        if let Some(pid) = self.alive()? {
503            #[cfg(feature = "user")]
504            let result = {
505                let mode = self.mode;
506                user::run_as!(mode, killpg(pid, sig)).map_err(Error::User)?
507            };
508
509            #[cfg(not(feature = "user"))]
510            let result = killpg(pid, sig);
511
512            match result {
513                Ok(_) => Ok(()),
514                Err(Errno::ESRCH) => {
515                    self.child = None;
516                    Ok(())
517                }
518                Err(e) => Err(Error::Comm(e)),
519            }
520        } else {
521            Ok(())
522        }
523    }
524
525    /// Detach the thread from manual cleanup.
526    /// This function does nothing more than move the Pid of the child out of the Handle.
527    /// When the Handle falls out of scope, it will not have a Pid to terminate, so the
528    /// child process will linger.
529    ///
530    /// `Spawner` sets Death Sig to **SIGKILL**, which means that when the parent dies,
531    /// its children are sent **SIGKILL**. This means a detached thread should not
532    /// become a Zombie Process, even if the Pid is dropped on program exit.
533    ///
534    /// You therefore have three options on what to do with the return value of this
535    /// function:
536    ///
537    ///  1. If there was no child to detach, you'll get a None, and do nothing.
538    ///  2. If you want to manage the child yourself (Or associate it with another
539    ///     Handle), capture the value.
540    ///  3. If you want to truly detach it, don't capture the return value. It will
541    ///     run in the background, and will be killed if its still running at
542    ///     program exit.
543    pub fn detach(mut self) -> Option<Pid> {
544        self.child.take()
545    }
546
547    /// Returns a mutable reference to an associate within the Handle, if it exists.
548    /// The associate is another Handle instance.
549    pub fn get_associate(&mut self, name: &str) -> Option<&mut Handle> {
550        self.associated
551            .iter_mut()
552            .find(|handle| handle.name == name)
553    }
554
555    /// Return the Stream associated with the child's standard error, if it exists.
556    /// Note that pulling from the Stream consumes the contents--calling `error_all`
557    /// will only return the contents from when you last pulled from the Stream.
558    pub fn error(&mut self) -> Result<&mut Stream, Error> {
559        if let Some(error) = &mut self.stderr {
560            Ok(error)
561        } else {
562            Err(Error::NoFile)
563        }
564    }
565
566    /// Waits for the child to terminate, then returns its entire standard error.
567    pub fn error_all(mut self) -> Result<String, Error> {
568        self.wait_blocking()?;
569        if let Some(mut error) = self.stderr.take() {
570            error.read_blocking()
571        } else {
572            Err(Error::NoFile)
573        }
574    }
575
576    /// Return the Stream associate with the child's standard output, if it exists.
577    /// Note that pulling from the Stream consumes the contents--calling `output_all`
578    /// will only return the contents from when you last pulled from the Stream.
579    pub fn output(&mut self) -> Result<&mut Stream, Error> {
580        if let Some(output) = &mut self.stdout {
581            Ok(output)
582        } else {
583            Err(Error::NoFile)
584        }
585    }
586
587    /// Waits for the child to terminate, then returns its entire standard output.
588    /// If you need the exit code, use wait() first.
589    pub fn output_all(mut self) -> Result<String, Error> {
590        self.wait_blocking()?;
591        if let Some(mut output) = self.stdout.take() {
592            output.read_blocking()
593        } else {
594            Err(Error::NoFile)
595        }
596    }
597
598    /// Closes the Handle's side of the standard input pipe, if it exists.
599    /// This sends an EOF to the child.
600    pub fn close(&mut self) -> Result<(), Error> {
601        if self.stdin.take().is_some() {
602            Ok(())
603        } else {
604            Err(Error::NoFile)
605        }
606    }
607}
608impl Drop for Handle {
609    fn drop(&mut self) {
610        if let Ok(pid) = self.alive() {
611            if let Some(pid) = pid {
612                match self.signal(Signal::SIGTERM) {
613                    Ok(_) => {
614                        if let Err(e) = waitpid(pid, None) {
615                            warn!("Failed to wait for process {pid}: {e}");
616                        }
617                    }
618                    Err(e) => warn!("Failed to terminate process {pid}: {e}"),
619                }
620            }
621        } else {
622            warn!("Could not communicate with child!")
623        }
624    }
625}
626impl Write for Handle {
627    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
628        match self.stdin.as_mut() {
629            Some(stdin) => stdin.write(buf),
630            None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "stdin is closed")),
631        }
632    }
633
634    fn flush(&mut self) -> io::Result<()> {
635        match self.stdin.as_mut() {
636            Some(stdin) => stdin.flush(),
637            None => Ok(()),
638        }
639    }
640}