diff --git a/src/notify.rs b/src/notify.rs index 4e86a8b..2f2a4d1 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -6,10 +6,7 @@ use crate::{ use activitystreams::primitives::XsdAnyUri; use actix::clock::{delay_for, Duration}; use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config}; -use futures::{ - future::ready, - stream::{poll_fn, StreamExt}, -}; +use futures::stream::{poll_fn, StreamExt}; use log::{debug, error, warn}; use std::{collections::HashMap, sync::Arc}; use uuid::Uuid; @@ -68,29 +65,31 @@ impl Notifier { } }); - let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m { - Ok(AsyncMessage::Notification(n)) => { - debug!("Handling Notification, {:?}", n); - ready(Some(n)) - } - Ok(AsyncMessage::Notice(e)) => { - debug!("Handling Notice, {:?}", e); - ready(None) - } - Err(e) => { - debug!("Handling Error, {:?}", e); - ready(None) - } - _ => { - debug!("Handling rest"); - ready(None) - } - }); + let mut stream = poll_fn(move |cx| conn.poll_message(cx)); - while let Some(n) = stream.next().await { - if let Some(v) = listeners.get(n.channel()) { - for l in v { - l.execute(n.payload()); + loop { + match stream.next().await { + Some(Ok(AsyncMessage::Notification(n))) => { + debug!("Handling Notification, {:?}", n); + if let Some(v) = listeners.get(n.channel()) { + for l in v { + l.execute(n.payload()); + } + } + } + Some(Ok(AsyncMessage::Notice(e))) => { + debug!("Handling Notice, {:?}", e); + } + Some(Ok(_)) => { + debug!("Handling rest"); + } + Some(Err(e)) => { + debug!("Breaking loop due to error Error, {:?}", e); + break; + } + None => { + debug!("End of stream, breaking loop"); + break; } } }