Add simple example (#9)

* Add simple example

* fix env_logger
Ayrat Badykov, 2021-07-31 09:47:53 +03:00 (committed by GitHub)
parent 824e32f17b
commit 93e7e57d9a
10 changed files with 91 additions and 11 deletions

.gitignore (simple_worker example)
@@ -0,0 +1,2 @@
/target
Cargo.lock

Cargo.toml (simple_worker example)
@@ -0,0 +1,11 @@
[package]
name = "simple_worker"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fang = { path = "../../" }
serde = { version = "1.0", features = ["derive"] }
dotenv = "0.15.0"

README.md (simple_worker example)
@@ -0,0 +1,3 @@
## Simple example
The job in this example enqueues a new job during its execution, saving the current worker's thread name to its metadata.

src/lib.rs (simple_worker example)
@@ -0,0 +1,44 @@
use fang::typetag;
use fang::Error;
use fang::PgConnection;
use fang::Queue;
use fang::Runnable;
use serde::Deserialize;
use serde::Serialize;
use std::thread;
use std::time::Duration;

#[derive(Serialize, Deserialize)]
pub struct MyJob {
    pub number: u16,
    pub current_thread_name: String,
}

impl MyJob {
    pub fn new(number: u16) -> Self {
        let handle = thread::current();
        let current_thread_name = handle.name().unwrap().to_string();

        Self {
            number,
            current_thread_name,
        }
    }
}

#[typetag::serde]
impl Runnable for MyJob {
    fn run(&self, connection: &PgConnection) -> Result<(), Error> {
        thread::sleep(Duration::from_secs(3));

        let new_job = MyJob::new(self.number + 1);

        Queue::push_task_query(connection, &new_job).unwrap();

        Ok(())
    }

    fn task_type(&self) -> String {
        "worker_pool_test".to_string()
    }
}
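Since MyJob derives Serialize, the payload fang serializes for each task is essentially these two fields. A minimal sketch of inspecting that payload directly; serde_json here is an assumed extra dependency, not part of the committed example:

use simple_worker::MyJob;

fn main() {
    let job = MyJob::new(1);

    // On the main thread this prints {"number":1,"current_thread_name":"main"}.
    // Inside a worker, MyJob::new records that worker's thread name instead.
    // fang presumably wraps this payload with a typetag type tag before storing
    // it in the task's metadata column.
    println!("{}", serde_json::to_string(&job).unwrap());
}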

src/main.rs (simple_worker example)
@@ -0,0 +1,22 @@
use dotenv::dotenv;
use fang::Queue;
use fang::RetentionMode;
use fang::WorkerParams;
use fang::WorkerPool;
use simple_worker::MyJob;

fn main() {
    dotenv().ok();

    let mut worker_params = WorkerParams::new();
    worker_params.set_retention_mode(RetentionMode::KeepAll);

    WorkerPool::new_with_params(2, worker_params).start();

    let queue = Queue::new();

    queue.push_task(&MyJob::new(1)).unwrap();
    queue.push_task(&MyJob::new(1000)).unwrap();

    std::thread::park();
}
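dotenv().ok() implies the example is driven by a .env file; presumably it only needs to point fang's Queue at a Postgres database. A minimal sketch that makes the same assumption explicit without a .env file; the DATABASE_URL variable name and the connection string are assumptions, not taken from this diff:

use fang::Queue;
use simple_worker::MyJob;

fn main() {
    // Assumption: fang resolves its Postgres connection from DATABASE_URL,
    // which dotenv would normally load from the example's .env file.
    // The credentials below are placeholders.
    std::env::set_var(
        "DATABASE_URL",
        "postgres://postgres:postgres@localhost/fang",
    );

    let queue = Queue::new();
    queue.push_task(&MyJob::new(1)).unwrap();
}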

src/executor.rs (fang)
@@ -174,10 +174,10 @@ mod executor_tests {
     use crate::queue::Queue;
     use crate::schema::FangTaskState;
     use crate::typetag;
+    use crate::{Deserialize, Serialize};
     use diesel::connection::Connection;
     use diesel::pg::PgConnection;
     use diesel::r2d2::{ConnectionManager, PooledConnection};
-    use serde::{Deserialize, Serialize};

     #[derive(Serialize, Deserialize)]
     struct ExecutorJobTest {

src/lib.rs (fang)
@@ -1,26 +1,24 @@
 #![allow(clippy::nonstandard_macro_braces)]
 #[macro_use]
-pub extern crate diesel;
+extern crate diesel;
 #[macro_use]
 extern crate log;

-mod schema;
 pub mod executor;
 pub mod queue;
 pub mod scheduler;
+pub mod schema;
 pub mod worker_pool;

 pub use executor::*;
 pub use queue::*;
 pub use scheduler::*;
+pub use schema::*;
 pub use worker_pool::*;

 #[doc(hidden)]
 pub use diesel::pg::PgConnection;
 #[doc(hidden)]
+pub use serde::{Deserialize, Serialize};
+#[doc(hidden)]
 pub use typetag;
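These #[doc(hidden)] re-exports are what the test modules below switch to (use crate::{Deserialize, Serialize}). A downstream crate could presumably lean on them the same way; a minimal sketch, assuming fang enables serde's derive feature so the derive macros come through the re-export (NoopJob is hypothetical, not part of this commit):

use fang::typetag;
use fang::{Deserialize, Serialize};
use fang::{Error, PgConnection, Runnable};

// Hypothetical job that only imports through fang's re-exports,
// with no direct serde dependency of its own.
#[derive(Serialize, Deserialize)]
pub struct NoopJob;

#[typetag::serde]
impl Runnable for NoopJob {
    fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
        Ok(())
    }

    fn task_type(&self) -> String {
        "common".to_string()
    }
}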

src/queue.rs (fang)
@@ -409,12 +409,12 @@ mod queue_tests {
     use crate::schema::fang_tasks;
     use crate::schema::FangTaskState;
     use crate::typetag;
+    use crate::{Deserialize, Serialize};
     use chrono::prelude::*;
     use chrono::{DateTime, Duration, Utc};
     use diesel::connection::Connection;
     use diesel::prelude::*;
     use diesel::result::Error;
-    use serde::{Deserialize, Serialize};

     #[test]
     fn insert_inserts_task() {

src/scheduler.rs (fang)
@@ -85,9 +85,9 @@ mod job_scheduler_tests {
     use crate::queue::Task;
     use crate::schema::fang_tasks;
     use crate::typetag;
+    use crate::{Deserialize, Serialize};
     use diesel::pg::PgConnection;
     use diesel::prelude::*;
-    use serde::{Deserialize, Serialize};
     use std::thread;
     use std::time::Duration;

src/worker_pool.rs (fang)
@@ -77,6 +77,8 @@ impl WorkerPool {
     }

     pub fn start(&self) {
+        env_logger::init();
+
         for idx in 1..self.number_of_workers + 1 {
             let worker_type = self
                 .worker_params
@@ -181,9 +183,9 @@ mod job_pool_tests {
     use crate::queue::Task;
     use crate::schema::fang_tasks;
     use crate::typetag;
+    use crate::{Deserialize, Serialize};
     use diesel::pg::PgConnection;
     use diesel::prelude::*;
-    use serde::{Deserialize, Serialize};
     use std::thread;
     use std::time::Duration;
@@ -233,8 +235,6 @@ mod job_pool_tests {
     #[test]
     #[ignore]
     fn tasks_are_split_between_two_threads() {
-        env_logger::init();
-
         let queue = Queue::new();

         let mut worker_params = WorkerParams::new();
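With env_logger::init() moved into start(), a consumer presumably only has to set RUST_LOG to see the pool's log output; a minimal sketch reusing the example's setup (the "info" filter is an assumption about the level fang logs at):

use fang::{RetentionMode, WorkerParams, WorkerPool};

fn main() {
    // env_logger reads RUST_LOG; setting it before start() is enough now that
    // the pool initializes the logger itself.
    std::env::set_var("RUST_LOG", "info");

    let mut worker_params = WorkerParams::new();
    worker_params.set_retention_mode(RetentionMode::KeepAll);
    WorkerPool::new_with_params(2, worker_params).start();

    std::thread::park();
}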