spawn/handle.rs
//! The Spawn Handle is produced by consuming a Spawner via `spawn()`. It
//! mediates access to the child's input, output, and error streams (as long
//! as the Spawner was configured to hook those descriptors), and also
//! mediates signal handling and teardown.
5
6use log::warn;
7use nix::{
8 errno::Errno,
9 sys::{
10 signal::{Signal, kill, killpg, raise},
11 wait::{WaitPidFlag, WaitStatus, waitpid},
12 },
13 unistd::Pid,
14};
15use parking_lot::{Condvar, Mutex, MutexGuard};
16use signal_hook::{consts::signal, iterator::Signals};
17use std::{
18 collections::VecDeque,
19 fs::File,
20 io::{self, Read, Write},
21 os::fd::OwnedFd,
22 sync::Arc,
23 thread::{self, JoinHandle, sleep},
24 time::Duration,
25};
26use thiserror::Error;
27
28/// Errors related to a ProcessHandle
29#[derive(Debug, Error)]
30pub enum Error {
31 /// Errors related to communicating with the process, such as
32 /// when waiting, killing, or sending a signal fails.
33 #[error("Communication error: {0}")]
34 Comm(Errno),
35
36 /// Errors when a Handle's descriptor functions are called, but
37 /// the Spawner made no such descriptors.
38 #[error("No such file was created.")]
39 NoFile,
40
41 /// Errors when no associate has the provided name.
42 #[error("No such associate found: {0}")]
43 NoAssociate(String),
44
45 /// Errors when the Child fails; returned when the Handle's readers
46 /// get strange output from the child.
47 #[error("Error in child process")]
48 Child,
49
50 /// Error when a Handle tries to write to a child standard input, but
51 /// the child no longer exist.
52 #[error("Failed to write to child")]
53 Input,
54
55 /// The parent received a termination signal
56 #[error("Failed to communicate with child.")]
57 Signal,
58
59 /// Error trying to write to standard input.
60 #[error("I/O error: {0}")]
61 Io(#[from] io::Error),
62
63 /// Timeout error
64 #[error("Timeout")]
65 Timeout,
66
67 /// User switching errors.
68 #[error("Failed to switch user: {0}")]
69 User(Errno),
70
71 /// User switching errors.
72 #[error("System Error: {0}")]
73 Errno(#[from] Errno),
74}
75
/// The mutable state shared between a `Stream` and its worker thread.
struct InnerBuffer {
    /// Bytes the worker has read from the pipe that have not been consumed yet.
    buffer: VecDeque<u8>,

    /// Set once the worker has stopped reading; no more bytes will arrive.
    finished: bool,
}
84
85/// The shared state between thread and handle.
86struct SharedBuffer {
87 state: Mutex<InnerBuffer>,
88 condvar: Condvar,
89}
90
91/// A handle on a process' Output or Error streams.
92/// The Handle can either be used asynchronously to read content as it is filled by the child,
93/// or synchronously by calling `read_all`, which will wait until the child terminates, then
94/// collect all output. For async, you can use `read_line`, or `read` for an exact byte count.
95///
96/// Content pulled with async functions are removed from the handle--it will not be present in `read_all`.
97/// Therefore, you likely want to either use this handle in one of the two modes.
98///
99/// ## Examples
100///
101/// Synchronous.
102/// ```rust
103/// use std::os::fd::{OwnedFd, FromRawFd};
104/// let mut handle = spawn::Stream::new(unsafe {OwnedFd::from_raw_fd(1)});
105/// handle.read_all().unwrap();
106/// ```
107///
108/// Asynchronous.
109/// ```rust
110/// use std::os::fd::{OwnedFd, FromRawFd};
111/// let mut handle = spawn::Stream::new(unsafe {OwnedFd::from_raw_fd(1)});
112/// while let Some(line) = handle.read_line() {
113/// println!("{line}");
114/// }
115/// ```
116pub struct Stream {
117 /// The shared buffer.
118 shared: Arc<SharedBuffer>,
119
120 /// The worker.
121 thread: Option<JoinHandle<()>>,
122}
123
124impl Stream {
125 /// Construct a new StreamHandle from an OwnedFd connected to the child.
126 pub fn new(owned_fd: OwnedFd) -> Self {
127 let mut file = File::from(owned_fd);
128 let shared = Arc::new(SharedBuffer {
129 state: Mutex::new(InnerBuffer {
130 buffer: VecDeque::new(),
131 finished: false,
132 }),
133 condvar: Condvar::new(),
134 });
135
136 let thread_shared = Arc::clone(&shared);
137
138 // Spawn the worker thread.
139 let handle = thread::spawn(move || {
140 let _ = (|| -> io::Result<()> {
141 let mut buf = [0u8; 4096];
142 loop {
143 let n = file.read(&mut buf)?;
144 if n == 0 {
145 break;
146 }
147 let mut state = thread_shared.state.lock();
148 state.buffer.extend(&buf[..n]);
149 thread_shared.condvar.notify_all();
150 }
151 Ok(())
152 })();
153
154 let mut state = thread_shared.state.lock();
155 state.finished = true;
156 thread_shared.condvar.notify_all();
157 });
158
159 Stream {
160 shared,
161 thread: Some(handle),
162 }
163 }
164
165 /// Drain the current contents of the buffer.
166 fn drain(&self, state: &mut MutexGuard<InnerBuffer>, upto: Option<usize>) -> Vec<u8> {
167 match upto {
168 Some(n) => {
169 if n > state.buffer.len() {
170 state.buffer.drain(..).collect()
171 } else {
172 state.buffer.drain(..=n).collect()
173 }
174 }
175 None => state.buffer.drain(..).collect(),
176 }
177 }
178
179 /// Read a line from the stream.
180 /// This function is blocking, and will wait until a full line has been
181 /// written to the stream. The line will then be removed from the Handle.
182 pub fn read_line(&self) -> Option<String> {
183 let mut state = self.shared.state.lock();
184 loop {
185 if let Some(pos) = state.buffer.iter().position(|&b| b == b'\n') {
186 let line = String::from_utf8_lossy(&self.drain(&mut state, Some(pos))).into_owned();
187 return Some(line);
188 }
189
190 if state.finished {
191 if !state.buffer.is_empty() {
192 let rest = String::from_utf8_lossy(&self.drain(&mut state, None)).into_owned();
193 return Some(rest);
194 } else {
195 return None;
196 }
197 }
198 self.shared.condvar.wait(&mut state);
199 }
200 }
201
202 /// Read the exact amount of bytes specified, or else throw an error.
203 /// This function is blocking.
204 pub fn read_bytes(&self, bytes: Option<usize>) -> Result<Vec<u8>, Error> {
205 let mut state = self.shared.state.lock();
206 let mut res = self.drain(&mut state, bytes);
207 while res.is_empty() {
208 self.shared.condvar.wait(&mut state);
209 res = self.drain(&mut state, bytes);
210 }
211 Ok(res)
212 }
213
214 /// Read everything currently in the pipe, blocking.
215 pub fn read_blocking(&mut self) -> Result<String, Error> {
216 self.wait()?;
217 let mut state = self.shared.state.lock();
218 Ok(String::from_utf8_lossy(&self.drain(&mut state, None)).into_owned())
219 }
220
221 /// Read everything currently in the pipe. Not blocking.
222 pub fn read_all(&mut self) -> Result<String, Error> {
223 let mut state = self.shared.state.lock();
224 Ok(String::from_utf8_lossy(&self.drain(&mut state, None)).into_owned())
225 }
226
227 /// Join the worker thread, waiting until the subprocess closes their side of the pipe.
228 pub fn wait(&mut self) -> Result<(), Error> {
229 if let Some(handle) = self.thread.take() {
230 match handle.join() {
231 Ok(_) => Ok(()),
232 Err(_) => Err(Error::Child),
233 }
234 } else {
235 Ok(())
236 }
237 }
238}
239impl Drop for Stream {
240 fn drop(&mut self) {
241 if let Some(handle) = self.thread.take() {
242 let _ = handle.join();
243 }
244 }
245}
246
247/// A handle to a child process created via `Spawner::spawn()`
248/// If input/output/error redirection were setup in the Spawner,
249/// you can use their related functions to access them.
250///
251/// Additionally, if there are other associated handles (Such as an auxiliary
252/// task to the one launched by the handle), you can delegate them as associates
253/// and allow the caller to manage their lifetimes. This allows you to only manage
254/// a single handle, with all its associates being cleanup when it does.
255///
256/// You should never construct a Handle yourself, it should always be returned through
257/// a Spawner.
258pub struct Handle {
259 /// The name of the spawned binary.
260 name: String,
261
262 /// The child PID. Once wait has been called, it is set to None
263 child: Option<Pid>,
264
265 /// The exit code, if the child has exited.
266 exit: i32,
267
268 /// A list of other Pids that the Handle should be responsible for,
269 /// attached to the main child.
270 associated: Vec<Handle>,
271
272 /// The child's standard input.
273 stdin: Option<File>,
274
275 /// The child's standard output.
276 stdout: Option<Stream>,
277
278 /// The child's standard error.
279 stderr: Option<Stream>,
280
281 #[cfg(feature = "user")]
282 mode: user::Mode,
283}
284impl Handle {
285 /// Construct a new `Handle` from a Child PID and pipes
286 pub fn new(
287 name: String,
288 pid: Pid,
289
290 #[cfg(feature = "user")] mode: user::Mode,
291
292 stdin: Option<OwnedFd>,
293 stdout: Option<OwnedFd>,
294 stderr: Option<OwnedFd>,
295 associates: Vec<Handle>,
296 ) -> Self {
297 Self {
298 name,
299 child: Some(pid),
300 exit: -1,
301 stdin: stdin.map(File::from),
302 stdout: stdout.map(Stream::new),
303 stderr: stderr.map(Stream::new),
304 associated: associates,
305
306 #[cfg(feature = "user")]
307 mode,
308 }
309 }
310
311 /// Get the name of the handle.
312 pub fn name(&self) -> &str {
313 &self.name
314 }
315
316 /// Get the pid of the child.
317 pub fn pid(&self) -> &Option<Pid> {
318 &self.child
319 }
320
321 /// Wait for the child to exit, with a timeout in case of no activity.
322 ///
323 /// Note that this function uses a signal handler to ensure it does not
324 /// hang the process, as well as efficiently wait the timeout. You cannot
325 /// use this function in multi-threaded environments.
326 pub fn wait_timeout(mut self, timeout: Duration) -> Result<i32, Error> {
327 if let Some(pid) = self.alive()? {
328 let mut signals = Signals::new([
329 signal::SIGTERM,
330 signal::SIGINT,
331 signal::SIGCHLD,
332 signal::SIGALRM,
333 ])?;
334
335 let _ = thread::spawn(move || {
336 sleep(timeout);
337 let _ = raise(Signal::SIGALRM);
338 });
339
340 'outer: loop {
341 for signal in signals.wait() {
342 match signal {
343 signal::SIGCHLD => match waitpid(pid, None) {
344 Ok(status) => {
345 self.child = None;
346 if let WaitStatus::Exited(_, code) = status {
347 self.exit = code;
348 break 'outer;
349 }
350 }
351 Err(Errno::ECHILD) => {
352 self.child = None;
353 self.exit = -1;
354 break 'outer;
355 }
356 Err(e) => return Err(Error::Comm(e)),
357 },
358 signal::SIGALRM => return Err(Error::Timeout),
359 _ => return Err(Error::Signal),
360 }
361 }
362 }
363
364 // Collect the error code and return
365 self.wait()
366 } else {
367 Ok(self.exit)
368 }
369 }
370
371 /// Wait for the child to exit.
372 ///
373 /// Note that this function uses a signal handler to ensure it does not
374 /// hang the process. You cannot use this function in multi-threaded environments.
375 pub fn wait_and(&mut self) -> Result<i32, Error> {
376 if let Some(pid) = self.alive()? {
377 let mut signals = Signals::new([signal::SIGTERM, signal::SIGINT, signal::SIGCHLD])?;
378 'outer: loop {
379 for signal in signals.wait() {
380 match signal {
381 signal::SIGCHLD => match waitpid(pid, None) {
382 Ok(status) => {
383 self.child = None;
384 if let WaitStatus::Exited(_, code) = status {
385 self.exit = code;
386 break 'outer;
387 }
388 }
389 Err(Errno::ECHILD) => {
390 self.child = None;
391 self.exit = -1;
392 break 'outer;
393 }
394 Err(e) => return Err(Error::Comm(e)),
395 },
396 _ => return Err(Error::Signal),
397 }
398 }
399 }
400 }
401 Ok(self.exit)
402 }
403
404 /// Consume the handle and return the exit code of the process.
405 pub fn wait(mut self) -> Result<i32, Error> {
406 self.wait_and()
407 }
408
409 /// Wait for the child without signal handlers.
410 ///
411 /// This function is a thread-safe version of wait, but
412 /// means that signals will not be caught.
413 pub fn wait_blocking(&mut self) -> Result<i32, Error> {
414 if let Some(pid) = self.alive()? {
415 'outer: loop {
416 match waitpid(pid, None) {
417 Ok(status) => {
418 self.child = None;
419 if let WaitStatus::Exited(_, code) = status {
420 self.exit = code;
421 break 'outer;
422 }
423 }
424 Err(e) => return Err(Error::Comm(e)),
425 }
426 }
427 }
428 Ok(self.exit)
429 }
430
431 /// Check if the process is still alive, non-blocking.
432 pub fn alive(&mut self) -> Result<Option<Pid>, Error> {
433 if let Some(pid) = self.child {
434 loop {
435 match waitpid(pid, Some(WaitPidFlag::WNOHANG)) {
436 Ok(WaitStatus::StillAlive) => break Ok(Some(pid)),
437 Ok(WaitStatus::Exited(_, exit)) => {
438 self.child.take();
439 self.exit = exit;
440 break Ok(None);
441 }
442 Ok(WaitStatus::Signaled(_, _, _)) => {
443 self.child.take();
444 self.exit = -1;
445 break Ok(None);
446 }
447 Ok(_) => continue,
448 Err(Errno::ECHILD) => {
449 self.child = None;
450 self.exit = -1;
451 break Ok(None);
452 }
453 Err(e) => break Err(Error::Comm(e)),
454 }
455 }
456 } else {
457 Ok(None)
458 }
459 }
460
461 /// Terminate the process with a SIGTERM request, but
462 /// do not consume the Handle.
463 pub fn terminate(&mut self) -> Result<(), Error> {
464 if let Some(pid) = self.alive()? {
465 match self.signal(Signal::SIGTERM) {
466 Ok(_) => {
467 let _ = waitpid(pid, None);
468 }
469 Err(e) => return Err(e),
470 }
471 }
472 Ok(())
473 }
474
475 /// Send a signal to the child.
476 pub fn signal(&mut self, sig: Signal) -> Result<(), Error> {
477 if let Some(pid) = self.alive()? {
478 #[cfg(feature = "user")]
479 let result = {
480 let mode = self.mode;
481 user::run_as!(mode, kill(pid, sig)).map_err(Error::User)?
482 };
483
484 #[cfg(not(feature = "user"))]
485 let result = kill(pid, sig);
486
487 match result {
488 Ok(_) => Ok(()),
489 Err(Errno::ESRCH) => {
490 self.child = None;
491 Ok(())
492 }
493 Err(e) => Err(Error::Comm(e)),
494 }
495 } else {
496 Ok(())
497 }
498 }
499
500 /// Send the signal to the child, and all associated handles.
501 pub fn signal_group(&mut self, sig: Signal) -> Result<(), Error> {
502 if let Some(pid) = self.alive()? {
503 #[cfg(feature = "user")]
504 let result = {
505 let mode = self.mode;
506 user::run_as!(mode, killpg(pid, sig)).map_err(Error::User)?
507 };
508
509 #[cfg(not(feature = "user"))]
510 let result = killpg(pid, sig);
511
512 match result {
513 Ok(_) => Ok(()),
514 Err(Errno::ESRCH) => {
515 self.child = None;
516 Ok(())
517 }
518 Err(e) => Err(Error::Comm(e)),
519 }
520 } else {
521 Ok(())
522 }
523 }
524
525 /// Detach the thread from manual cleanup.
526 /// This function does nothing more than move the Pid of the child out of the Handle.
527 /// When the Handle falls out of scope, it will not have a Pid to terminate, so the
528 /// child process will linger.
529 ///
530 /// `Spawner` sets Death Sig to **SIGKILL**, which means that when the parent dies,
531 /// its children are sent **SIGKILL**. This means a detached thread should not
532 /// become a Zombie Process, even if the Pid is dropped on program exit.
533 ///
534 /// You therefore have three options on what to do with the return value of this
535 /// function:
536 ///
537 /// 1. If there was no child to detach, you'll get a None, and do nothing.
538 /// 2. If you want to manage the child yourself (Or associate it with another
539 /// Handle), capture the value.
540 /// 3. If you want to truly detach it, don't capture the return value. It will
541 /// run in the background, and will be killed if its still running at
542 /// program exit.
543 pub fn detach(mut self) -> Option<Pid> {
544 self.child.take()
545 }
546
547 /// Returns a mutable reference to an associate within the Handle, if it exists.
548 /// The associate is another Handle instance.
549 pub fn get_associate(&mut self, name: &str) -> Option<&mut Handle> {
550 self.associated
551 .iter_mut()
552 .find(|handle| handle.name == name)
553 }
554
555 /// Return the Stream associated with the child's standard error, if it exists.
556 /// Note that pulling from the Stream consumes the contents--calling `error_all`
557 /// will only return the contents from when you last pulled from the Stream.
558 pub fn error(&mut self) -> Result<&mut Stream, Error> {
559 if let Some(error) = &mut self.stderr {
560 Ok(error)
561 } else {
562 Err(Error::NoFile)
563 }
564 }
565
566 /// Waits for the child to terminate, then returns its entire standard error.
567 pub fn error_all(mut self) -> Result<String, Error> {
568 self.wait_blocking()?;
569 if let Some(mut error) = self.stderr.take() {
570 error.read_blocking()
571 } else {
572 Err(Error::NoFile)
573 }
574 }
575
576 /// Return the Stream associate with the child's standard output, if it exists.
577 /// Note that pulling from the Stream consumes the contents--calling `output_all`
578 /// will only return the contents from when you last pulled from the Stream.
579 pub fn output(&mut self) -> Result<&mut Stream, Error> {
580 if let Some(output) = &mut self.stdout {
581 Ok(output)
582 } else {
583 Err(Error::NoFile)
584 }
585 }
586
587 /// Waits for the child to terminate, then returns its entire standard output.
588 /// If you need the exit code, use wait() first.
589 pub fn output_all(mut self) -> Result<String, Error> {
590 self.wait_blocking()?;
591 if let Some(mut output) = self.stdout.take() {
592 output.read_blocking()
593 } else {
594 Err(Error::NoFile)
595 }
596 }
597
598 /// Closes the Handle's side of the standard input pipe, if it exists.
599 /// This sends an EOF to the child.
600 pub fn close(&mut self) -> Result<(), Error> {
601 if self.stdin.take().is_some() {
602 Ok(())
603 } else {
604 Err(Error::NoFile)
605 }
606 }
607}
608impl Drop for Handle {
609 fn drop(&mut self) {
610 if let Ok(pid) = self.alive() {
611 if let Some(pid) = pid {
612 match self.signal(Signal::SIGTERM) {
613 Ok(_) => {
614 if let Err(e) = waitpid(pid, None) {
615 warn!("Failed to wait for process {pid}: {e}");
616 }
617 }
618 Err(e) => warn!("Failed to terminate process {pid}: {e}"),
619 }
620 }
621 } else {
622 warn!("Could not communicate with child!")
623 }
624 }
625}
626impl Write for Handle {
627 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
628 match self.stdin.as_mut() {
629 Some(stdin) => stdin.write(buf),
630 None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "stdin is closed")),
631 }
632 }
633
634 fn flush(&mut self) -> io::Result<()> {
635 match self.stdin.as_mut() {
636 Some(stdin) => stdin.flush(),
637 None => Ok(()),
638 }
639 }
640}