From a43de122f9050b8ef7a978f7a6a3b792b7caa5b2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 3 Sep 2023 21:30:47 -0500 Subject: [PATCH] postgres: add already-claimed case, general: tracing paranoia --- src/backgrounded.rs | 28 ++++------ src/file.rs | 6 +- src/ingest.rs | 42 ++++++-------- src/ingest/hasher.rs | 7 +-- src/lib.rs | 59 ++++++++++---------- src/migrate_store.rs | 4 +- src/process.rs | 54 +++++++++--------- src/queue/process.rs | 2 +- src/repo/postgres.rs | 113 ++++++++++++++++++++++---------------- src/repo/sled.rs | 34 +++++------- src/repo_04/sled.rs | 4 +- src/store/object_store.rs | 2 +- src/stream.rs | 27 ++++++--- src/sync.rs | 38 +++++++++++++ src/tmp_file.rs | 3 +- 15 files changed, 234 insertions(+), 189 deletions(-) create mode 100644 src/sync.rs diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 94839a1..a1aa87a 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -82,14 +82,12 @@ impl Drop for Backgrounded { let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Identifier", identifier = ?identifier); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; - } - .instrument(cleanup_span), - ) - }); + crate::sync::spawn( + async move { + let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; + } + .instrument(cleanup_span), + ); } if let Some(upload_id) = self.upload_id { @@ -97,14 +95,12 @@ impl Drop for Backgrounded { let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Upload ID", upload_id = ?upload_id); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - let _ = repo.claim(upload_id).await; - } - .instrument(cleanup_span), - ) - }); + crate::sync::spawn( + async move { + let _ = repo.claim(upload_id).await; + } + .instrument(cleanup_span), + ); } } } diff --git a/src/file.rs b/src/file.rs index 6706ca7..bb753b4 100644 --- a/src/file.rs +++ b/src/file.rs @@ -446,15 +446,15 @@ mod io_uring { actix_rt::System::new().block_on(async move { let arbiter = actix_rt::Arbiter::new(); - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = crate::sync::channel(1); arbiter.spawn(async move { - let handle = actix_rt::spawn($fut); + let handle = crate::sync::spawn($fut); let _ = tx.send(handle.await.unwrap()); }); - rx.await.unwrap() + rx.into_recv_async().await.unwrap() }) }; } diff --git a/src/ingest.rs b/src/ingest.rs index e0cd34b..75f27b3 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -217,14 +217,12 @@ impl Drop for Session { let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup hash", hash = ?hash); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - let _ = crate::queue::cleanup_hash(&repo, hash).await; - } - .instrument(cleanup_span), - ) - }); + crate::sync::spawn( + async move { + let _ = crate::queue::cleanup_hash(&repo, hash).await; + } + .instrument(cleanup_span), + ); } if let Some(alias) = self.alias.take() { @@ -233,14 +231,12 @@ impl Drop for Session { let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup alias", alias = ?alias); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - let _ = crate::queue::cleanup_alias(&repo, alias, token).await; - } - .instrument(cleanup_span), - ) - }); + crate::sync::spawn( + async move { + let _ = crate::queue::cleanup_alias(&repo, alias, token).await; + } + .instrument(cleanup_span), + ); } if let Some(identifier) = self.identifier.take() { @@ -248,14 +244,12 @@ impl Drop for Session { let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; - } - .instrument(cleanup_span), - ) - }); + crate::sync::spawn( + async move { + let _ = crate::queue::cleanup_identifier(&repo, &identifier).await; + } + .instrument(cleanup_span), + ); } } } diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs index b5b809c..6466e10 100644 --- a/src/ingest/hasher.rs +++ b/src/ingest/hasher.rs @@ -82,16 +82,15 @@ mod test { actix_rt::System::new().block_on(async move { let arbiter = actix_rt::Arbiter::new(); - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") - .in_scope(|| tokio::sync::oneshot::channel()); + let (tx, rx) = crate::sync::channel(1); arbiter.spawn(async move { - let handle = actix_rt::spawn($fut); + let handle = crate::sync::spawn($fut); let _ = tx.send(handle.await.unwrap()); }); - rx.await.unwrap() + rx.into_recv_async().await.unwrap() }) }; } diff --git a/src/lib.rs b/src/lib.rs index f55e7de..cbb3835 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ mod repo_04; mod serde_str; mod store; mod stream; +mod sync; mod tmp_file; mod validate; @@ -84,8 +85,9 @@ const DAYS: u32 = 24 * HOURS; const NOT_FOUND_KEY: &str = "404-alias"; static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| { - tracing::trace_span!(parent: None, "Initialize semaphore") - .in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) + let permits = num_cpus::get().saturating_sub(1).max(1); + + crate::sync::bare_semaphore(permits) }); async fn ensure_details( @@ -1671,44 +1673,39 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) { return; } - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn(async move { - let mut interval = actix_rt::time::interval(Duration::from_secs(30)); + crate::sync::spawn(async move { + let mut interval = actix_rt::time::interval(Duration::from_secs(30)); - loop { - interval.tick().await; + loop { + interval.tick().await; - if let Err(e) = queue::cleanup_outdated_variants(&repo).await { - tracing::warn!( - "Failed to spawn cleanup for outdated variants:{}", - format!("\n{e}\n{e:?}") - ); - } - - if let Err(e) = queue::cleanup_outdated_proxies(&repo).await { - tracing::warn!( - "Failed to spawn cleanup for outdated proxies:{}", - format!("\n{e}\n{e:?}") - ); - } + if let Err(e) = queue::cleanup_outdated_variants(&repo).await { + tracing::warn!( + "Failed to spawn cleanup for outdated variants:{}", + format!("\n{e}\n{e:?}") + ); } - }); - }) + + if let Err(e) = queue::cleanup_outdated_proxies(&repo).await { + tracing::warn!( + "Failed to spawn cleanup for outdated proxies:{}", + format!("\n{e}\n{e:?}") + ); + } + } + }); } fn spawn_workers(repo: ArcRepo, store: S, config: Configuration, process_map: ProcessMap) where S: Store + 'static, { - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn(queue::process_cleanup( - repo.clone(), - store.clone(), - config.clone(), - )) - }); - tracing::trace_span!(parent: None, "Spawn task") - .in_scope(|| actix_rt::spawn(queue::process_images(repo, store, process_map, config))); + crate::sync::spawn(queue::process_cleanup( + repo.clone(), + store.clone(), + config.clone(), + )); + crate::sync::spawn(queue::process_images(repo, store, process_map, config)); } async fn launch_file_store( diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 78cb0b0..d2ce902 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -61,7 +61,7 @@ where tracing::warn!("Retrying migration +{failure_count}"); } - tokio::time::sleep(Duration::from_secs(3)).await; + actix_rt::time::sleep(Duration::from_secs(3)).await; } Ok(()) @@ -364,7 +364,7 @@ where tracing::warn!("Failed moving file. Retrying +{failure_count}"); } - tokio::time::sleep(Duration::from_secs(3)).await; + actix_rt::time::sleep(Duration::from_secs(3)).await; } } } diff --git a/src/process.rs b/src/process.rs index 9621444..1cb58d5 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,5 +1,6 @@ use actix_rt::task::JoinHandle; use actix_web::web::Bytes; +use flume::r#async::RecvFut; use std::{ future::Future, pin::Pin, @@ -10,7 +11,6 @@ use std::{ use tokio::{ io::{AsyncRead, AsyncWriteExt, ReadBuf}, process::{Child, ChildStdin, ChildStdout, Command}, - sync::oneshot::{channel, Receiver}, }; use tracing::{Instrument, Span}; @@ -73,7 +73,7 @@ struct DropHandle { pub(crate) struct ProcessRead { inner: I, - err_recv: Receiver, + err_recv: RecvFut<'static, std::io::Error>, err_closed: bool, #[allow(dead_code)] handle: DropHandle, @@ -206,39 +206,37 @@ impl Process { let stdin = child.stdin.take().expect("stdin exists"); let stdout = child.stdout.take().expect("stdout exists"); - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel", %command) - .in_scope(channel::); + let (tx, rx) = crate::sync::channel::(1); + let rx = rx.into_recv_async(); let span = tracing::info_span!(parent: None, "Background process task", %command); span.follows_from(Span::current()); - let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| { - actix_rt::spawn( - async move { - let child_fut = async { - (f)(stdin).await?; + let handle = crate::sync::spawn( + async move { + let child_fut = async { + (f)(stdin).await?; - child.wait().await - }; + child.wait().await + }; - let error = match actix_rt::time::timeout(timeout, child_fut).await { - Ok(Ok(status)) if status.success() => { - guard.disarm(); - return; - } - Ok(Ok(status)) => { - std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) - } - Ok(Err(e)) => e, - Err(_) => std::io::ErrorKind::TimedOut.into(), - }; + let error = match actix_rt::time::timeout(timeout, child_fut).await { + Ok(Ok(status)) if status.success() => { + guard.disarm(); + return; + } + Ok(Ok(status)) => { + std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) + } + Ok(Err(e)) => e, + Err(_) => std::io::ErrorKind::TimedOut.into(), + }; - let _ = tx.send(error); - let _ = child.kill().await; - } - .instrument(span), - ) - }); + let _ = tx.send(error); + let _ = child.kill().await; + } + .instrument(span), + ); let sleep = actix_rt::time::sleep(timeout); diff --git a/src/queue/process.rs b/src/queue/process.rs index 5454dc8..13e8b78 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -86,7 +86,7 @@ where let repo = repo.clone(); let media = media.clone(); - let error_boundary = actix_rt::spawn(async move { + let error_boundary = crate::sync::spawn(async move { let stream = store2 .to_stream(&ident, None, None) .await? diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index f63331d..addba2b 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -20,7 +20,8 @@ use diesel_async::{ AsyncConnection, AsyncPgConnection, RunQueryDsl, }; use tokio::sync::Notify; -use tokio_postgres::{AsyncMessage, Notification}; +use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket}; +use tracing::Instrument; use url::Url; use uuid::Uuid; @@ -64,7 +65,7 @@ async fn delegate_notifications(receiver: flume::Receiver, inner: inner .queue_notifications .entry(queue_name) - .or_insert_with(|| Arc::new(Notify::new())) + .or_insert_with(crate::sync::notify) .notify_waiters(); } "upload_completion_channel" => { @@ -134,12 +135,11 @@ impl PostgresError { impl PostgresRepo { pub(crate) async fn connect(postgres_url: Url) -> Result { - let (mut client, conn) = - tokio_postgres::connect(postgres_url.as_str(), tokio_postgres::tls::NoTls) - .await - .map_err(ConnectPostgresError::ConnectForMigration)?; + let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls) + .await + .map_err(ConnectPostgresError::ConnectForMigration)?; - let handle = actix_rt::spawn(conn); + let handle = crate::sync::spawn(conn); embedded::migrations::runner() .run_async(&mut client) @@ -166,10 +166,12 @@ impl PostgresRepo { health_count: AtomicU64::new(0), pool, queue_notifications: DashMap::new(), - upload_notifier: Notify::new(), + upload_notifier: crate::sync::bare_notify(), }); - let notifications = Arc::new(actix_rt::spawn(delegate_notifications(rx, inner.clone()))); + let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone())); + + let notifications = Arc::new(handle); Ok(PostgresRepo { inner, @@ -198,41 +200,53 @@ fn build_handler(sender: flume::Sender) -> ConfigFn { move |config: &str| -> BoxFuture<'_, ConnectionResult> { let sender = sender.clone(); - Box::pin(async move { - let (client, mut conn) = - tokio_postgres::connect(config, tokio_postgres::tls::NoTls) - .await - .map_err(|e| ConnectionError::BadConnection(e.to_string()))?; + let connect_span = tracing::trace_span!(parent: None, "connect future"); - // not very cash money (structured concurrency) of me - actix_rt::spawn(async move { - while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { - match res { - Err(e) => { - tracing::error!("Database Connection {e:?}"); - return; - } - Ok(AsyncMessage::Notice(e)) => { - tracing::warn!("Database Notice {e:?}"); - } - Ok(AsyncMessage::Notification(notification)) => { - if sender.send_async(notification).await.is_err() { - tracing::warn!("Missed notification. Are we shutting down?"); - } - } - Ok(_) => { - tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application"); - } - } - } - }); + Box::pin( + async move { + let (client, conn) = + tokio_postgres::connect(config, tokio_postgres::tls::NoTls) + .await + .map_err(|e| ConnectionError::BadConnection(e.to_string()))?; - AsyncPgConnection::try_from(client).await - }) + // not very cash money (structured concurrency) of me + spawn_db_notification_task(sender, conn); + + AsyncPgConnection::try_from(client).await + } + .instrument(connect_span), + ) }, ) } +fn spawn_db_notification_task( + sender: flume::Sender, + mut conn: Connection, +) { + crate::sync::spawn(async move { + while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { + match res { + Err(e) => { + tracing::error!("Database Connection {e:?}"); + return; + } + Ok(AsyncMessage::Notice(e)) => { + tracing::warn!("Database Notice {e:?}"); + } + Ok(AsyncMessage::Notification(notification)) => { + if sender.send_async(notification).await.is_err() { + tracing::warn!("Missed notification. Are we shutting down?"); + } + } + Ok(_) => { + tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application"); + } + } + } + }); +} + fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime { let timestamp = timestamp.to_offset(time::UtcOffset::UTC); time::PrimitiveDateTime::new(timestamp.date(), timestamp.time()) @@ -849,7 +863,7 @@ impl QueueRepo for PostgresRepo { .inner .queue_notifications .entry(String::from(queue_name)) - .or_insert_with(|| Arc::new(Notify::new())) + .or_insert_with(crate::sync::notify) .clone(); diesel::sql_query("LISTEN queue_status_channel;") @@ -1330,20 +1344,27 @@ impl UploadRepo for PostgresRepo { .await .map_err(PostgresError::Diesel)?; - let opt = uploads + let nested_opt = uploads .select(result) .filter(id.eq(upload_id.id)) .get_result(&mut conn) .await .optional() - .map_err(PostgresError::Diesel)? - .flatten(); + .map_err(PostgresError::Diesel)?; - if let Some(upload_result) = opt { - let upload_result: InnerUploadResult = serde_json::from_value(upload_result) - .map_err(PostgresError::DeserializeUploadResult)?; + match nested_opt { + Some(opt) => { + if let Some(upload_result) = opt { + let upload_result: InnerUploadResult = + serde_json::from_value(upload_result) + .map_err(PostgresError::DeserializeUploadResult)?; - return Ok(upload_result.into()); + return Ok(upload_result.into()); + } + } + None => { + return Err(RepoError::AlreadyClaimed); + } } drop(conn); diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 671e186..aa42e9a 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -29,9 +29,7 @@ macro_rules! b { ($self:ident.$ident:ident, $expr:expr) => {{ let $ident = $self.$ident.clone(); - let span = tracing::Span::current(); - - actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr)) + crate::sync::spawn_blocking(move || $expr) .await .map_err(SledError::from) .map_err(RepoError::from)? @@ -175,7 +173,7 @@ impl SledRepo { let this = self.db.clone(); - actix_rt::task::spawn_blocking(move || { + crate::sync::spawn_blocking(move || { let export = this.export(); export_db.import(export); }) @@ -258,7 +256,7 @@ impl AliasAccessRepo for SledRepo { let alias_access = self.alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone(); - let res = actix_rt::task::spawn_blocking(move || { + let res = crate::sync::spawn_blocking(move || { (&alias_access, &inverse_alias_access).transaction( |(alias_access, inverse_alias_access)| { if let Some(old) = alias_access.insert(alias.to_bytes(), &value_bytes)? { @@ -324,7 +322,7 @@ impl AliasAccessRepo for SledRepo { let alias_access = self.alias_access.clone(); let inverse_alias_access = self.inverse_alias_access.clone(); - let res = actix_rt::task::spawn_blocking(move || { + let res = crate::sync::spawn_blocking(move || { (&alias_access, &inverse_alias_access).transaction( |(alias_access, inverse_alias_access)| { if let Some(old) = alias_access.remove(alias.to_bytes())? { @@ -364,7 +362,7 @@ impl VariantAccessRepo for SledRepo { let variant_access = self.variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone(); - let res = actix_rt::task::spawn_blocking(move || { + let res = crate::sync::spawn_blocking(move || { (&variant_access, &inverse_variant_access).transaction( |(variant_access, inverse_variant_access)| { if let Some(old) = variant_access.insert(&key, &value_bytes)? { @@ -434,7 +432,7 @@ impl VariantAccessRepo for SledRepo { let variant_access = self.variant_access.clone(); let inverse_variant_access = self.inverse_variant_access.clone(); - let res = actix_rt::task::spawn_blocking(move || { + let res = crate::sync::spawn_blocking(move || { (&variant_access, &inverse_variant_access).transaction( |(variant_access, inverse_variant_access)| { if let Some(old) = variant_access.remove(&key)? { @@ -678,7 +676,7 @@ impl QueueRepo for SledRepo { let queue = self.queue.clone(); let job_state = self.job_state.clone(); - let res = actix_rt::task::spawn_blocking(move || { + let res = crate::sync::spawn_blocking(move || { (&queue, &job_state).transaction(|(queue, job_state)| { let state = JobState::pending(); @@ -705,7 +703,7 @@ impl QueueRepo for SledRepo { .write() .unwrap() .entry(queue_name) - .or_insert_with(|| Arc::new(Notify::new())) + .or_insert_with(crate::sync::notify) .notify_one(); metrics_guard.disarm(); @@ -728,7 +726,7 @@ impl QueueRepo for SledRepo { let job_state = self.job_state.clone(); let span = tracing::Span::current(); - let opt = actix_rt::task::spawn_blocking(move || { + let opt = crate::sync::spawn_blocking(move || { let _guard = span.enter(); // Job IDs are generated with Uuid version 7 - defining their first bits as a // timestamp. Scanning a prefix should give us jobs in the order they were queued. @@ -802,9 +800,7 @@ impl QueueRepo for SledRepo { notify } else { let mut guard = self.queue_notifier.write().unwrap(); - let entry = guard - .entry(queue_name) - .or_insert_with(|| Arc::new(Notify::new())); + let entry = guard.entry(queue_name).or_insert_with(crate::sync::notify); Arc::clone(entry) }; @@ -823,7 +819,7 @@ impl QueueRepo for SledRepo { let job_state = self.job_state.clone(); - actix_rt::task::spawn_blocking(move || { + crate::sync::spawn_blocking(move || { if let Some(state) = job_state.get(&key)? { let new_state = JobState::running(worker_id); @@ -853,7 +849,7 @@ impl QueueRepo for SledRepo { let queue = self.queue.clone(); let job_state = self.job_state.clone(); - let res = actix_rt::task::spawn_blocking(move || { + let res = crate::sync::spawn_blocking(move || { (&queue, &job_state).transaction(|(queue, job_state)| { queue.remove(&key[..])?; job_state.remove(&key[..])?; @@ -1112,7 +1108,7 @@ impl HashRepo for SledRepo { None => (self.hashes_inverse.iter(), None), }; - actix_rt::task::spawn_blocking(move || { + crate::sync::spawn_blocking(move || { let page_iter = page_iter .keys() .rev() @@ -1164,7 +1160,7 @@ impl HashRepo for SledRepo { let page_iter = self.hashes_inverse.range(..=date_nanos); let prev_iter = Some(self.hashes_inverse.range(date_nanos..)); - actix_rt::task::spawn_blocking(move || { + crate::sync::spawn_blocking(move || { let page_iter = page_iter .keys() .rev() @@ -1292,7 +1288,7 @@ impl HashRepo for SledRepo { let hash_variant_identifiers = self.hash_variant_identifiers.clone(); - actix_rt::task::spawn_blocking(move || { + crate::sync::spawn_blocking(move || { hash_variant_identifiers .compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes())) .map(|res| res.map_err(|_| VariantAlreadyExists)) diff --git a/src/repo_04/sled.rs b/src/repo_04/sled.rs index 09a684a..07ccd9e 100644 --- a/src/repo_04/sled.rs +++ b/src/repo_04/sled.rs @@ -34,9 +34,7 @@ macro_rules! b { ($self:ident.$ident:ident, $expr:expr) => {{ let $ident = $self.$ident.clone(); - let span = tracing::Span::current(); - - actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr)) + crate::sync::spawn_blocking(move || $expr) .await .map_err(SledError::from) .map_err(RepoError::from)? diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 4c1aab7..324a0eb 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -277,7 +277,7 @@ impl Store for ObjectStore { let object_id2 = object_id.clone(); let upload_id2 = upload_id.clone(); - let handle = actix_rt::spawn( + let handle = crate::sync::spawn( async move { let response = this .create_upload_part_request( diff --git a/src/stream.rs b/src/stream.rs index 9833050..1309e15 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,5 +1,6 @@ use actix_rt::{task::JoinHandle, time::Sleep}; use actix_web::web::Bytes; +use flume::r#async::RecvStream; use futures_core::Stream; use std::{ future::Future, @@ -174,19 +175,25 @@ pin_project_lite::pin_project! { } } -enum IterStreamState { +enum IterStreamState +where + T: 'static, +{ New { iterator: I, buffer: usize, }, Running { handle: JoinHandle<()>, - receiver: tokio::sync::mpsc::Receiver, + receiver: RecvStream<'static, T>, }, Pending, } -pub(crate) struct IterStream { +pub(crate) struct IterStream +where + T: 'static, +{ state: IterStreamState, } @@ -287,14 +294,13 @@ where match std::mem::replace(&mut this.state, IterStreamState::Pending) { IterStreamState::New { iterator, buffer } => { - let (sender, receiver) = tracing::trace_span!(parent: None, "Create channel") - .in_scope(|| tokio::sync::mpsc::channel(buffer)); + let (sender, receiver) = crate::sync::channel(buffer); - let mut handle = actix_rt::task::spawn_blocking(move || { + let mut handle = crate::sync::spawn_blocking(move || { let iterator = iterator.into_iter(); for item in iterator { - if sender.blocking_send(item).is_err() { + if sender.send(item).is_err() { break; } } @@ -304,14 +310,17 @@ where return Poll::Ready(None); } - this.state = IterStreamState::Running { handle, receiver }; + this.state = IterStreamState::Running { + handle, + receiver: receiver.into_stream(), + }; self.poll_next(cx) } IterStreamState::Running { mut handle, mut receiver, - } => match Pin::new(&mut receiver).poll_recv(cx) { + } => match Pin::new(&mut receiver).poll_next(cx) { Poll::Ready(Some(item)) => { this.state = IterStreamState::Running { handle, receiver }; diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..3111daf --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,38 @@ +use std::sync::Arc; + +use tokio::sync::{Notify, Semaphore}; + +pub(crate) fn channel(bound: usize) -> (flume::Sender, flume::Receiver) { + tracing::trace_span!(parent: None, "make channel").in_scope(|| flume::bounded(bound)) +} + +pub(crate) fn notify() -> Arc { + Arc::new(bare_notify()) +} + +pub(crate) fn bare_notify() -> Notify { + tracing::trace_span!(parent: None, "make notifier").in_scope(Notify::new) +} + +pub(crate) fn bare_semaphore(permits: usize) -> Semaphore { + tracing::trace_span!(parent: None, "make semaphore").in_scope(|| Semaphore::new(permits)) +} + +pub(crate) fn spawn(future: F) -> actix_rt::task::JoinHandle +where + F: std::future::Future + 'static, + F::Output: 'static, +{ + tracing::trace_span!(parent: None, "spawn task").in_scope(|| actix_rt::spawn(future)) +} + +pub(crate) fn spawn_blocking(function: F) -> actix_rt::task::JoinHandle +where + F: FnOnce() -> Out + Send + 'static, + Out: Send + 'static, +{ + let outer_span = tracing::Span::current(); + + tracing::trace_span!(parent: None, "spawn blocking task") + .in_scope(|| actix_rt::task::spawn_blocking(move || outer_span.in_scope(function))) +} diff --git a/src/tmp_file.rs b/src/tmp_file.rs index 1335abc..edcb861 100644 --- a/src/tmp_file.rs +++ b/src/tmp_file.rs @@ -13,8 +13,7 @@ struct TmpFile(PathBuf); impl Drop for TmpFile { fn drop(&mut self) { - tracing::trace_span!(parent: None, "Spawn task") - .in_scope(|| actix_rt::spawn(tokio::fs::remove_file(self.0.clone()))); + crate::sync::spawn(tokio::fs::remove_file(self.0.clone())); } }