Clippy nits

This commit is contained in:
asonix 2020-04-25 17:12:43 -05:00
parent 57e6a06f00
commit 77999cf295
7 changed files with 19 additions and 20 deletions

View file

@ -2,7 +2,7 @@ use anyhow::Error;
use background_jobs::{create_server, Job, MaxRetries, WorkerConfig}; use background_jobs::{create_server, Job, MaxRetries, WorkerConfig};
use futures::future::{ok, Ready}; use futures::future::{ok, Ready};
const DEFAULT_QUEUE: &'static str = "default"; const DEFAULT_QUEUE: &str = "default";
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MyState { pub struct MyState {

View file

@ -22,10 +22,9 @@ where
loop { loop {
interval.tick().await; interval.tick().await;
match spawner.queue(job.clone()) { if spawner.queue(job.clone()).is_err() {
Err(_) => error!("Failed to queue job"), error!("Failed to queue job");
_ => (), }
};
} }
}); });
} }

View file

@ -16,9 +16,11 @@ use std::{
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
type WorkerQueue = VecDeque<Box<dyn Worker + Send>>;
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ServerCache { pub(crate) struct ServerCache {
cache: Arc<Mutex<HashMap<String, VecDeque<Box<dyn Worker + Send>>>>>, cache: Arc<Mutex<HashMap<String, WorkerQueue>>>,
} }
/// The server Actor /// The server Actor
@ -144,7 +146,7 @@ impl ServerCache {
async fn push(&self, queue: String, worker: Box<dyn Worker + Send>) { async fn push(&self, queue: String, worker: Box<dyn Worker + Send>) {
let mut cache = self.cache.lock().await; let mut cache = self.cache.lock().await;
let entry = cache.entry(queue).or_insert(VecDeque::new()); let entry = cache.entry(queue).or_insert_with(VecDeque::new);
entry.push_back(worker); entry.push_back(worker);
} }

View file

@ -53,11 +53,7 @@ pub(crate) fn local_worker<State>(
let (tx, mut rx) = channel(16); let (tx, mut rx) = channel(16);
let handle = LocalWorkerHandle { let handle = LocalWorkerHandle { tx, id, queue };
tx: tx.clone(),
id,
queue: queue.clone(),
};
spawn(async move { spawn(async move {
debug!("Beginning worker loop for {}", id); debug!("Beginning worker loop for {}", id);

View file

@ -103,7 +103,7 @@ where
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
actix_rt::spawn(async move { actix_rt::spawn(async move {
if let Err(_) = tx.send(ActixJob::run(self, state).await) { if tx.send(ActixJob::run(self, state).await).is_err() {
error!("Job dropped"); error!("Job dropped");
} }
}); });

View file

@ -170,6 +170,12 @@ pub mod memory_storage {
} }
} }
impl Default for Storage {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl super::Storage for Storage { impl super::Storage for Storage {
type Error = Infallible; type Error = Infallible;
@ -192,7 +198,7 @@ pub mod memory_storage {
} }
async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error> { async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
let j = self.inner.lock().await.jobs.get(&id).map(|j| j.clone()); let j = self.inner.lock().await.jobs.get(&id).cloned();
Ok(j) Ok(j)
} }

View file

@ -110,8 +110,7 @@ impl Storage for SledStorage {
) )
.filter_map(|id| job_tree.get(id).ok()) .filter_map(|id| job_tree.get(id).ok())
.filter_map(|opt| opt) .filter_map(|opt| opt)
.filter(|job| job.is_ready(now) && job.is_pending(now)) .find(|job| job.is_ready(now) && job.is_pending(now));
.next();
if let Some(ref job) = job { if let Some(ref job) = job {
queue_tree.remove(&job_key(job.id()))?; queue_tree.remove(&job_key(job.id()))?;
@ -170,10 +169,7 @@ impl Storage for SledStorage {
async fn get_stats(&self) -> Result<Stats> { async fn get_stats(&self) -> Result<Stats> {
let this = self.clone(); let this = self.clone();
Ok( Ok(run(move || Ok(this.stats.get("stats")?.unwrap_or_default()) as Result<Stats>).await?)
run(move || Ok(this.stats.get("stats")?.unwrap_or(Stats::default())) as Result<Stats>)
.await?,
)
} }
async fn update_stats<F>(&self, f: F) -> Result<()> async fn update_stats<F>(&self, f: F) -> Result<()>