From 77999cf2954a1adf1492166413eef86704ece9ee Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 25 Apr 2020 17:12:43 -0500 Subject: [PATCH] Clippy nits --- examples/actix-example/src/main.rs | 2 +- jobs-actix/src/every.rs | 7 +++---- jobs-actix/src/server.rs | 6 ++++-- jobs-actix/src/worker.rs | 6 +----- jobs-core/src/actix_job.rs | 2 +- jobs-core/src/storage.rs | 8 +++++++- jobs-sled/src/lib.rs | 8 ++------ 7 files changed, 19 insertions(+), 20 deletions(-) diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index 852fc70..d2b542b 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -2,7 +2,7 @@ use anyhow::Error; use background_jobs::{create_server, Job, MaxRetries, WorkerConfig}; use futures::future::{ok, Ready}; -const DEFAULT_QUEUE: &'static str = "default"; +const DEFAULT_QUEUE: &str = "default"; #[derive(Clone, Debug)] pub struct MyState { diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index 0633a93..b96fe91 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -22,10 +22,9 @@ where loop { interval.tick().await; - match spawner.queue(job.clone()) { - Err(_) => error!("Failed to queue job"), - _ => (), - }; + if spawner.queue(job.clone()).is_err() { + error!("Failed to queue job"); + } } }); } diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index aad478f..31f53de 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -16,9 +16,11 @@ use std::{ }; use tokio::sync::Mutex; +type WorkerQueue = VecDeque>; + #[derive(Clone)] pub(crate) struct ServerCache { - cache: Arc>>>>, + cache: Arc>>, } /// The server Actor @@ -144,7 +146,7 @@ impl ServerCache { async fn push(&self, queue: String, worker: Box) { let mut cache = self.cache.lock().await; - let entry = cache.entry(queue).or_insert(VecDeque::new()); + let entry = cache.entry(queue).or_insert_with(VecDeque::new); entry.push_back(worker); } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index d731816..2ef7ca2 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -53,11 +53,7 @@ pub(crate) fn local_worker( let (tx, mut rx) = channel(16); - let handle = LocalWorkerHandle { - tx: tx.clone(), - id, - queue: queue.clone(), - }; + let handle = LocalWorkerHandle { tx, id, queue }; spawn(async move { debug!("Beginning worker loop for {}", id); diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/actix_job.rs index 5887cfa..4fff4d3 100644 --- a/jobs-core/src/actix_job.rs +++ b/jobs-core/src/actix_job.rs @@ -103,7 +103,7 @@ where let (tx, rx) = oneshot::channel(); actix_rt::spawn(async move { - if let Err(_) = tx.send(ActixJob::run(self, state).await) { + if tx.send(ActixJob::run(self, state).await).is_err() { error!("Job dropped"); } }); diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 62eebdc..2c3f557 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -170,6 +170,12 @@ pub mod memory_storage { } } + impl Default for Storage { + fn default() -> Self { + Self::new() + } + } + #[async_trait::async_trait] impl super::Storage for Storage { type Error = Infallible; @@ -192,7 +198,7 @@ pub mod memory_storage { } async fn fetch_job(&self, id: Uuid) -> Result, Self::Error> { - let j = self.inner.lock().await.jobs.get(&id).map(|j| j.clone()); + let j = self.inner.lock().await.jobs.get(&id).cloned(); Ok(j) } diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 5c07f34..e371fe1 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -110,8 +110,7 @@ impl Storage for SledStorage { ) .filter_map(|id| job_tree.get(id).ok()) .filter_map(|opt| opt) - .filter(|job| job.is_ready(now) && job.is_pending(now)) - .next(); + .find(|job| job.is_ready(now) && job.is_pending(now)); if let Some(ref job) = job { queue_tree.remove(&job_key(job.id()))?; @@ -170,10 +169,7 @@ impl Storage for SledStorage { async fn get_stats(&self) -> Result { let this = self.clone(); - Ok( - run(move || Ok(this.stats.get("stats")?.unwrap_or(Stats::default())) as Result) - .await?, - ) + Ok(run(move || Ok(this.stats.get("stats")?.unwrap_or_default()) as Result).await?) } async fn update_stats(&self, f: F) -> Result<()>