Inline process background future, clean tracing a bit

This commit is contained in:
asonix 2023-12-22 13:12:19 -06:00
parent db43392a3b
commit b94ba5fcfc
10 changed files with 142 additions and 100 deletions

View file

@ -47,7 +47,7 @@ impl Drop for MetricsGuard {
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(repo, store, hash, process_map, config))] #[tracing::instrument(skip(tmp_dir, repo, store, hash, process_map, config))]
pub(crate) async fn generate<S: Store + 'static>( pub(crate) async fn generate<S: Store + 'static>(
tmp_dir: &TmpDir, tmp_dir: &TmpDir,
repo: &ArcRepo, repo: &ArcRepo,

View file

@ -144,7 +144,7 @@ where
Ok((input_type, identifier, details, state)) Ok((input_type, identifier, details, state))
} }
#[tracing::instrument(skip(repo, store, client, stream, config))] #[tracing::instrument(skip(tmp_dir, repo, store, client, stream, config))]
pub(crate) async fn ingest<S>( pub(crate) async fn ingest<S>(
tmp_dir: &TmpDir, tmp_dir: &TmpDir,
repo: &ArcRepo, repo: &ArcRepo,

View file

@ -866,7 +866,7 @@ async fn not_found_hash(repo: &ArcRepo) -> Result<Option<(Alias, Hash)>, Error>
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument( #[tracing::instrument(
name = "Serving processed image", name = "Serving processed image",
skip(repo, store, client, config, process_map) skip(tmp_dir, repo, store, client, config, process_map)
)] )]
async fn process<S: Store + 'static>( async fn process<S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,

View file

