Process Create() activities in background jobs
This commit is contained in:
parent
2bf16b260e
commit
1efbf5a3fb
3 changed files with 115 additions and 3 deletions
|
@ -8,6 +8,6 @@ mod deliverer;
|
||||||
pub mod fetcher;
|
pub mod fetcher;
|
||||||
mod handlers;
|
mod handlers;
|
||||||
pub mod identifiers;
|
pub mod identifiers;
|
||||||
mod receiver;
|
pub mod receiver;
|
||||||
pub mod views;
|
pub mod views;
|
||||||
mod vocabulary;
|
mod vocabulary;
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
use actix_web::HttpRequest;
|
use actix_web::HttpRequest;
|
||||||
|
use chrono::{Duration, Utc};
|
||||||
use serde::{
|
use serde::{
|
||||||
Deserialize,
|
Deserialize,
|
||||||
Deserializer,
|
Deserializer,
|
||||||
|
Serialize,
|
||||||
de::DeserializeOwned,
|
de::DeserializeOwned,
|
||||||
de::Error as DeserializerError,
|
de::Error as DeserializerError,
|
||||||
};
|
};
|
||||||
|
@ -9,12 +11,20 @@ use serde_json::Value;
|
||||||
use tokio_postgres::GenericClient;
|
use tokio_postgres::GenericClient;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::database::DatabaseError;
|
use crate::database::{DatabaseError, DatabaseTypeError};
|
||||||
use crate::errors::{
|
use crate::errors::{
|
||||||
ConversionError,
|
ConversionError,
|
||||||
HttpError,
|
HttpError,
|
||||||
ValidationError,
|
ValidationError,
|
||||||
};
|
};
|
||||||
|
use crate::models::{
|
||||||
|
background_jobs::queries::{
|
||||||
|
enqueue_job,
|
||||||
|
get_job_batch,
|
||||||
|
delete_job_from_queue,
|
||||||
|
},
|
||||||
|
background_jobs::types::JobType,
|
||||||
|
};
|
||||||
use super::authentication::{
|
use super::authentication::{
|
||||||
verify_signed_activity,
|
verify_signed_activity,
|
||||||
verify_signed_request,
|
verify_signed_request,
|
||||||
|
@ -306,6 +316,14 @@ pub async fn receive_activity(
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if activity_type == CREATE {
|
||||||
|
// Add activity to job queue and release lock
|
||||||
|
IncomingActivity::new(activity, is_authenticated)
|
||||||
|
.enqueue(db_client, 0).await?;
|
||||||
|
log::info!("activity added to the queue: {}", activity_type);
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
handle_activity(
|
handle_activity(
|
||||||
config,
|
config,
|
||||||
db_client,
|
db_client,
|
||||||
|
@ -314,6 +332,84 @@ pub async fn receive_activity(
|
||||||
).await
|
).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
struct IncomingActivity {
|
||||||
|
activity: Value,
|
||||||
|
is_authenticated: bool,
|
||||||
|
failure_count: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IncomingActivity {
|
||||||
|
fn new(activity: &Value, is_authenticated: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
activity: activity.clone(),
|
||||||
|
is_authenticated,
|
||||||
|
failure_count: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn enqueue(
|
||||||
|
self,
|
||||||
|
db_client: &impl GenericClient,
|
||||||
|
delay: i64,
|
||||||
|
) -> Result<(), DatabaseError> {
|
||||||
|
let job_data = serde_json::to_value(self)
|
||||||
|
.expect("activity should be serializable");
|
||||||
|
let scheduled_for = Utc::now() + Duration::seconds(delay);
|
||||||
|
enqueue_job(
|
||||||
|
db_client,
|
||||||
|
&JobType::IncomingActivity,
|
||||||
|
&job_data,
|
||||||
|
&scheduled_for,
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process_queued_activities(
|
||||||
|
config: &Config,
|
||||||
|
db_client: &mut impl GenericClient,
|
||||||
|
) -> Result<(), DatabaseError> {
|
||||||
|
let batch_size = 10;
|
||||||
|
let max_retries = 2;
|
||||||
|
let retry_after = 60 * 10; // 10 minutes
|
||||||
|
|
||||||
|
let batch = get_job_batch(
|
||||||
|
db_client,
|
||||||
|
&JobType::IncomingActivity,
|
||||||
|
batch_size,
|
||||||
|
).await?;
|
||||||
|
for job in batch {
|
||||||
|
let mut incoming_activity: IncomingActivity =
|
||||||
|
serde_json::from_value(job.job_data)
|
||||||
|
.map_err(|_| DatabaseTypeError)?;
|
||||||
|
let is_error = match handle_activity(
|
||||||
|
config,
|
||||||
|
db_client,
|
||||||
|
&incoming_activity.activity,
|
||||||
|
incoming_activity.is_authenticated,
|
||||||
|
).await {
|
||||||
|
Ok(_) => false,
|
||||||
|
Err(error) => {
|
||||||
|
incoming_activity.failure_count += 1;
|
||||||
|
log::warn!(
|
||||||
|
"failed to process activity ({}) (attempt #{}): {}",
|
||||||
|
error,
|
||||||
|
incoming_activity.failure_count,
|
||||||
|
incoming_activity.activity,
|
||||||
|
);
|
||||||
|
true
|
||||||
|
},
|
||||||
|
};
|
||||||
|
if is_error && incoming_activity.failure_count <= max_retries {
|
||||||
|
// Re-queue
|
||||||
|
log::info!("activity re-queued");
|
||||||
|
incoming_activity.enqueue(db_client, retry_after).await?;
|
||||||
|
};
|
||||||
|
delete_job_from_queue(db_client, &job.id).await?;
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
|
@ -5,8 +5,9 @@ use anyhow::Error;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::activitypub::receiver::process_queued_activities;
|
||||||
use crate::config::{Config, Instance};
|
use crate::config::{Config, Instance};
|
||||||
use crate::database::DbPool;
|
use crate::database::{get_database_client, DbPool};
|
||||||
use crate::ethereum::contracts::Blockchain;
|
use crate::ethereum::contracts::Blockchain;
|
||||||
use crate::ethereum::nft::process_nft_events;
|
use crate::ethereum::nft::process_nft_events;
|
||||||
use crate::ethereum::subscriptions::{
|
use crate::ethereum::subscriptions::{
|
||||||
|
@ -21,6 +22,7 @@ enum Task {
|
||||||
EthereumSubscriptionMonitor,
|
EthereumSubscriptionMonitor,
|
||||||
SubscriptionExpirationMonitor,
|
SubscriptionExpirationMonitor,
|
||||||
MoneroPaymentMonitor,
|
MoneroPaymentMonitor,
|
||||||
|
IncomingActivityQueue,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Task {
|
impl Task {
|
||||||
|
@ -31,6 +33,7 @@ impl Task {
|
||||||
Self::EthereumSubscriptionMonitor => 300,
|
Self::EthereumSubscriptionMonitor => 300,
|
||||||
Self::SubscriptionExpirationMonitor => 300,
|
Self::SubscriptionExpirationMonitor => 300,
|
||||||
Self::MoneroPaymentMonitor => 30,
|
Self::MoneroPaymentMonitor => 30,
|
||||||
|
Self::IncomingActivityQueue => 5,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -109,6 +112,15 @@ async fn monero_payment_monitor_task(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn incoming_activity_queue_task(
|
||||||
|
config: &Config,
|
||||||
|
db_pool: &DbPool,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let db_client = &mut **get_database_client(db_pool).await?;
|
||||||
|
process_queued_activities(config, db_client).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn run(
|
pub fn run(
|
||||||
config: Config,
|
config: Config,
|
||||||
mut maybe_blockchain: Option<Blockchain>,
|
mut maybe_blockchain: Option<Blockchain>,
|
||||||
|
@ -120,6 +132,7 @@ pub fn run(
|
||||||
scheduler_state.insert(Task::EthereumSubscriptionMonitor, None);
|
scheduler_state.insert(Task::EthereumSubscriptionMonitor, None);
|
||||||
scheduler_state.insert(Task::SubscriptionExpirationMonitor, None);
|
scheduler_state.insert(Task::SubscriptionExpirationMonitor, None);
|
||||||
scheduler_state.insert(Task::MoneroPaymentMonitor, None);
|
scheduler_state.insert(Task::MoneroPaymentMonitor, None);
|
||||||
|
scheduler_state.insert(Task::IncomingActivityQueue, None);
|
||||||
|
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||||
let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new();
|
let mut token_waitlist_map: HashMap<Uuid, DateTime<Utc>> = HashMap::new();
|
||||||
|
@ -154,6 +167,9 @@ pub fn run(
|
||||||
Task::MoneroPaymentMonitor => {
|
Task::MoneroPaymentMonitor => {
|
||||||
monero_payment_monitor_task(&config, &db_pool).await
|
monero_payment_monitor_task(&config, &db_pool).await
|
||||||
},
|
},
|
||||||
|
Task::IncomingActivityQueue => {
|
||||||
|
incoming_activity_queue_task(&config, &db_pool).await
|
||||||
|
},
|
||||||
};
|
};
|
||||||
task_result.unwrap_or_else(|err| {
|
task_result.unwrap_or_else(|err| {
|
||||||
log::error!("{:?}: {}", task, err);
|
log::error!("{:?}: {}", task, err);
|
||||||
|
|
Loading…
Reference in a new issue