diff --git a/Dockerfile.migrations.arm64v8 b/Dockerfile.migrations.arm64v8 index 7533fda..b13aa76 100644 --- a/Dockerfile.migrations.arm64v8 +++ b/Dockerfile.migrations.arm64v8 @@ -1,5 +1,11 @@ -FROM asonix/diesel-cli:v1.4.0-r0-arm64v8 +FROM asonix/diesel-cli:v1.4.0-r1-arm64v8 COPY migrations /migrations +USER root +RUN \ + apt-get install -y tini && \ + chown -R diesel:diesel /migrations +USER diesel +ENTRYPOINT ["/usr/bin/tini"] CMD ["diesel", "migration", "run", "--migration-dir", "/migrations"] diff --git a/build.sh b/build.sh index 60af8e6..9743642 100755 --- a/build.sh +++ b/build.sh @@ -33,8 +33,8 @@ cross build \ --release mkdir -p artifacts +rm -rf artifacts/relay cp ./target/aarch64-unknown-linux-musl/release/relay artifacts/relay -cp -r ./migrations artifacts/migrations # from `sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes` docker build \ @@ -52,7 +52,10 @@ docker push "asonix/relay:${VERSION}-arm64v8" docker push "asonix/relay:latest-arm64v8" docker push "asonix/relay:latest" -if [ "${MIGRATIONS}" = "" ]; then +if [ "${MIGRATIONS}" = "migrations" ]; then + rm -rf artifacts/migrations + cp -r ./migrations artifacts/migrations + docker build \ --pull \ --no-cache \ diff --git a/src/main.rs b/src/main.rs index 7a32a2d..8b2f4b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,6 +33,7 @@ use self::{ db::Db, error::MyError, jobs::{create_server, create_workers}, + notify::notify_loop, state::State, templates::statics::StaticFile, webfinger::RelayResolver, @@ -111,7 +112,7 @@ async fn main() -> Result<(), anyhow::Error> { let job_server = create_server(db.clone()); - let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); + let _ = notify_loop(state.clone(), pg_config.clone()); let bind_address = config.bind_address(); HttpServer::new(move || { diff --git a/src/notify.rs b/src/notify.rs index 4a42ff9..b69fe28 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -1,61 +1,75 @@ -use crate::state::State; +use crate::{db::listen, state::State}; use activitystreams::primitives::XsdAnyUri; -use actix::prelude::*; -use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Client, Config, Notification}; +use actix::clock::{delay_for, Duration}; +use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification}; use futures::{ future::ready, stream::{poll_fn, StreamExt}, }; use log::{debug, error, info, warn}; -use tokio::sync::mpsc; +use std::sync::Arc; -#[derive(Message)] -#[rtype(result = "()")] -pub enum Notify { - Msg(Notification), - Done, -} - -pub struct NotifyHandler { - client: Option, - state: State, - config: Config, -} - -impl NotifyHandler { - fn new(state: State, config: Config) -> Self { - NotifyHandler { - state, - config, - client: None, +async fn handle_notification(state: &State, notif: Notification) { + match notif.channel() { + "new_blocks" => { + info!("Caching block of {}", notif.payload()); + state.cache_block(notif.payload().to_owned()).await; } - } - - pub fn start_handler(state: State, config: Config) -> Addr { - Supervisor::start(|_| Self::new(state, config)) - } + "new_whitelists" => { + info!("Caching whitelist of {}", notif.payload()); + state.cache_whitelist(notif.payload().to_owned()).await; + } + "new_listeners" => { + if let Ok(uri) = notif.payload().parse::() { + info!("Caching listener {}", uri); + state.cache_listener(uri).await; + } + } + "rm_blocks" => { + info!("Busting block cache for {}", notif.payload()); + state.bust_block(notif.payload()).await; + } + "rm_whitelists" => { + info!("Busting whitelist cache for {}", notif.payload()); + state.bust_whitelist(notif.payload()).await; + } + "rm_listeners" => { + if let Ok(uri) = notif.payload().parse::() { + info!("Busting listener cache for {}", uri); + state.bust_listener(&uri).await; + } + } + _ => (), + }; } -impl Actor for NotifyHandler { - type Context = Context; +pub fn notify_loop(state: State, config: Config) { + actix::spawn(async move { + let mut client; - fn started(&mut self, ctx: &mut Self::Context) { - info!("Starting notify handler"); - let config = self.config.clone(); - - let fut = async move { - let (client, mut conn) = match config.connect(NoTls).await { + loop { + let (new_client, mut conn) = match config.connect(NoTls).await { Ok((client, conn)) => (client, conn), Err(e) => { error!("Error establishing DB Connection, {}", e); - return Err(()); + delay_for(Duration::new(5, 0)).await; + continue; } }; + client = Arc::new(new_client); + let new_client = client.clone(); + + actix::spawn(async move { + if let Err(e) = listen(&new_client).await { + error!("Error listening for updates, {}", e); + } + }); + let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m { Ok(AsyncMessage::Notification(n)) => { debug!("Handling Notification, {:?}", n); - ready(Some(Notify::Msg(n))) + ready(Some(n)) } Ok(AsyncMessage::Notice(e)) => { debug!("Handling Notice, {:?}", e); @@ -71,104 +85,12 @@ impl Actor for NotifyHandler { } }); - let (mut tx, rx) = mpsc::channel(256); - - Arbiter::spawn(async move { - debug!("Spawned stream handler"); - while let Some(n) = stream.next().await { - match tx.send(n).await { - Err(e) => error!("Error forwarding notification, {}", e), - _ => (), - }; - } - warn!("Stream handler ended"); - let _ = tx.send(Notify::Done).await; - }); - - Ok((client, rx)) - }; - - let fut = fut.into_actor(self).map(|res, actor, ctx| match res { - Ok((client, stream)) => { - Self::add_stream(stream, ctx); - let f = async move { - match crate::db::listen(&client).await { - Err(e) => { - error!("Error listening, {}", e); - Err(()) - } - Ok(_) => Ok(client), - } - }; - - ctx.wait(f.into_actor(actor).map(|res, actor, ctx| match res { - Ok(client) => { - actor.client = Some(client); - } - Err(_) => { - ctx.stop(); - } - })); + while let Some(n) = stream.next().await { + handle_notification(&state, n).await; } - Err(_) => { - ctx.stop(); - } - }); - ctx.wait(fut); - info!("Listener starting"); - } + drop(client); + warn!("Restarting listener task"); + } + }); } - -impl StreamHandler for NotifyHandler { - fn handle(&mut self, notify: Notify, ctx: &mut Self::Context) { - let notif = match notify { - Notify::Msg(notif) => notif, - Notify::Done => { - warn!("Stopping notify handler"); - ctx.stop(); - return; - } - }; - - let state = self.state.clone(); - - let fut = async move { - match notif.channel() { - "new_blocks" => { - info!("Caching block of {}", notif.payload()); - state.cache_block(notif.payload().to_owned()).await; - } - "new_whitelists" => { - info!("Caching whitelist of {}", notif.payload()); - state.cache_whitelist(notif.payload().to_owned()).await; - } - "new_listeners" => { - if let Ok(uri) = notif.payload().parse::() { - info!("Caching listener {}", uri); - state.cache_listener(uri).await; - } - } - "rm_blocks" => { - info!("Busting block cache for {}", notif.payload()); - state.bust_block(notif.payload()).await; - } - "rm_whitelists" => { - info!("Busting whitelist cache for {}", notif.payload()); - state.bust_whitelist(notif.payload()).await; - } - "rm_listeners" => { - if let Ok(uri) = notif.payload().parse::() { - info!("Busting listener cache for {}", uri); - state.bust_listener(&uri).await; - } - } - _ => (), - } - }; - - ctx.spawn(fut.into_actor(self)); - } -} - -impl Supervised for NotifyHandler {}