Process incoming activities sequentially

This is a quick fix for race conditions occuring during handling of inbox requests.
This commit is contained in:
silverpill 2022-05-04 23:13:57 +00:00
parent ae4bfcf614
commit 783c482e9e
2 changed files with 17 additions and 0 deletions

View file

@ -1,9 +1,12 @@
use std::time::Instant;
use actix_web::{ use actix_web::{
get, post, web, get, post, web,
HttpRequest, HttpResponse, Scope, HttpRequest, HttpResponse, Scope,
http::header::HeaderMap, http::header::HeaderMap,
}; };
use serde::Deserialize; use serde::Deserialize;
use tokio::sync::Mutex;
use uuid::Uuid; use uuid::Uuid;
use crate::config::Config; use crate::config::Config;
@ -100,6 +103,7 @@ async fn actor_view(
async fn inbox( async fn inbox(
config: web::Data<Config>, config: web::Data<Config>,
db_pool: web::Data<Pool>, db_pool: web::Data<Pool>,
inbox_mutex: web::Data<Mutex<()>>,
request: HttpRequest, request: HttpRequest,
activity: web::Json<serde_json::Value>, activity: web::Json<serde_json::Value>,
) -> Result<HttpResponse, HttpError> { ) -> Result<HttpResponse, HttpError> {
@ -112,6 +116,14 @@ async fn inbox(
} else { } else {
log::info!("received in {}: {}", request.uri().path(), activity_type); log::info!("received in {}: {}", request.uri().path(), activity_type);
}; };
let now = Instant::now();
// Store mutex guard in a variable to prevent it from being dropped immediately
let _guard = inbox_mutex.lock().await;
log::info!(
"acquired inbox lock after waiting for {:.2?}: {}",
now.elapsed(),
activity["id"].as_str().unwrap_or_default(),
);
let db_client = &mut **get_database_client(&db_pool).await?; let db_client = &mut **get_database_client(&db_pool).await?;
receive_activity(&config, db_client, &request, &activity).await receive_activity(&config, db_client, &request, &activity).await
.map_err(|err| { .map_err(|err| {

View file

@ -4,6 +4,7 @@ use actix_web::{
App, HttpServer, App, HttpServer,
middleware::Logger as ActixLogger, middleware::Logger as ActixLogger,
}; };
use tokio::sync::Mutex;
use mitra::activitypub::views as activitypub; use mitra::activitypub::views as activitypub;
use mitra::atom::views as atom; use mitra::atom::views as atom;
@ -56,6 +57,9 @@ async fn main() -> std::io::Result<()> {
config.http_port, config.http_port,
); );
let num_workers = std::cmp::max(num_cpus::get(), 4); let num_workers = std::cmp::max(num_cpus::get(), 4);
// Mutex is used to make server process incoming activities sequentially
let inbox_mutex = web::Data::new(Mutex::new(()));
HttpServer::new(move || { HttpServer::new(move || {
let cors_config = match config.environment { let cors_config = match config.environment {
Environment::Development => { Environment::Development => {
@ -83,6 +87,7 @@ async fn main() -> std::io::Result<()> {
.app_data(web::JsonConfig::default().limit(MAX_UPLOAD_SIZE)) .app_data(web::JsonConfig::default().limit(MAX_UPLOAD_SIZE))
.app_data(web::Data::new(config.clone())) .app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(db_pool.clone())) .app_data(web::Data::new(db_pool.clone()))
.app_data(web::Data::clone(&inbox_mutex))
.service(actix_files::Files::new( .service(actix_files::Files::new(
"/media", "/media",
config.media_dir(), config.media_dir(),