From 93e7e57d9a31e8da3880dcc288030c62cd00c665 Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Sat, 31 Jul 2021 09:47:53 +0300 Subject: [PATCH] Add simple example (#9) * Add simple example * fix env_logger --- fang_examples/simple_worker/.gitignore | 2 ++ fang_examples/simple_worker/Cargo.toml | 11 +++++++ fang_examples/simple_worker/README.md | 3 ++ fang_examples/simple_worker/src/lib.rs | 44 +++++++++++++++++++++++++ fang_examples/simple_worker/src/main.rs | 22 +++++++++++++ src/executor.rs | 2 +- src/lib.rs | 8 ++--- src/queue.rs | 2 +- src/scheduler.rs | 2 +- src/worker_pool.rs | 6 ++-- 10 files changed, 91 insertions(+), 11 deletions(-) create mode 100644 fang_examples/simple_worker/.gitignore create mode 100644 fang_examples/simple_worker/Cargo.toml create mode 100644 fang_examples/simple_worker/README.md create mode 100644 fang_examples/simple_worker/src/lib.rs create mode 100644 fang_examples/simple_worker/src/main.rs diff --git a/fang_examples/simple_worker/.gitignore b/fang_examples/simple_worker/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/fang_examples/simple_worker/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/fang_examples/simple_worker/Cargo.toml b/fang_examples/simple_worker/Cargo.toml new file mode 100644 index 0000000..3aa5eb1 --- /dev/null +++ b/fang_examples/simple_worker/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "simple_worker" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fang = { path = "../../" } +serde = { version = "1.0", features = ["derive"] } +dotenv = "0.15.0" diff --git a/fang_examples/simple_worker/README.md b/fang_examples/simple_worker/README.md new file mode 100644 index 0000000..f1c3f1b --- /dev/null +++ b/fang_examples/simple_worker/README.md @@ -0,0 +1,3 @@ +## Simple example + +The job described in this example enqueues a new job during its execution saving thread name of the current worker to its metadata. diff --git a/fang_examples/simple_worker/src/lib.rs b/fang_examples/simple_worker/src/lib.rs new file mode 100644 index 0000000..e4554a9 --- /dev/null +++ b/fang_examples/simple_worker/src/lib.rs @@ -0,0 +1,44 @@ +use fang::typetag; +use fang::Error; +use fang::PgConnection; +use fang::Queue; +use fang::Runnable; +use serde::Deserialize; +use serde::Serialize; +use std::thread; +use std::time::Duration; + +#[derive(Serialize, Deserialize)] +pub struct MyJob { + pub number: u16, + pub current_thread_name: String, +} + +impl MyJob { + pub fn new(number: u16) -> Self { + let handle = thread::current(); + let current_thread_name = handle.name().unwrap().to_string(); + + Self { + number, + current_thread_name, + } + } +} + +#[typetag::serde] +impl Runnable for MyJob { + fn run(&self, connection: &PgConnection) -> Result<(), Error> { + thread::sleep(Duration::from_secs(3)); + + let new_job = MyJob::new(self.number + 1); + + Queue::push_task_query(connection, &new_job).unwrap(); + + Ok(()) + } + + fn task_type(&self) -> String { + "worker_pool_test".to_string() + } +} diff --git a/fang_examples/simple_worker/src/main.rs b/fang_examples/simple_worker/src/main.rs new file mode 100644 index 0000000..b4160f5 --- /dev/null +++ b/fang_examples/simple_worker/src/main.rs @@ -0,0 +1,22 @@ +use dotenv::dotenv; +use fang::Queue; +use fang::RetentionMode; +use fang::WorkerParams; +use fang::WorkerPool; +use simple_worker::MyJob; + +fn main() { + dotenv().ok(); + + let mut worker_params = WorkerParams::new(); + worker_params.set_retention_mode(RetentionMode::KeepAll); + + WorkerPool::new_with_params(2, worker_params).start(); + + let queue = Queue::new(); + + queue.push_task(&MyJob::new(1)).unwrap(); + queue.push_task(&MyJob::new(1000)).unwrap(); + + std::thread::park(); +} diff --git a/src/executor.rs b/src/executor.rs index 8782c4b..7c57aad 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -174,10 +174,10 @@ mod executor_tests { use crate::queue::Queue; use crate::schema::FangTaskState; use crate::typetag; - use crate::{Deserialize, Serialize}; use diesel::connection::Connection; use diesel::pg::PgConnection; use diesel::r2d2::{ConnectionManager, PooledConnection}; + use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] struct ExecutorJobTest { diff --git a/src/lib.rs b/src/lib.rs index d34e39f..eb75f32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,26 +1,24 @@ #![allow(clippy::nonstandard_macro_braces)] #[macro_use] -pub extern crate diesel; +extern crate diesel; #[macro_use] extern crate log; -mod schema; - pub mod executor; pub mod queue; pub mod scheduler; +pub mod schema; pub mod worker_pool; pub use executor::*; pub use queue::*; pub use scheduler::*; +pub use schema::*; pub use worker_pool::*; #[doc(hidden)] pub use diesel::pg::PgConnection; #[doc(hidden)] -pub use serde::{Deserialize, Serialize}; -#[doc(hidden)] pub use typetag; diff --git a/src/queue.rs b/src/queue.rs index 2594725..c5afd77 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -409,12 +409,12 @@ mod queue_tests { use crate::schema::fang_tasks; use crate::schema::FangTaskState; use crate::typetag; - use crate::{Deserialize, Serialize}; use chrono::prelude::*; use chrono::{DateTime, Duration, Utc}; use diesel::connection::Connection; use diesel::prelude::*; use diesel::result::Error; + use serde::{Deserialize, Serialize}; #[test] fn insert_inserts_task() { diff --git a/src/scheduler.rs b/src/scheduler.rs index e2db4f4..70224ae 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -85,9 +85,9 @@ mod job_scheduler_tests { use crate::queue::Task; use crate::schema::fang_tasks; use crate::typetag; - use crate::{Deserialize, Serialize}; use diesel::pg::PgConnection; use diesel::prelude::*; + use serde::{Deserialize, Serialize}; use std::thread; use std::time::Duration; diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 6bd73a8..4623b4e 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -77,6 +77,8 @@ impl WorkerPool { } pub fn start(&self) { + env_logger::init(); + for idx in 1..self.number_of_workers + 1 { let worker_type = self .worker_params @@ -181,9 +183,9 @@ mod job_pool_tests { use crate::queue::Task; use crate::schema::fang_tasks; use crate::typetag; - use crate::{Deserialize, Serialize}; use diesel::pg::PgConnection; use diesel::prelude::*; + use serde::{Deserialize, Serialize}; use std::thread; use std::time::Duration; @@ -233,8 +235,6 @@ mod job_pool_tests { #[test] #[ignore] fn tasks_are_split_between_two_threads() { - env_logger::init(); - let queue = Queue::new(); let mut worker_params = WorkerParams::new();