From 3428c31f16accb974f20f61cbb3354d50c0a19f8 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 14 Apr 2024 20:06:58 -0500 Subject: [PATCH] Use tokio channels again --- Cargo.lock | 19 ------------------- Cargo.toml | 1 - src/middleware/payload.rs | 10 +++++----- src/repo/postgres.rs | 14 +++++++------- src/stream.rs | 16 ++++++---------- src/sync.rs | 6 ++++-- 6 files changed, 22 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8af8abe..e5a7754 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -927,18 +927,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flume" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "spin", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1564,9 +1552,6 @@ name = "nanorand" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom", -] [[package]] name = "nom" @@ -1836,7 +1821,6 @@ dependencies = [ "diesel", "diesel-async", "diesel-derive-enum", - "flume", "futures-core", "hex", "md-5", @@ -2681,9 +2665,6 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -dependencies = [ - "lock_api", -] [[package]] name = "spki" diff --git a/Cargo.toml b/Cargo.toml index d52c770..2d011cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ dashmap = "5.1.0" diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] } diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] } diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } -flume = "0.11.0" futures-core = "0.3" hex = "0.4.3" md-5 = "0.10.5" diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs index 5f017dc..ad45c85 100644 --- a/src/middleware/payload.rs +++ b/src/middleware/payload.rs @@ -45,10 +45,10 @@ impl Drop for MetricsGuard { } } -async fn drain(rx: flume::Receiver) { +async fn drain(mut rx: tokio::sync::mpsc::Receiver) { let mut set = JoinSet::new(); - while let Ok(payload) = rx.recv_async().await { + while let Some(payload) = rx.recv().await { tracing::trace!("drain: looping"); // draining a payload is a best-effort task - if we can't collect in 2 minutes we bail @@ -94,18 +94,18 @@ async fn drain(rx: flume::Receiver) { struct DrainHandle(Option>>); pub(crate) struct Payload { - sender: flume::Sender, + sender: tokio::sync::mpsc::Sender, handle: DrainHandle, } pub(crate) struct PayloadMiddleware { inner: S, - sender: flume::Sender, + sender: tokio::sync::mpsc::Sender, _handle: DrainHandle, } pub(crate) struct PayloadStream { inner: Option, - sender: flume::Sender, + sender: tokio::sync::mpsc::Sender, } impl DrainHandle { diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index ebb4914..6a546a9 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -265,7 +265,7 @@ where async fn build_pool( postgres_url: &Url, - tx: flume::Sender, + tx: tokio::sync::mpsc::Sender, connector: Option, max_size: u32, ) -> Result, ConnectPostgresError> { @@ -319,7 +319,7 @@ impl PostgresRepo { .map(|u| u.into()) .unwrap_or(1_usize); - let (tx, rx) = flume::bounded(10); + let (tx, rx) = crate::sync::channel(10); let pool = build_pool( &postgres_url, @@ -621,7 +621,7 @@ type ConfigFn = Box BoxFuture<'_, ConnectionResult> + Send + Sync + 'static>; async fn delegate_notifications( - receiver: flume::Receiver, + mut receiver: tokio::sync::mpsc::Receiver, inner: Arc, capacity: usize, ) { @@ -636,7 +636,7 @@ async fn delegate_notifications( let keyed_notifier_state = KeyedNotifierState { inner: &inner }; - while let Ok(notification) = receiver.recv_async().await { + while let Some(notification) = receiver.recv().await { tracing::trace!("delegate_notifications: looping"); metrics::counter!(crate::init_metrics::POSTGRES_NOTIFICATION).increment(1); @@ -666,7 +666,7 @@ async fn delegate_notifications( } fn build_handler( - sender: flume::Sender, + sender: tokio::sync::mpsc::Sender, connector: Option, ) -> ConfigFn { Box::new( @@ -708,7 +708,7 @@ fn build_handler( } fn spawn_db_notification_task( - sender: flume::Sender, + sender: tokio::sync::mpsc::Sender, mut conn: Connection, ) where S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static, @@ -729,7 +729,7 @@ fn spawn_db_notification_task( tracing::warn!("Database Notice {e:?}"); } Ok(AsyncMessage::Notification(notification)) => { - if sender.send_async(notification).await.is_err() { + if sender.send(notification).await.is_err() { tracing::warn!("Missed notification. Are we shutting down?"); } } diff --git a/src/stream.rs b/src/stream.rs index 3235415..43dbd6d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -91,7 +91,7 @@ where S: Stream + 'static, S::Item: Send + Sync, { - let (tx, rx) = crate::sync::channel(1); + let (tx, mut rx) = crate::sync::channel(1); let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move { let stream = std::pin::pin!(stream); @@ -100,16 +100,14 @@ where while let Some(res) = streamer.next().await { tracing::trace!("make send tx: looping"); - if tx.send_async(res).await.is_err() { + if tx.send(res).await.is_err() { break; } } })); streem::from_fn(|yiedler| async move { - let mut stream = rx.into_stream().into_streamer(); - - while let Some(res) = stream.next().await { + while let Some(res) = rx.recv().await { tracing::trace!("make send rx: looping"); yiedler.yield_(res).await; @@ -124,20 +122,18 @@ where I: IntoIterator + Send + 'static, I::Item: Send + Sync, { - let (tx, rx) = crate::sync::channel(buffer); + let (tx, mut rx) = crate::sync::channel(buffer); let handle = crate::sync::spawn_blocking("blocking-iterator", move || { for value in iterator { - if tx.send(value).is_err() { + if tx.blocking_send(value).is_err() { break; } } }); streem::from_fn(|yielder| async move { - let mut stream = rx.into_stream().into_streamer(); - - while let Some(res) = stream.next().await { + while let Some(res) = rx.recv().await { tracing::trace!("from_iterator: looping"); yielder.yield_(res).await; diff --git a/src/sync.rs b/src/sync.rs index 008e924..a00859a 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -39,11 +39,13 @@ impl std::future::Future for DropHandle { } #[track_caller] -pub(crate) fn channel(bound: usize) -> (flume::Sender, flume::Receiver) { +pub(crate) fn channel( + bound: usize, +) -> (tokio::sync::mpsc::Sender, tokio::sync::mpsc::Receiver) { let span = tracing::trace_span!(parent: None, "make channel"); let guard = span.enter(); - let channel = flume::bounded(bound); + let channel = tokio::sync::mpsc::channel(bound); drop(guard); channel