Use tokio channels again

This commit is contained in:
asonix 2024-04-14 20:06:58 -05:00
parent 4bb3bad703
commit 3428c31f16
6 changed files with 22 additions and 44 deletions

19
Cargo.lock generated
View file

@ -927,18 +927,6 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "flume"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"spin",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -1564,9 +1552,6 @@ name = "nanorand"
version = "0.7.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "nom" name = "nom"
@ -1836,7 +1821,6 @@ dependencies = [
"diesel", "diesel",
"diesel-async", "diesel-async",
"diesel-derive-enum", "diesel-derive-enum",
"flume",
"futures-core", "futures-core",
"hex", "hex",
"md-5", "md-5",
@ -2681,9 +2665,6 @@ name = "spin"
version = "0.9.8" version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]] [[package]]
name = "spki" name = "spki"

View file

@ -34,7 +34,6 @@ dashmap = "5.1.0"
diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] } diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] } diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
flume = "0.11.0"
futures-core = "0.3" futures-core = "0.3"
hex = "0.4.3" hex = "0.4.3"
md-5 = "0.10.5" md-5 = "0.10.5"

View file

@ -45,10 +45,10 @@ impl Drop for MetricsGuard {
} }
} }
async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) { async fn drain(mut rx: tokio::sync::mpsc::Receiver<actix_web::dev::Payload>) {
let mut set = JoinSet::new(); let mut set = JoinSet::new();
while let Ok(payload) = rx.recv_async().await { while let Some(payload) = rx.recv().await {
tracing::trace!("drain: looping"); tracing::trace!("drain: looping");
// draining a payload is a best-effort task - if we can't collect in 2 minutes we bail // draining a payload is a best-effort task - if we can't collect in 2 minutes we bail
@ -94,18 +94,18 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
struct DrainHandle(Option<Rc<tokio::task::JoinHandle<()>>>); struct DrainHandle(Option<Rc<tokio::task::JoinHandle<()>>>);
pub(crate) struct Payload { pub(crate) struct Payload {
sender: flume::Sender<actix_web::dev::Payload>, sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
handle: DrainHandle, handle: DrainHandle,
} }
pub(crate) struct PayloadMiddleware<S> { pub(crate) struct PayloadMiddleware<S> {
inner: S, inner: S,
sender: flume::Sender<actix_web::dev::Payload>, sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
_handle: DrainHandle, _handle: DrainHandle,
} }
pub(crate) struct PayloadStream { pub(crate) struct PayloadStream {
inner: Option<actix_web::dev::Payload>, inner: Option<actix_web::dev::Payload>,
sender: flume::Sender<actix_web::dev::Payload>, sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
} }
impl DrainHandle { impl DrainHandle {

View file

@ -265,7 +265,7 @@ where
async fn build_pool( async fn build_pool(
postgres_url: &Url, postgres_url: &Url,
tx: flume::Sender<Notification>, tx: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect>, connector: Option<MakeRustlsConnect>,
max_size: u32, max_size: u32,
) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> { ) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> {
@ -319,7 +319,7 @@ impl PostgresRepo {
.map(|u| u.into()) .map(|u| u.into())
.unwrap_or(1_usize); .unwrap_or(1_usize);
let (tx, rx) = flume::bounded(10); let (tx, rx) = crate::sync::channel(10);
let pool = build_pool( let pool = build_pool(
&postgres_url, &postgres_url,
@ -621,7 +621,7 @@ type ConfigFn =
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>; Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
async fn delegate_notifications( async fn delegate_notifications(
receiver: flume::Receiver<Notification>, mut receiver: tokio::sync::mpsc::Receiver<Notification>,
inner: Arc<Inner>, inner: Arc<Inner>,
capacity: usize, capacity: usize,
) { ) {
@ -636,7 +636,7 @@ async fn delegate_notifications(
let keyed_notifier_state = KeyedNotifierState { inner: &inner }; let keyed_notifier_state = KeyedNotifierState { inner: &inner };
while let Ok(notification) = receiver.recv_async().await { while let Some(notification) = receiver.recv().await {
tracing::trace!("delegate_notifications: looping"); tracing::trace!("delegate_notifications: looping");
metrics::counter!(crate::init_metrics::POSTGRES_NOTIFICATION).increment(1); metrics::counter!(crate::init_metrics::POSTGRES_NOTIFICATION).increment(1);
@ -666,7 +666,7 @@ async fn delegate_notifications(
} }
fn build_handler( fn build_handler(
sender: flume::Sender<Notification>, sender: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect>, connector: Option<MakeRustlsConnect>,
) -> ConfigFn { ) -> ConfigFn {
Box::new( Box::new(
@ -708,7 +708,7 @@ fn build_handler(
} }
fn spawn_db_notification_task<S>( fn spawn_db_notification_task<S>(
sender: flume::Sender<Notification>, sender: tokio::sync::mpsc::Sender<Notification>,
mut conn: Connection<Socket, S>, mut conn: Connection<Socket, S>,
) where ) where
S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static, S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static,
@ -729,7 +729,7 @@ fn spawn_db_notification_task<S>(
tracing::warn!("Database Notice {e:?}"); tracing::warn!("Database Notice {e:?}");
} }
Ok(AsyncMessage::Notification(notification)) => { Ok(AsyncMessage::Notification(notification)) => {
if sender.send_async(notification).await.is_err() { if sender.send(notification).await.is_err() {
tracing::warn!("Missed notification. Are we shutting down?"); tracing::warn!("Missed notification. Are we shutting down?");
} }
} }

View file

@ -91,7 +91,7 @@ where
S: Stream + 'static, S: Stream + 'static,
S::Item: Send + Sync, S::Item: Send + Sync,
{ {
let (tx, rx) = crate::sync::channel(1); let (tx, mut rx) = crate::sync::channel(1);
let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move { let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move {
let stream = std::pin::pin!(stream); let stream = std::pin::pin!(stream);
@ -100,16 +100,14 @@ where
while let Some(res) = streamer.next().await { while let Some(res) = streamer.next().await {
tracing::trace!("make send tx: looping"); tracing::trace!("make send tx: looping");
if tx.send_async(res).await.is_err() { if tx.send(res).await.is_err() {
break; break;
} }
} }
})); }));
streem::from_fn(|yiedler| async move { streem::from_fn(|yiedler| async move {
let mut stream = rx.into_stream().into_streamer(); while let Some(res) = rx.recv().await {
while let Some(res) = stream.next().await {
tracing::trace!("make send rx: looping"); tracing::trace!("make send rx: looping");
yiedler.yield_(res).await; yiedler.yield_(res).await;
@ -124,20 +122,18 @@ where
I: IntoIterator + Send + 'static, I: IntoIterator + Send + 'static,
I::Item: Send + Sync, I::Item: Send + Sync,
{ {
let (tx, rx) = crate::sync::channel(buffer); let (tx, mut rx) = crate::sync::channel(buffer);
let handle = crate::sync::spawn_blocking("blocking-iterator", move || { let handle = crate::sync::spawn_blocking("blocking-iterator", move || {
for value in iterator { for value in iterator {
if tx.send(value).is_err() { if tx.blocking_send(value).is_err() {
break; break;
} }
} }
}); });
streem::from_fn(|yielder| async move { streem::from_fn(|yielder| async move {
let mut stream = rx.into_stream().into_streamer(); while let Some(res) = rx.recv().await {
while let Some(res) = stream.next().await {
tracing::trace!("from_iterator: looping"); tracing::trace!("from_iterator: looping");
yielder.yield_(res).await; yielder.yield_(res).await;

View file

@ -39,11 +39,13 @@ impl<T> std::future::Future for DropHandle<T> {
} }
#[track_caller] #[track_caller]
pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) { pub(crate) fn channel<T>(
bound: usize,
) -> (tokio::sync::mpsc::Sender<T>, tokio::sync::mpsc::Receiver<T>) {
let span = tracing::trace_span!(parent: None, "make channel"); let span = tracing::trace_span!(parent: None, "make channel");
let guard = span.enter(); let guard = span.enter();
let channel = flume::bounded(bound); let channel = tokio::sync::mpsc::channel(bound);
drop(guard); drop(guard);
channel channel