Remove dependency on Actix Actors

This commit is contained in:
asonix 2020-04-23 13:16:56 -05:00
parent e52348a9ec
commit ba1a9f422f
6 changed files with 19 additions and 13 deletions

View file

@ -7,7 +7,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
actix = "0.10.0-alpha.2"
actix-rt = "1.0.0" actix-rt = "1.0.0"
anyhow = "1.0" anyhow = "1.0"
async-trait = "0.1.24" async-trait = "0.1.24"

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs-actix" name = "background-jobs-actix"
description = "in-process jobs processor based on Actix" description = "in-process jobs processor based on Actix"
version = "0.8.0-alpha.1" version = "0.8.0-alpha.2"
license-file = "../LICENSE" license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs" repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -10,8 +10,7 @@ readme = "../README.md"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
actix = "0.10.0-alpha.2" actix-rt = "1.1.0"
actix-rt = "1.0.0"
anyhow = "1.0" anyhow = "1.0"
async-trait = "0.1.24" async-trait = "0.1.24"
background-jobs-core = { version = "0.8.0-alpha.0", path = "../jobs-core", features = ["with-actix"] } background-jobs-core = { version = "0.8.0-alpha.0", path = "../jobs-core", features = ["with-actix"] }

View file

@ -1,6 +1,10 @@
use crate::{Job, QueueHandle}; use crate::{Job, QueueHandle};
use actix::clock::{interval_at, Duration, Instant}; use actix_rt::{
spawn,
time::{interval_at, Instant},
};
use log::error; use log::error;
use std::time::Duration;
/// A type used to schedule recurring jobs. /// A type used to schedule recurring jobs.
/// ///
@ -12,7 +16,7 @@ pub(crate) fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
where where
J: Job + Clone, J: Job + Clone,
{ {
actix::spawn(async move { spawn(async move {
let mut interval = interval_at(Instant::now(), duration); let mut interval = interval_at(Instant::now(), duration);
loop { loop {

View file

@ -12,7 +12,6 @@
//! //!
//! ### Example //! ### Example
//! ```rust,ignore //! ```rust,ignore
//! use actix::System;
//! use anyhow::Error; //! use anyhow::Error;
//! use background_jobs::{create_server, Backoff, Job, MaxRetries, WorkerConfig}; //! use background_jobs::{create_server, Backoff, Job, MaxRetries, WorkerConfig};
//! use futures::future::{ok, Ready}; //! use futures::future::{ok, Ready};
@ -117,7 +116,7 @@
//! } //! }
//! ``` //! ```
use actix::Arbiter; use actix_rt::{spawn, Arbiter};
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
@ -253,7 +252,7 @@ impl QueueHandle {
{ {
let job = new_job(job)?; let job = new_job(job)?;
let server = self.inner.clone(); let server = self.inner.clone();
actix::spawn(async move { spawn(async move {
if let Err(e) = server.new_job(job).await { if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e); error!("Error creating job, {}", e);
} }
@ -271,7 +270,7 @@ impl QueueHandle {
{ {
let job = new_scheduled_job(job, after)?; let job = new_scheduled_job(job, after)?;
let server = self.inner.clone(); let server = self.inner.clone();
actix::spawn(async move { spawn(async move {
if let Err(e) = server.new_job(job).await { if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e); error!("Error creating job, {}", e);
} }

View file

@ -2,13 +2,17 @@ use crate::{
storage::{ActixStorage, StorageWrapper}, storage::{ActixStorage, StorageWrapper},
worker::Worker, worker::Worker,
}; };
use actix::clock::{interval_at, Duration, Instant}; use actix_rt::{
spawn,
time::{interval_at, Instant},
};
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use log::{error, trace}; use log::{error, trace};
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
sync::Arc, sync::Arc,
time::Duration,
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -39,7 +43,7 @@ impl Server {
}; };
let server2 = server.clone(); let server2 = server.clone();
actix::spawn(async move { spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(1)); let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
loop { loop {

View file

@ -1,4 +1,5 @@
use crate::Server; use crate::Server;
use actix_rt::spawn;
use background_jobs_core::{CachedProcessorMap, JobInfo}; use background_jobs_core::{CachedProcessorMap, JobInfo};
use log::{debug, error, warn}; use log::{debug, error, warn};
use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::{channel, Sender};
@ -58,7 +59,7 @@ pub(crate) fn local_worker<State>(
queue: queue.clone(), queue: queue.clone(),
}; };
actix::spawn(async move { spawn(async move {
debug!("Beginning worker loop for {}", id); debug!("Beginning worker loop for {}", id);
if let Err(e) = server.request_job(Box::new(handle.clone())).await { if let Err(e) = server.request_job(Box::new(handle.clone())).await {
error!("Couldn't request first job, bailing, {}", e); error!("Couldn't request first job, bailing, {}", e);