@ -14,21 +14,25 @@ use std::{
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf}, io::{AsyncRead, AsyncWriteExt, ReadBuf},
process::{Child, ChildStdin, ChildStdout, Command}, process::{Child, ChildStdin, ChildStdout, Command},
task::JoinHandle,
}; };
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
use uuid::Uuid;
use crate::{error_code::ErrorCode, future::WithTimeout}; use crate::{
error_code::ErrorCode,
future::{LocalBoxFuture, WithTimeout},
sync::DropHandle,
};
struct MetricsGuard { struct MetricsGuard {
start: Instant, start: Instant,
armed: bool, armed: bool,
command: String, command: Arc<str>,
} }
impl MetricsGuard { impl MetricsGuard {
fn guard(command: String) -> Self { fn guard(command: Arc<str>) -> Self {
metrics::increment_counter!("pict-rs.process.start", "command" => command.clone()); metrics::increment_counter!("pict-rs.process.start", "command" => command.to_string());
Self { Self {
start: Instant::now(), start: Instant::now(),
@ -47,11 +51,11 @@ impl Drop for MetricsGuard {
metrics::histogram!( metrics::histogram!(
"pict-rs.process.duration", "pict-rs.process.duration",
self.start.elapsed().as_secs_f64(), self.start.elapsed().as_secs_f64(),
"command" => self.command.clone(), "command" => self.command.to_string(),
"completed" => (!self.armed).to_string(), "completed" => (!self.armed).to_string(),
); );
metrics::increment_counter!("pict-rs.process.end", "completed" => (!self.armed).to_string() , "command" => self.command.clone()); metrics::increment_counter!("pict-rs.process.end", "completed" => (!self.armed).to_string() , "command" => self.command.to_string());
} }
} }
@ -59,10 +63,11 @@ impl Drop for MetricsGuard {
struct StatusError(ExitStatus); struct StatusError(ExitStatus);
pub(crate) struct Process { pub(crate) struct Process {
command: String, command: Arc<str>,
child: Child, child: Child,
guard: MetricsGuard, guard: MetricsGuard,
timeout: Duration, timeout: Duration,
id: Uuid,
} }
impl std::fmt::Debug for Process { impl std::fmt::Debug for Process {
@ -71,10 +76,6 @@ impl std::fmt::Debug for Process {
} }
} }
struct DropHandle {
inner: JoinHandle<std::io::Result<()>>,
}
struct ProcessReadState { struct ProcessReadState {
flags: AtomicU8, flags: AtomicU8,
parent: Mutex<Option<Waker>>, parent: Mutex<Option<Waker>>,
@ -86,29 +87,31 @@ struct ProcessReadWaker {
} }
pub(crate) struct ProcessRead { pub(crate) struct ProcessRead {
inner: ChildStdout, stdout: ChildStdout,
handle: DropHandle, handle: LocalBoxFuture<'static, std::io::Result<()>>,
closed: bool, closed: bool,
state: Arc<ProcessReadState>, state: Arc<ProcessReadState>,
span: Span, span: Option<Span>,
command: Arc<str>,
id: Uuid,
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum ProcessError { pub(crate) enum ProcessError {
#[error("Required command {0} not found, make sure it exists in pict-rs' $PATH")] #[error("Required command {0} not found, make sure it exists in pict-rs' $PATH")]
NotFound(String), NotFound(Arc<str>),
#[error("Cannot run command {0} due to invalid permissions on binary, make sure the pict-rs user has permission to run it")] #[error("Cannot run command {0} due to invalid permissions on binary, make sure the pict-rs user has permission to run it")]
PermissionDenied(String), PermissionDenied(Arc<str>),
#[error("Reached process spawn limit")] #[error("Reached process spawn limit")]
LimitReached, LimitReached,
#[error("{0} timed out")] #[error("{0} timed out")]
Timeout(String), Timeout(Arc<str>),
#[error("{0} Failed with {1}")] #[error("{0} Failed with {1}")]
Status(String, ExitStatus), Status(Arc<str>, ExitStatus),
#[error("Unknown process error")] #[error("Unknown process error")]
Other(#[source] std::io::Error), Other(#[source] std::io::Error),
@ -136,10 +139,14 @@ impl Process {
where where
T: AsRef<OsStr>, T: AsRef<OsStr>,
{ {
let command: Arc<str> = Arc::from(String::from(command));
let res = tracing::trace_span!(parent: None, "Create command", %command).in_scope(|| { let res = tracing::trace_span!(parent: None, "Create command", %command).in_scope(|| {
Self::spawn( Self::spawn(
command, command.clone(),
Command::new(command).args(args).envs(envs.iter().copied()), Command::new(command.as_ref())
.args(args)
.envs(envs.iter().copied()),
timeout, timeout,
) )
}); });
@ -147,9 +154,9 @@ impl Process {
match res { match res {
Ok(this) => Ok(this), Ok(this) => Ok(this),
Err(e) => match e.kind() { Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => Err(ProcessError::NotFound(command.to_string())), std::io::ErrorKind::NotFound => Err(ProcessError::NotFound(command)),
std::io::ErrorKind::PermissionDenied => { std::io::ErrorKind::PermissionDenied => {
Err(ProcessError::PermissionDenied(command.to_string())) Err(ProcessError::PermissionDenied(command))
} }
std::io::ErrorKind::WouldBlock => Err(ProcessError::LimitReached), std::io::ErrorKind::WouldBlock => Err(ProcessError::LimitReached),
_ => Err(ProcessError::Other(e)), _ => Err(ProcessError::Other(e)),
@ -157,9 +164,9 @@ impl Process {
} }
} }
fn spawn(command: &str, cmd: &mut Command, timeout: u64) -> std::io::Result<Self> { fn spawn(command: Arc<str>, cmd: &mut Command, timeout: u64) -> std::io::Result<Self> {
tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| { tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| {
let guard = MetricsGuard::guard(command.into()); let guard = MetricsGuard::guard(command.clone());
let cmd = cmd let cmd = cmd
.stdin(Stdio::piped()) .stdin(Stdio::piped())
@ -168,20 +175,22 @@ impl Process {
cmd.spawn().map(|child| Process { cmd.spawn().map(|child| Process {
child, child,
command: String::from(command), command,
guard, guard,
timeout: Duration::from_secs(timeout), timeout: Duration::from_secs(timeout),
id: Uuid::now_v7(),
}) })
}) })
} }
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self), fields(command = %self.command, id = %self.id))]
pub(crate) async fn wait(self) -> Result<(), ProcessError> { pub(crate) async fn wait(self) -> Result<(), ProcessError> {
let Process { let Process {
command, command,
mut child, mut child,
guard, guard,
timeout, timeout,
id: _,
} = self; } = self;
let res = child.wait().with_timeout(timeout).await; let res = child.wait().with_timeout(timeout).await;
@ -226,52 +235,44 @@ impl Process {
mut child, mut child,
guard, guard,
timeout, timeout,
id,
} = self; } = self;
let stdin = child.stdin.take().expect("stdin exists"); let stdin = child.stdin.take().expect("stdin exists");
let stdout = child.stdout.take().expect("stdout exists"); let stdout = child.stdout.take().expect("stdout exists");
let background_span = let handle = Box::pin(async move {
tracing::info_span!(parent: None, "Background process task", %command); let child_fut = async {
background_span.follows_from(Span::current()); (f)(stdin).await?;
let span = tracing::info_span!(parent: None, "Foreground process task", %command); child.wait().await
span.follows_from(Span::current()); };
let handle = crate::sync::spawn( let error = match child_fut.with_timeout(timeout).await {
"await-process", Ok(Ok(status)) if status.success() => {
async move { guard.disarm();
let child_fut = async { return Ok(());
(f)(stdin).await?; }
Ok(Ok(status)) => {
std::io::Error::new(std::io::ErrorKind::Other, StatusError(status))
}
Ok(Err(e)) => e,
Err(_) => std::io::ErrorKind::TimedOut.into(),
};
child.wait().await child.kill().await?;
};
let error = match child_fut.with_timeout(timeout).await { Err(error)
Ok(Ok(status)) if status.success() => { });
guard.disarm();
return Ok(());
}
Ok(Ok(status)) => {
std::io::Error::new(std::io::ErrorKind::Other, StatusError(status))
}
Ok(Err(e)) => e,
Err(_) => std::io::ErrorKind::TimedOut.into(),
};
child.kill().await?;
Err(error)
}
.instrument(background_span),
);
ProcessRead { ProcessRead {
inner: stdout, stdout,
handle: DropHandle { inner: handle }, handle,
closed: false, closed: false,
state: ProcessReadState::new_woken(), state: ProcessReadState::new_woken(),
span, span: None,
command,
id,
} }
} }
} }
@ -345,14 +346,19 @@ impl AsyncRead for ProcessRead {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
let span = self.span.clone(); let command = self.command.clone();
let id = self.id;
let span = self
.span
.get_or_insert_with(|| tracing::info_span!("process task", %command, %id))
.clone();
let guard = span.enter(); let guard = span.enter();
let value = loop { let value = loop {
// always poll for bytes when poll_read is called // always poll for bytes when poll_read is called
let before_size = buf.filled().len(); let before_size = buf.filled().len();
if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) { if let Poll::Ready(res) = Pin::new(&mut self.stdout).poll_read(cx, buf) {
if let Err(e) = res { if let Err(e) = res {
self.closed = true; self.closed = true;
@ -368,11 +374,10 @@ impl AsyncRead for ProcessRead {
// only poll handle if we've been explicitly woken // only poll handle if we've been explicitly woken
let mut handle_cx = Context::from_waker(&waker); let mut handle_cx = Context::from_waker(&waker);
if let Poll::Ready(res) = Pin::new(&mut self.handle.inner).poll(&mut handle_cx) { if let Poll::Ready(res) = Pin::new(&mut self.handle).poll(&mut handle_cx) {
let error = match res { let error = match res {
Ok(Ok(())) => continue, Ok(()) => continue,
Ok(Err(e)) => e, Err(e) => e,
Err(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
}; };
self.closed = true; self.closed = true;
@ -398,12 +403,6 @@ impl AsyncRead for ProcessRead {
} }
} }
impl Drop for DropHandle {
fn drop(&mut self) {
self.inner.abort();
}
}
impl Wake for ProcessReadWaker { impl Wake for ProcessReadWaker {
fn wake(self: Arc<Self>) { fn wake(self: Arc<Self>) {
match Arc::try_unwrap(self) { match Arc::try_unwrap(self) {

View file

@ -236,7 +236,7 @@ where
let span = tracing::info_span!(parent: current_span, "error-boundary"); let span = tracing::info_span!(parent: current_span, "error-boundary");
let res = crate::sync::spawn( let res = crate::sync::abort_on_drop(crate::sync::spawn(
"prune-missing", "prune-missing",
async move { async move {
let mut count = count; let mut count = count;
@ -261,7 +261,7 @@ where
Ok(count) as Result<u64, Error> Ok(count) as Result<u64, Error>
} }
.instrument(span), .instrument(span),
) ))
.await; .await;
match res { match res {

View file

@ -1,5 +1,6 @@
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use time::Instant; use time::Instant;
use tracing::{Instrument, Span};
use crate::{ use crate::{
concurrent_processor::ProcessMap, concurrent_processor::ProcessMap,
@ -135,23 +136,30 @@ where
let repo = repo.clone(); let repo = repo.clone();
let client = client.clone(); let client = client.clone();
let current_span = Span::current();
let span = tracing::info_span!(parent: current_span, "error_boundary");
let config = config.clone(); let config = config.clone();
let error_boundary = crate::sync::spawn("ingest-media", async move { let error_boundary = crate::sync::abort_on_drop(crate::sync::spawn(
let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?); "ingest-media",
async move {
let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
let session = crate::ingest::ingest( let session = crate::ingest::ingest(
&tmp_dir, &tmp_dir,
&repo, &repo,
&store2, &store2,
&client, &client,
stream, stream,
declared_alias, declared_alias,
&config, &config,
) )
.await?; .await?;
Ok(session) as Result<Session, Error> Ok(session) as Result<Session, Error>
}) }
.instrument(span),
))
.await; .await;
store.remove(&unprocessed_identifier).await?; store.remove(&unprocessed_identifier).await?;

View file

@ -33,6 +33,7 @@ use crate::{
future::{WithMetrics, WithTimeout}, future::{WithMetrics, WithTimeout},
serde_str::Serde, serde_str::Serde,
stream::LocalBoxStream, stream::LocalBoxStream,
sync::DropHandle,
}; };
use self::job_status::JobStatus; use self::job_status::JobStatus;
@ -49,7 +50,7 @@ use super::{
pub(crate) struct PostgresRepo { pub(crate) struct PostgresRepo {
inner: Arc<Inner>, inner: Arc<Inner>,
#[allow(dead_code)] #[allow(dead_code)]
notifications: Arc<tokio::task::JoinHandle<()>>, notifications: Arc<DropHandle<()>>,
} }
struct Inner { struct Inner {
@ -151,7 +152,7 @@ impl PostgresRepo {
.await .await
.map_err(ConnectPostgresError::ConnectForMigration)?; .map_err(ConnectPostgresError::ConnectForMigration)?;
let handle = crate::sync::spawn("postgres-migrations", conn); let handle = crate::sync::abort_on_drop(crate::sync::spawn("postgres-migrations", conn));
embedded::migrations::runner() embedded::migrations::runner()
.run_async(&mut client) .run_async(&mut client)
@ -199,10 +200,10 @@ impl PostgresRepo {
upload_notifications: DashMap::new(), upload_notifications: DashMap::new(),
}); });
let handle = crate::sync::spawn( let handle = crate::sync::abort_on_drop(crate::sync::spawn(
"postgres-delegate-notifications", "postgres-delegate-notifications",
delegate_notifications(rx, inner.clone(), parallelism * 8), delegate_notifications(rx, inner.clone(), parallelism * 8),
); ));
let notifications = Arc::new(handle); let notifications = Arc::new(handle);

View file

@ -298,7 +298,7 @@ impl Store for ObjectStore {
let object_id2 = object_id.clone(); let object_id2 = object_id.clone();
let upload_id2 = upload_id.to_string(); let upload_id2 = upload_id.to_string();
let handle = crate::sync::spawn( let handle = crate::sync::abort_on_drop(crate::sync::spawn(
"upload-multipart-part", "upload-multipart-part",
async move { async move {
let response = this let response = this
@ -333,7 +333,7 @@ impl Store for ObjectStore {
Ok(etag) as Result<String, StoreError> Ok(etag) as Result<String, StoreError>
} }
.instrument(tracing::Span::current()), .instrument(tracing::Span::current()),
); ));
futures.push(handle); futures.push(handle);
} }

View file

@ -30,7 +30,7 @@ where
{ {
let (tx, rx) = crate::sync::channel(1); let (tx, rx) = crate::sync::channel(1);
let handle = crate::sync::spawn("send-stream", async move { let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move {
let stream = std::pin::pin!(stream); let stream = std::pin::pin!(stream);
let mut streamer = stream.into_streamer(); let mut streamer = stream.into_streamer();
@ -39,7 +39,7 @@ where
break; break;
} }
} }
}); }));
streem::from_fn(|yiedler| async move { streem::from_fn(|yiedler| async move {
let mut stream = rx.into_stream().into_streamer(); let mut stream = rx.into_stream().into_streamer();

View file

@ -1,6 +1,40 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{Notify, Semaphore}; use tokio::{
sync::{Notify, Semaphore},
task::JoinHandle,
};
pub(crate) struct DropHandle<T> {
handle: JoinHandle<T>,
}
pub(crate) fn abort_on_drop<T>(handle: JoinHandle<T>) -> DropHandle<T> {
DropHandle { handle }
}
impl<T> DropHandle<T> {
pub(crate) fn abort(&self) {
self.handle.abort();
}
}
impl<T> Drop for DropHandle<T> {
fn drop(&mut self) {
self.handle.abort();
}
}
impl<T> std::future::Future for DropHandle<T> {
type Output = <JoinHandle<T> as std::future::Future>::Output;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
std::pin::Pin::new(&mut self.handle).poll(cx)
}
}
#[track_caller] #[track_caller]
pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) { pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) {