From 3b87fd8b14c2ef9b8fdecb40f45ad0e627efd461 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Mon, 17 Jul 2023 16:32:35 +0200 Subject: [PATCH] rename methods, join outgoing activities task --- crates/api_common/src/send_activity.rs | 6 ++++-- crates/api_crud/src/post/create.rs | 2 +- crates/apub/src/activities/mod.rs | 11 +++-------- src/lib.rs | 11 ++++++++--- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index f20bb61ee..287216419 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -26,12 +26,14 @@ pub struct ActivityChannel { } impl ActivityChannel { - pub async fn receive_activity() -> Option { + pub async fn retrieve_activity() -> Option { let mut lock = ACTIVITY_CHANNEL.receiver.lock().await; lock.recv().await } - pub async fn send_activity(data: SendActivityData) -> LemmyResult<()> { + pub async fn submit_activity(data: SendActivityData) -> LemmyResult<()> { + // TODO: this will return immediately, and not wait for send to complete + // which causes problems for api tests let lock = &ACTIVITY_CHANNEL.sender; lock.send(data)?; Ok(()) diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 9f702fc94..e18fc9cb7 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -150,7 +150,7 @@ pub async fn create_post( .await .with_lemmy_type(LemmyErrorType::CouldntLikePost)?; - ActivityChannel::send_activity(SendActivityData::CreatePost(updated_post.clone())).await?; + ActivityChannel::submit_activity(SendActivityData::CreatePost(updated_post.clone())).await?; // Mark the post as read mark_post_as_read(person_id, post_id, &mut context.pool()).await?; diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index a44da45e8..ee771e2e8 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -206,9 +206,8 @@ where Ok(()) } -// TODO: naming is confusing, it *receives* jobs from queue to *send out* activities -pub async fn handle_send_activity(context: Data) -> LemmyResult<()> { - while let Some(data) = ActivityChannel::receive_activity().await { +pub async fn handle_outgoing_activities(context: Data) -> LemmyResult<()> { + while let Some(data) = ActivityChannel::retrieve_activity().await { let fed_task = match data { SendActivityData::CreatePost(post) => { let creator_id = post.creator_id; @@ -220,11 +219,7 @@ pub async fn handle_send_activity(context: Data) -> LemmyResult<() ) } }; - if *SYNCHRONOUS_FEDERATION { - fed_task.await?; - } else { - spawn_try_task(fed_task); - } + spawn_try_task(fed_task); } Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index de7c49323..28bfe4f95 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,11 @@ use lemmy_api_common::{ local_site_rate_limit_to_rate_limit_config, }, }; -use lemmy_apub::{activities::handle_send_activity, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT}; +use lemmy_apub::{ + activities::handle_outgoing_activities, + VerifyUrlData, + FEDERATION_HTTP_FETCH_LIMIT, +}; use lemmy_db_schema::{ source::secret::Secret, utils::{build_db_pool, get_database_url, run_migrations}, @@ -167,7 +171,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .unwrap(); let request_data = federation_config.to_request_data(); - tokio::task::spawn(handle_send_activity(request_data)); + let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); // Create Http server with websocket support HttpServer::new(move || { let cors_origin = std::env::var("LEMMY_CORS_ORIGIN"); @@ -216,7 +220,8 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .run() .await?; - ActivityChannel::close().await; + // Wait for outgoing apub sends to complete + outgoing_activities_task.await??; Ok(()) }