diff --git a/Cargo.lock b/Cargo.lock index 737a980..005bd34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,7 +470,7 @@ dependencies = [ [[package]] name = "background-jobs" version = "0.8.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" dependencies = [ "background-jobs-actix", "background-jobs-core", @@ -479,7 +479,7 @@ dependencies = [ [[package]] name = "background-jobs-actix" version = "0.7.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" dependencies = [ "actix", "actix-rt", @@ -500,8 +500,9 @@ dependencies = [ [[package]] name = "background-jobs-core" version = "0.7.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" dependencies = [ + "actix", "anyhow", "async-trait", "chrono", @@ -510,6 +511,7 @@ dependencies = [ "serde 1.0.105", "serde_json", "thiserror", + "tokio", "uuid", ] @@ -662,9 +664,9 @@ checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" [[package]] name = "bytestring" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc267467f58ef6cc8874064c62a0423eb0d099362c8a23edd1c6d044f46eead4" +checksum = "fc7c05fa5172da78a62d9949d662d2ac89d4cc7355d7b49adee5163f1fb3f363" dependencies = [ "bytes", ] diff --git a/src/jobs/apub/announce.rs b/src/jobs/apub/announce.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/jobs/apub/mod.rs b/src/jobs/apub/mod.rs new file mode 100644 index 0000000..0706c10 --- /dev/null +++ b/src/jobs/apub/mod.rs @@ -0,0 +1 @@ +mod announce; diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 6c0b834..2483f90 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -1,9 +1,8 @@ use crate::{error::MyError, jobs::JobState}; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{Backoff, Job, Processor}; +use background_jobs::{ActixJob, Backoff, Processor}; use std::{future::Future, pin::Pin}; -use tokio::sync::oneshot; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Deliver { @@ -21,30 +20,22 @@ impl Deliver { data: serde_json::to_value(data)?, }) } - - async fn perform(self, state: JobState) -> Result<(), Error> { - state.requests.deliver(self.to, &self.data).await?; - - Ok(()) - } } #[derive(Clone, Debug)] pub struct DeliverProcessor; -impl Job for Deliver { +impl ActixJob for Deliver { type State = JobState; type Processor = DeliverProcessor; - type Future = Pin> + Send>>; + type Future = Pin>>>; fn run(self, state: Self::State) -> Self::Future { - let (tx, rx) = oneshot::channel(); + Box::pin(async move { + state.requests.deliver(self.to, &self.data).await?; - actix::spawn(async move { - let _ = tx.send(self.perform(state).await); - }); - - Box::pin(async move { rx.await? }) + Ok(()) + }) } } diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs index 995c8a3..af21e61 100644 --- a/src/jobs/deliver_many.rs +++ b/src/jobs/deliver_many.rs @@ -4,7 +4,7 @@ use crate::{ }; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{Job, Processor}; +use background_jobs::{ActixJob, Processor}; use futures::future::{ready, Ready}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -38,7 +38,7 @@ impl DeliverMany { #[derive(Clone, Debug)] pub struct DeliverManyProcessor; -impl Job for DeliverMany { +impl ActixJob for DeliverMany { type State = JobState; type Processor = DeliverManyProcessor; type Future = Ready>; diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 9f86241..00bf1ee 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -1,10 +1,9 @@ use crate::{config::UrlKind, jobs::JobState}; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{Job, Processor}; +use background_jobs::{ActixJob, Processor}; use futures::join; use std::{future::Future, pin::Pin}; -use tokio::sync::oneshot; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct QueryInstance { @@ -85,19 +84,13 @@ impl QueryInstance { #[derive(Clone, Debug)] pub struct InstanceProcessor; -impl Job for QueryInstance { +impl ActixJob for QueryInstance { type State = JobState; type Processor = InstanceProcessor; - type Future = Pin> + Send>>; + type Future = Pin>>>; fn run(self, state: Self::State) -> Self::Future { - let (tx, rx) = oneshot::channel(); - - actix::spawn(async move { - let _ = tx.send(self.perform(state).await); - }); - - Box::pin(async move { rx.await? }) + Box::pin(self.perform(state)) } } diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index ef138cf..f862a6d 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -1,9 +1,8 @@ use crate::jobs::JobState; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{Job, Processor}; +use background_jobs::{ActixJob, Processor}; use std::{future::Future, pin::Pin}; -use tokio::sync::oneshot; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct QueryNodeinfo { @@ -56,19 +55,13 @@ impl QueryNodeinfo { #[derive(Clone, Debug)] pub struct NodeinfoProcessor; -impl Job for QueryNodeinfo { +impl ActixJob for QueryNodeinfo { type State = JobState; type Processor = NodeinfoProcessor; - type Future = Pin> + Send>>; + type Future = Pin>>>; fn run(self, state: Self::State) -> Self::Future { - let (tx, rx) = oneshot::channel(); - - actix::spawn(async move { - let _ = tx.send(self.perform(state).await); - }); - - Box::pin(async move { rx.await? }) + Box::pin(self.perform(state)) } } diff --git a/src/jobs/process_listeners.rs b/src/jobs/process_listeners.rs index 449ad01..0a6c951 100644 --- a/src/jobs/process_listeners.rs +++ b/src/jobs/process_listeners.rs @@ -1,8 +1,7 @@ use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}; use anyhow::Error; -use background_jobs::{Job, Processor}; +use background_jobs::{ActixJob, Processor}; use std::{future::Future, pin::Pin}; -use tokio::sync::oneshot; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Listeners; @@ -23,19 +22,13 @@ impl Listeners { } } -impl Job for Listeners { +impl ActixJob for Listeners { type State = JobState; type Processor = ListenersProcessor; - type Future = Pin> + Send>>; + type Future = Pin>>>; fn run(self, state: Self::State) -> Self::Future { - let (tx, rx) = oneshot::channel(); - - actix::spawn(async move { - let _ = tx.send(self.perform(state).await); - }); - - Box::pin(async move { rx.await? }) + Box::pin(self.perform(state)) } }