Use ActixJob, save a few lines

This commit is contained in:
asonix 2020-03-30 10:45:44 -05:00
parent b8bc230403
commit 680ccc511c
8 changed files with 29 additions and 56 deletions

12
Cargo.lock generated
View file

@ -470,7 +470,7 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs" name = "background-jobs"
version = "0.8.0-alpha.0" version = "0.8.0-alpha.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20"
dependencies = [ dependencies = [
"background-jobs-actix", "background-jobs-actix",
"background-jobs-core", "background-jobs-core",
@ -479,7 +479,7 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-actix" name = "background-jobs-actix"
version = "0.7.0-alpha.0" version = "0.7.0-alpha.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20"
dependencies = [ dependencies = [
"actix", "actix",
"actix-rt", "actix-rt",
@ -500,8 +500,9 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-core" name = "background-jobs-core"
version = "0.7.0" version = "0.7.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20"
dependencies = [ dependencies = [
"actix",
"anyhow", "anyhow",
"async-trait", "async-trait",
"chrono", "chrono",
@ -510,6 +511,7 @@ dependencies = [
"serde 1.0.105", "serde 1.0.105",
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio",
"uuid", "uuid",
] ]
@ -662,9 +664,9 @@ checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
[[package]] [[package]]
name = "bytestring" name = "bytestring"
version = "0.1.4" version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc267467f58ef6cc8874064c62a0423eb0d099362c8a23edd1c6d044f46eead4" checksum = "fc7c05fa5172da78a62d9949d662d2ac89d4cc7355d7b49adee5163f1fb3f363"
dependencies = [ dependencies = [
"bytes", "bytes",
] ]

View file

1
src/jobs/apub/mod.rs Normal file
View file

@ -0,0 +1 @@
mod announce;

View file

@ -1,9 +1,8 @@
use crate::{error::MyError, jobs::JobState}; use crate::{error::MyError, jobs::JobState};
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{Backoff, Job, Processor}; use background_jobs::{ActixJob, Backoff, Processor};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Deliver { pub struct Deliver {
@ -21,30 +20,22 @@ impl Deliver {
data: serde_json::to_value(data)?, data: serde_json::to_value(data)?,
}) })
} }
async fn perform(self, state: JobState) -> Result<(), Error> {
state.requests.deliver(self.to, &self.data).await?;
Ok(())
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DeliverProcessor; pub struct DeliverProcessor;
impl Job for Deliver { impl ActixJob for Deliver {
type State = JobState; type State = JobState;
type Processor = DeliverProcessor; type Processor = DeliverProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel(); Box::pin(async move {
state.requests.deliver(self.to, &self.data).await?;
actix::spawn(async move { Ok(())
let _ = tx.send(self.perform(state).await); })
});
Box::pin(async move { rx.await? })
} }
} }

View file

@ -4,7 +4,7 @@ use crate::{
}; };
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{Job, Processor}; use background_jobs::{ActixJob, Processor};
use futures::future::{ready, Ready}; use futures::future::{ready, Ready};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -38,7 +38,7 @@ impl DeliverMany {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DeliverManyProcessor; pub struct DeliverManyProcessor;
impl Job for DeliverMany { impl ActixJob for DeliverMany {
type State = JobState; type State = JobState;
type Processor = DeliverManyProcessor; type Processor = DeliverManyProcessor;
type Future = Ready<Result<(), Error>>; type Future = Ready<Result<(), Error>>;

View file

@ -1,10 +1,9 @@
use crate::{config::UrlKind, jobs::JobState}; use crate::{config::UrlKind, jobs::JobState};
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{Job, Processor}; use background_jobs::{ActixJob, Processor};
use futures::join; use futures::join;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct QueryInstance { pub struct QueryInstance {
@ -85,19 +84,13 @@ impl QueryInstance {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct InstanceProcessor; pub struct InstanceProcessor;
impl Job for QueryInstance { impl ActixJob for QueryInstance {
type State = JobState; type State = JobState;
type Processor = InstanceProcessor; type Processor = InstanceProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel(); Box::pin(self.perform(state))
actix::spawn(async move {
let _ = tx.send(self.perform(state).await);
});
Box::pin(async move { rx.await? })
} }
} }

View file

@ -1,9 +1,8 @@
use crate::jobs::JobState; use crate::jobs::JobState;
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{Job, Processor}; use background_jobs::{ActixJob, Processor};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct QueryNodeinfo { pub struct QueryNodeinfo {
@ -56,19 +55,13 @@ impl QueryNodeinfo {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct NodeinfoProcessor; pub struct NodeinfoProcessor;
impl Job for QueryNodeinfo { impl ActixJob for QueryNodeinfo {
type State = JobState; type State = JobState;
type Processor = NodeinfoProcessor; type Processor = NodeinfoProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel(); Box::pin(self.perform(state))
actix::spawn(async move {
let _ = tx.send(self.perform(state).await);
});
Box::pin(async move { rx.await? })
} }
} }

View file

@ -1,8 +1,7 @@
use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}; use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState};
use anyhow::Error; use anyhow::Error;
use background_jobs::{Job, Processor}; use background_jobs::{ActixJob, Processor};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Listeners; pub struct Listeners;
@ -23,19 +22,13 @@ impl Listeners {
} }
} }
impl Job for Listeners { impl ActixJob for Listeners {
type State = JobState; type State = JobState;
type Processor = ListenersProcessor; type Processor = ListenersProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel(); Box::pin(self.perform(state))
actix::spawn(async move {
let _ = tx.send(self.perform(state).await);
});
Box::pin(async move { rx.await? })
} }
} }