Max number of conns (#54)
* Max number of conns * fixing builder , connect and example * fix comments * fix comments * Update src/asynk/async_queue.rs Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>
This commit is contained in:
parent
36393676e7
commit
549f5a1c4b
3 changed files with 58 additions and 30 deletions
|
@ -11,16 +11,19 @@ async fn main() {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
log::info!("Starting...");
|
log::info!("Starting...");
|
||||||
|
let max_pool_size: u32 = 2;
|
||||||
|
let mut queue = AsyncQueue::builder()
|
||||||
|
.uri("postgres://postgres:postgres@localhost/fang")
|
||||||
|
.max_pool_size(max_pool_size)
|
||||||
|
.duplicated_tasks(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
let mut queue = AsyncQueue::connect("postgres://postgres:postgres@localhost/fang", NoTls, true)
|
queue.connect(NoTls).await.unwrap();
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
log::info!("Queue connected...");
|
log::info!("Queue connected...");
|
||||||
|
|
||||||
let mut pool = AsyncWorkerPool::builder()
|
let mut pool = AsyncWorkerPool::builder()
|
||||||
|
.number_of_workers(max_pool_size)
|
||||||
.queue(queue.clone())
|
.queue(queue.clone())
|
||||||
.number_of_workers(2 as u16)
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
log::info!("Pool created ...");
|
log::info!("Pool created ...");
|
||||||
|
|
|
@ -100,6 +100,10 @@ pub enum AsyncQueueError {
|
||||||
SerdeError(#[from] serde_json::Error),
|
SerdeError(#[from] serde_json::Error),
|
||||||
#[error("returned invalid result (expected {expected:?}, found {found:?})")]
|
#[error("returned invalid result (expected {expected:?}, found {found:?})")]
|
||||||
ResultError { expected: u64, found: u64 },
|
ResultError { expected: u64, found: u64 },
|
||||||
|
#[error(
|
||||||
|
"AsyncQueue is not connected :( , call connect() method first and then perform operations"
|
||||||
|
)]
|
||||||
|
NotConnectedError,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<AsyncQueueError> for FangError {
|
impl From<AsyncQueueError> for FangError {
|
||||||
|
@ -159,10 +163,16 @@ where
|
||||||
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
||||||
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
{
|
{
|
||||||
|
#[builder(default=None, setter(skip))]
|
||||||
|
pool: Option<Pool<PostgresConnectionManager<Tls>>>,
|
||||||
#[builder(setter(into))]
|
#[builder(setter(into))]
|
||||||
pool: Pool<PostgresConnectionManager<Tls>>,
|
uri: String,
|
||||||
|
#[builder(setter(into))]
|
||||||
|
max_pool_size: u32,
|
||||||
#[builder(default = false, setter(into))]
|
#[builder(default = false, setter(into))]
|
||||||
duplicated_tasks: bool,
|
duplicated_tasks: bool,
|
||||||
|
#[builder(default = false, setter(skip))]
|
||||||
|
connected: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -330,20 +340,25 @@ where
|
||||||
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
|
||||||
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
{
|
{
|
||||||
pub async fn connect(
|
pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> {
|
||||||
uri: impl ToString,
|
if self.connected {
|
||||||
tls: Tls,
|
Ok(())
|
||||||
duplicated_tasks: bool,
|
} else {
|
||||||
) -> Result<Self, AsyncQueueError> {
|
Err(AsyncQueueError::NotConnectedError)
|
||||||
let manager = PostgresConnectionManager::new_from_stringlike(uri, tls)?;
|
}
|
||||||
let pool = Pool::builder().build(manager).await?;
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
pool,
|
|
||||||
duplicated_tasks,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
pub async fn connect(&mut self, tls: Tls) -> Result<(), AsyncQueueError> {
|
||||||
|
let manager = PostgresConnectionManager::new_from_stringlike(self.uri.clone(), tls)?;
|
||||||
|
|
||||||
|
let pool = Pool::builder()
|
||||||
|
.max_size(self.max_pool_size)
|
||||||
|
.build(manager)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.pool = Some(pool);
|
||||||
|
self.connected = true;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
async fn remove_all_tasks_query(
|
async fn remove_all_tasks_query(
|
||||||
transaction: &mut Transaction<'_>,
|
transaction: &mut Transaction<'_>,
|
||||||
) -> Result<u64, AsyncQueueError> {
|
) -> Result<u64, AsyncQueueError> {
|
||||||
|
@ -599,7 +614,8 @@ where
|
||||||
&mut self,
|
&mut self,
|
||||||
task_type: Option<String>,
|
task_type: Option<String>,
|
||||||
) -> Result<Option<Task>, AsyncQueueError> {
|
) -> Result<Option<Task>, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let task = Self::fetch_and_touch_task_query(&mut transaction, task_type).await?;
|
let task = Self::fetch_and_touch_task_query(&mut transaction, task_type).await?;
|
||||||
|
@ -610,7 +626,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
|
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let metadata = serde_json::to_value(task)?;
|
let metadata = serde_json::to_value(task)?;
|
||||||
|
@ -633,7 +650,8 @@ where
|
||||||
timestamp: DateTime<Utc>,
|
timestamp: DateTime<Utc>,
|
||||||
period: i32,
|
period: i32,
|
||||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let metadata = serde_json::to_value(task)?;
|
let metadata = serde_json::to_value(task)?;
|
||||||
|
@ -650,7 +668,8 @@ where
|
||||||
&mut self,
|
&mut self,
|
||||||
periodic_task: PeriodicTask,
|
periodic_task: PeriodicTask,
|
||||||
) -> Result<PeriodicTask, AsyncQueueError> {
|
) -> Result<PeriodicTask, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?;
|
let periodic_task = Self::schedule_next_task_query(&mut transaction, periodic_task).await?;
|
||||||
|
@ -664,7 +683,8 @@ where
|
||||||
&mut self,
|
&mut self,
|
||||||
error_margin_seconds: i64,
|
error_margin_seconds: i64,
|
||||||
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
) -> Result<Option<Vec<PeriodicTask>>, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let periodic_task =
|
let periodic_task =
|
||||||
|
@ -676,7 +696,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let result = Self::remove_all_tasks_query(&mut transaction).await?;
|
let result = Self::remove_all_tasks_query(&mut transaction).await?;
|
||||||
|
@ -687,7 +708,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
|
async fn remove_task(&mut self, task: Task) -> Result<u64, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let result = Self::remove_task_query(&mut transaction, task).await?;
|
let result = Self::remove_task_query(&mut transaction, task).await?;
|
||||||
|
@ -698,7 +720,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
|
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?;
|
let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?;
|
||||||
|
@ -713,7 +736,8 @@ where
|
||||||
task: Task,
|
task: Task,
|
||||||
state: FangTaskState,
|
state: FangTaskState,
|
||||||
) -> Result<Task, AsyncQueueError> {
|
) -> Result<Task, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let task = Self::update_task_state_query(&mut transaction, task, state).await?;
|
let task = Self::update_task_state_query(&mut transaction, task, state).await?;
|
||||||
|
@ -727,7 +751,8 @@ where
|
||||||
task: Task,
|
task: Task,
|
||||||
error_message: &str,
|
error_message: &str,
|
||||||
) -> Result<Task, AsyncQueueError> {
|
) -> Result<Task, AsyncQueueError> {
|
||||||
let mut connection = self.pool.get().await?;
|
self.check_if_connection()?;
|
||||||
|
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||||
let mut transaction = connection.transaction().await?;
|
let mut transaction = connection.transaction().await?;
|
||||||
|
|
||||||
let task = Self::fail_task_query(&mut transaction, task, error_message).await?;
|
let task = Self::fail_task_query(&mut transaction, task, error_message).await?;
|
||||||
|
|
|
@ -26,7 +26,7 @@ where
|
||||||
#[builder(default, setter(into))]
|
#[builder(default, setter(into))]
|
||||||
pub retention_mode: RetentionMode,
|
pub retention_mode: RetentionMode,
|
||||||
#[builder(setter(into))]
|
#[builder(setter(into))]
|
||||||
pub number_of_workers: u16,
|
pub number_of_workers: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(TypedBuilder, Clone)]
|
#[derive(TypedBuilder, Clone)]
|
||||||
|
@ -47,7 +47,7 @@ where
|
||||||
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
|
||||||
{
|
{
|
||||||
pub async fn start(&mut self) {
|
pub async fn start(&mut self) {
|
||||||
for _idx in 1..self.number_of_workers + 1 {
|
for _idx in 0..self.number_of_workers {
|
||||||
let queue = self.queue.clone();
|
let queue = self.queue.clone();
|
||||||
let sleep_params = self.sleep_params.clone();
|
let sleep_params = self.sleep_params.clone();
|
||||||
let retention_mode = self.retention_mode.clone();
|
let retention_mode = self.retention_mode.clone();
|
||||||
|
|
Loading…
Reference in a new issue