rename methods, join outgoing activities task

This commit is contained in:
Felix Ableitner 2023-07-17 16:32:35 +02:00
parent 9b45158e68
commit 3b87fd8b14
4 changed files with 16 additions and 14 deletions

View file

@ -26,12 +26,14 @@ pub struct ActivityChannel {
}
impl ActivityChannel {
pub async fn receive_activity() -> Option<SendActivityData> {
pub async fn retrieve_activity() -> Option<SendActivityData> {
let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
lock.recv().await
}
pub async fn send_activity(data: SendActivityData) -> LemmyResult<()> {
pub async fn submit_activity(data: SendActivityData) -> LemmyResult<()> {
// TODO: this will return immediately, and not wait for send to complete
// which causes problems for api tests
let lock = &ACTIVITY_CHANNEL.sender;
lock.send(data)?;
Ok(())

View file

@ -150,7 +150,7 @@ pub async fn create_post(
.await
.with_lemmy_type(LemmyErrorType::CouldntLikePost)?;
ActivityChannel::send_activity(SendActivityData::CreatePost(updated_post.clone())).await?;
ActivityChannel::submit_activity(SendActivityData::CreatePost(updated_post.clone())).await?;
// Mark the post as read
mark_post_as_read(person_id, post_id, &mut context.pool()).await?;

View file

@ -206,9 +206,8 @@ where
Ok(())
}
// TODO: naming is confusing, it *receives* jobs from queue to *send out* activities
pub async fn handle_send_activity(context: Data<LemmyContext>) -> LemmyResult<()> {
while let Some(data) = ActivityChannel::receive_activity().await {
pub async fn handle_outgoing_activities(context: Data<LemmyContext>) -> LemmyResult<()> {
while let Some(data) = ActivityChannel::retrieve_activity().await {
let fed_task = match data {
SendActivityData::CreatePost(post) => {
let creator_id = post.creator_id;
@ -220,11 +219,7 @@ pub async fn handle_send_activity(context: Data<LemmyContext>) -> LemmyResult<()
)
}
};
if *SYNCHRONOUS_FEDERATION {
fed_task.await?;
} else {
spawn_try_task(fed_task);
}
spawn_try_task(fed_task);
}
Ok(())
}

View file

@ -27,7 +27,11 @@ use lemmy_api_common::{
local_site_rate_limit_to_rate_limit_config,
},
};
use lemmy_apub::{activities::handle_send_activity, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT};
use lemmy_apub::{
activities::handle_outgoing_activities,
VerifyUrlData,
FEDERATION_HTTP_FETCH_LIMIT,
};
use lemmy_db_schema::{
source::secret::Secret,
utils::{build_db_pool, get_database_url, run_migrations},
@ -167,7 +171,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
.unwrap();
let request_data = federation_config.to_request_data();
tokio::task::spawn(handle_send_activity(request_data));
let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data));
// Create Http server with websocket support
HttpServer::new(move || {
let cors_origin = std::env::var("LEMMY_CORS_ORIGIN");
@ -216,7 +220,8 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
.run()
.await?;
ActivityChannel::close().await;
// Wait for outgoing apub sends to complete
outgoing_activities_task.await??;
Ok(())
}