use std::{ ffi::OsStr, future::Future, process::{ExitStatus, Stdio}, sync::Arc, time::{Duration, Instant}, }; use futures_core::Stream; use streem::IntoStreamer; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, process::{Child, ChildStdin, Command}, }; use tokio_util::{bytes::Bytes, io::ReaderStream}; use tracing::Instrument; use uuid::Uuid; use crate::{ bytes_stream::BytesStream, error_code::ErrorCode, future::{LocalBoxFuture, WithTimeout}, read::BoxRead, }; struct MetricsGuard { start: Instant, armed: bool, command: Arc, } impl MetricsGuard { fn guard(command: Arc) -> Self { metrics::counter!(crate::init_metrics::PROCESS_START, "command" => command.to_string()) .increment(1); Self { start: Instant::now(), armed: true, command, } } fn disarm(mut self) { self.armed = false; } } impl Drop for MetricsGuard { fn drop(&mut self) { metrics::histogram!( crate::init_metrics::PROCESS_DURATION, "command" => self.command.to_string(), "completed" => (!self.armed).to_string(), ) .record(self.start.elapsed().as_secs_f64()); metrics::counter!(crate::init_metrics::PROCESS_END, "completed" => (!self.armed).to_string() , "command" => self.command.to_string()).increment(1); } } pub(crate) struct Process { command: Arc, child: Child, guard: MetricsGuard, timeout: Duration, extras: Box, id: Uuid, } impl std::fmt::Debug for Process { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Process").field("child", &"Child").finish() } } #[async_trait::async_trait(?Send)] pub(crate) trait Extras { async fn consume(&mut self) -> std::io::Result<()>; } #[async_trait::async_trait(?Send)] impl Extras for () { async fn consume(&mut self) -> std::io::Result<()> { Ok(()) } } #[async_trait::async_trait(?Send)] impl Extras for (Box, T) where T: Extras, { async fn consume(&mut self) -> std::io::Result<()> { let (res1, res2) = tokio::join!(self.0.consume(), self.1.consume()); res1?; res2 } } pub(crate) struct ProcessRead { reader: BoxRead<'static>, handle: LocalBoxFuture<'static, Result<(), ProcessError>>, command: Arc, id: Uuid, extras: Box, } #[derive(Debug, thiserror::Error)] pub(crate) enum ProcessError { #[error("Required command {0} not found, make sure it exists in pict-rs' $PATH")] NotFound(Arc), #[error("Cannot run command {0} due to invalid permissions on binary, make sure the pict-rs user has permission to run it")] PermissionDenied(Arc), #[error("Reached process spawn limit")] LimitReached, #[error("{0} timed out")] Timeout(Arc), #[error("{0} Failed with {1}")] Status(Arc, ExitStatus), #[error("Failed to read stdout for {0}")] Read(Arc, #[source] std::io::Error), #[error("Failed to cleanup for command {0}")] Cleanup(Arc, #[source] std::io::Error), #[error("Unknown process error for command {0}")] Other(Arc, #[source] std::io::Error), } impl ProcessError { pub(crate) const fn error_code(&self) -> ErrorCode { match self { Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND, Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED, Self::LimitReached | Self::Read(_, _) | Self::Cleanup(_, _) | Self::Other(_, _) => { ErrorCode::COMMAND_ERROR } Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT, Self::Status(_, _) => ErrorCode::COMMAND_FAILURE, } } pub(crate) fn is_client_error(&self) -> bool { matches!(self, Self::Timeout(_)) } } impl Process { pub(crate) async fn run( command: &str, args: &[T], envs: &[(&str, &OsStr)], timeout: u64, ) -> Result where T: AsRef + std::fmt::Debug, { let command: Arc = Arc::from(String::from(command)); tracing::debug!("{envs:?} {command} {args:?}"); let mut cmd = Command::new(command.as_ref()); cmd.args(args).envs(envs.iter().copied()); let res = Self::spawn(command.clone(), cmd, timeout).await; match res { Ok(this) => Ok(this), Err(e) => match e.kind() { std::io::ErrorKind::NotFound => Err(ProcessError::NotFound(command)), std::io::ErrorKind::PermissionDenied => { Err(ProcessError::PermissionDenied(command)) } std::io::ErrorKind::WouldBlock => Err(ProcessError::LimitReached), _ => Err(ProcessError::Other(command, e)), }, } } async fn spawn(command: Arc, mut cmd: Command, timeout: u64) -> std::io::Result { let guard = MetricsGuard::guard(command.clone()); cmd.stdin(Stdio::piped()) .stdout(Stdio::piped()) .kill_on_drop(true); crate::sync::spawn_blocking("spawn-command", move || cmd.spawn()) .await .expect("spawn panicked") .map(|child| Process { child, command, guard, timeout: Duration::from_secs(timeout), extras: Box::new(()), id: Uuid::now_v7(), }) } pub(crate) fn add_extras(self, extra: impl Extras + 'static) -> Self { Self { extras: Box::new((self.extras, extra)), ..self } } #[tracing::instrument(skip(self), fields(command = %self.command, id = %self.id))] pub(crate) async fn wait(self) -> Result<(), ProcessError> { let Process { command, mut child, guard, timeout, mut extras, id: _, } = self; let res = child.wait().with_timeout(timeout).await; extras .consume() .await .map_err(|e| ProcessError::Cleanup(command.clone(), e))?; match res { Ok(Ok(status)) if status.success() => { guard.disarm(); Ok(()) } Ok(Ok(status)) => Err(ProcessError::Status(command, status)), Ok(Err(e)) => Err(ProcessError::Other(command, e)), Err(_) => { let _ = child.kill().await; Err(ProcessError::Timeout(command)) } } } pub(crate) fn drive_with_stream(self, input: S) -> ProcessRead where S: Stream> + 'static, { self.drive(move |mut stdin| async move { let stream = std::pin::pin!(input); let mut stream = stream.into_streamer(); while let Some(mut bytes) = stream.try_next().await? { match stdin.write_all_buf(&mut bytes).await { Ok(()) => {} Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, Err(e) => return Err(e), } crate::sync::cooperate().await; } Ok(()) }) } pub(crate) fn read(self) -> ProcessRead { self.drive(|_| async { Ok(()) }) } #[allow(unknown_lints)] #[allow(clippy::let_with_type_underscore)] #[tracing::instrument(level = "trace", skip_all)] fn drive(self, f: F) -> ProcessRead where F: FnOnce(ChildStdin) -> Fut + 'static, Fut: Future>, { let Process { command, mut child, guard, timeout, extras, id, } = self; let stdin = child.stdin.take().expect("stdin exists"); let stdout = child.stdout.take().expect("stdout exists"); let command2 = command.clone(); let handle = Box::pin(async move { let child_fut = async { (f)(stdin).await?; child.wait().await }; match child_fut.with_timeout(timeout).await { Ok(Ok(status)) if status.success() => { guard.disarm(); Ok(()) } Ok(Ok(status)) => Err(ProcessError::Status(command2, status)), Ok(Err(e)) => Err(ProcessError::Other(command2, e)), Err(_) => { child .kill() .await .map_err(|e| ProcessError::Other(command2.clone(), e))?; Err(ProcessError::Timeout(command2)) } } }); ProcessRead { reader: Box::pin(stdout), handle, command, id, extras, } } } impl ProcessRead { pub(crate) fn new(reader: BoxRead<'static>, command: Arc, id: Uuid) -> Self { Self { reader, handle: Box::pin(async { Ok(()) }), command, id, extras: Box::new(()), } } pub(crate) async fn into_bytes_stream(self) -> Result { let cmd = self.command.clone(); self.with_stdout(move |stdout| { BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 64)) }) .await? .map_err(move |e| ProcessError::Read(cmd, e)) } pub(crate) async fn into_vec(self) -> Result, ProcessError> { let cmd = self.command.clone(); self.with_stdout(move |mut stdout| async move { let mut vec = Vec::new(); stdout .read_to_end(&mut vec) .await .map_err(|e| ProcessError::Read(cmd, e)) .map(move |_| vec) }) .await? } pub(crate) async fn into_string(self) -> Result { let cmd = self.command.clone(); self.with_stdout(move |mut stdout| async move { let mut s = String::new(); stdout .read_to_string(&mut s) .await .map_err(|e| ProcessError::Read(cmd, e)) .map(move |_| s) }) .await? } pub(crate) fn pipe(self, process: Process) -> ProcessRead { let Process { command, mut child, guard, timeout, extras, id, } = process; let mut stdin = child.stdin.take().expect("stdin exists"); let stdout = child.stdout.take().expect("stdout exists"); let command2 = command.clone(); let handle = Box::pin(async move { self.with_stdout(move |mut stdout| async move { let child_fut = async { tokio::io::copy(&mut stdout, &mut stdin).await?; drop(stdout); drop(stdin); child.wait().await }; match child_fut.with_timeout(timeout).await { Ok(Ok(status)) if status.success() => { guard.disarm(); Ok(()) } Ok(Ok(status)) => Err(ProcessError::Status(command2, status)), Ok(Err(e)) => Err(ProcessError::Other(command2, e)), Err(_) => { child .kill() .await .map_err(|e| ProcessError::Other(command2.clone(), e))?; Err(ProcessError::Timeout(command2)) } } }) .await? }); ProcessRead { reader: Box::pin(stdout), handle, command, id, extras, } } pub(crate) async fn with_stdout( self, f: impl FnOnce(BoxRead<'static>) -> Fut, ) -> Result where Fut: Future, { self.with_stdout_inner(f).await } async fn with_stdout_inner( self, f: impl FnOnce(BoxRead<'static>) -> Fut, ) -> Result where Fut: Future, { let Self { reader, handle, command, id, mut extras, } = self; let (out, res) = tokio::join!( (f)(reader).instrument(tracing::info_span!("cmd-reader", %command, %id)), handle.instrument(tracing::info_span!("cmd-handle", %command, %id)) ); extras .consume() .await .map_err(|e| ProcessError::Cleanup(command, e))?; res?; Ok(out) } pub(crate) fn add_extras(self, more_extras: E) -> ProcessRead { let Self { reader, handle, command, id, extras, } = self; Self { reader, handle, command, id, extras: Box::new((extras, more_extras)), } } }