diff --git a/src/activitypub/deliverer.rs b/src/activitypub/deliverer.rs index 165a8ce..b7c5a0e 100644 --- a/src/activitypub/deliverer.rs +++ b/src/activitypub/deliverer.rs @@ -1,3 +1,5 @@ +use rsa::RsaPrivateKey; + use crate::config::{Environment, Config}; use crate::http_signatures::create::{create_http_signature, SignatureError}; use crate::models::users::types::User; @@ -27,28 +29,19 @@ pub enum DelivererError { async fn send_activity( config: &Config, - sender: &User, - activity: &Activity, + actor_key: &RsaPrivateKey, + actor_key_id: &str, + activity_json: &str, inbox_url: &str, ) -> Result<(), DelivererError> { - let activity_json = serde_json::to_string(&activity)?; log::info!("sending activity: {}", activity_json); - let actor_key = deserialize_private_key(&sender.private_key)?; - let actor_key_id = format!( - "{}#main-key", - get_actor_url( - &config.instance_url(), - &sender.profile.username, - ), - ); let headers = create_http_signature( - &inbox_url, - &activity_json, + inbox_url, + activity_json, actor_key, actor_key_id, )?; - // Send match config.environment { Environment::Development => { log::info!( @@ -65,7 +58,7 @@ async fn send_activity( .header("Digest", headers.digest) .header("Signature", headers.signature) .header("Content-Type", ACTIVITY_CONTENT_TYPE) - .body(activity_json) + .body(activity_json.to_owned()) .send() .await?; let response_status = response.status(); @@ -82,16 +75,52 @@ async fn send_activity( Ok(()) } -pub async fn deliver_activity( +async fn deliver_activity_worker( + config: Config, + sender: User, + activity: Activity, + recipients: Vec, +) -> Result<(), DelivererError> { + let actor_key = deserialize_private_key(&sender.private_key)?; + let actor_key_id = format!( + "{}#main-key", + get_actor_url( + &config.instance_url(), + &sender.profile.username, + ), + ); + let activity_json = serde_json::to_string(&activity)?; + for recipient in recipients { + // TODO: retry on error + if let Err(err) = send_activity( + &config, + &actor_key, + &actor_key_id, + &activity_json, + &recipient.inbox, + ).await { + log::error!("{}", err); + } + }; + Ok(()) +} + +pub fn deliver_activity( config: &Config, sender: &User, activity: Activity, recipients: Vec, ) -> () { - for actor in recipients { - // TODO: retry on error - if let Err(err) = send_activity(&config, &sender, &activity, &actor.inbox).await { + let config = config.clone(); + let sender = sender.clone(); + actix_rt::spawn(async move { + deliver_activity_worker( + config, + sender, + activity, + recipients, + ).await.unwrap_or_else(|err| { log::error!("{}", err); - } - }; + }); + }); } diff --git a/src/activitypub/receiver.rs b/src/activitypub/receiver.rs index 268e7cc..6e03b99 100644 --- a/src/activitypub/receiver.rs +++ b/src/activitypub/receiver.rs @@ -265,15 +265,7 @@ pub async fn receive_activity( // Send activity let recipients = vec![source_actor]; - let config_clone = config.clone(); - actix_rt::spawn(async move { - deliver_activity( - &config_clone, - &target_user, - new_activity, - recipients, - ).await; - }); + deliver_activity(&config, &target_user, new_activity, recipients); }, (UNDO, FOLLOW) => { let object: Object = serde_json::from_value(activity.object) diff --git a/src/http_signatures/create.rs b/src/http_signatures/create.rs index 0c731f5..f438e50 100644 --- a/src/http_signatures/create.rs +++ b/src/http_signatures/create.rs @@ -24,8 +24,8 @@ pub enum SignatureError { pub fn create_http_signature( request_url: &str, request_body: &str, - actor_key: RsaPrivateKey, - actor_key_id: String, + actor_key: &RsaPrivateKey, + actor_key_id: &str, ) -> Result { let request_url_object = url::Url::parse(request_url) .map_err(|_| SignatureError::UrlError)?; @@ -41,7 +41,7 @@ pub fn create_http_signature( digest, ); let headers_parameter = &["(request-target)", "host", "date", "digest"]; - let signature_parameter = sign_message(&actor_key, &message)?; + let signature_parameter = sign_message(actor_key, &message)?; let signature_header = format!( r#"keyId="{}",headers="{}",signature="{}""#, actor_key_id, @@ -72,8 +72,8 @@ mod tests { let result = create_http_signature( request_url, request_body, - actor_key, - actor_key_id.to_string(), + &actor_key, + actor_key_id, ); assert_eq!(result.is_ok(), true); diff --git a/src/mastodon_api/accounts/views.rs b/src/mastodon_api/accounts/views.rs index 7f13a59..9c0621f 100644 --- a/src/mastodon_api/accounts/views.rs +++ b/src/mastodon_api/accounts/views.rs @@ -130,15 +130,7 @@ async fn follow( &request.id, &actor.id, ); - let activity_sender = current_user.clone(); - actix_rt::spawn(async move { - deliver_activity( - &config, - &activity_sender, - activity, - vec![actor], - ).await; - }); + deliver_activity(&config, ¤t_user, activity, vec![actor]); follows::get_relationship(db_client, ¤t_user.id, &profile.id).await? } else { follows::follow(db_client, ¤t_user.id, &profile.id).await? @@ -177,14 +169,7 @@ async fn unfollow( &follow_request.id, &actor.id, ); - actix_rt::spawn(async move { - deliver_activity( - &config, - ¤t_user, - activity, - vec![actor], - ).await; - }); + deliver_activity(&config, ¤t_user, activity, vec![actor]); // TODO: uncouple unfollow and get_relationship relationship } else { diff --git a/src/mastodon_api/statuses/views.rs b/src/mastodon_api/statuses/views.rs index a71e53d..6d516be 100644 --- a/src/mastodon_api/statuses/views.rs +++ b/src/mastodon_api/statuses/views.rs @@ -71,15 +71,7 @@ async fn create_status( recipients.push(actor); }; }; - let config_clone = config.clone(); - actix_rt::spawn(async move { - deliver_activity( - &config_clone, - ¤t_user, - activity, - recipients, - ).await; - }); + deliver_activity(&config, ¤t_user, activity, recipients); let status = Status::from_post(post, &config.instance_url()); Ok(HttpResponse::Created().json(status)) } @@ -161,15 +153,7 @@ async fn favourite( ); let recipient: Actor = serde_json::from_value(actor_value.clone()) .map_err(|_| HttpError::InternalError)?; - let config_clone = config.clone(); - actix_rt::spawn(async move { - deliver_activity( - &config_clone, - ¤t_user, - activity, - vec![recipient], - ).await; - }); + deliver_activity(&config, ¤t_user, activity, vec![recipient]); } }