Refactor deliverer

This commit is contained in:
silverpill 2021-10-30 22:35:18 +00:00
parent d41d85548d
commit e10804be64
5 changed files with 60 additions and 70 deletions

View file

@ -1,3 +1,5 @@
use rsa::RsaPrivateKey;
use crate::config::{Environment, Config};
use crate::http_signatures::create::{create_http_signature, SignatureError};
use crate::models::users::types::User;
@ -27,28 +29,19 @@ pub enum DelivererError {
async fn send_activity(
config: &Config,
sender: &User,
activity: &Activity,
actor_key: &RsaPrivateKey,
actor_key_id: &str,
activity_json: &str,
inbox_url: &str,
) -> Result<(), DelivererError> {
let activity_json = serde_json::to_string(&activity)?;
log::info!("sending activity: {}", activity_json);
let actor_key = deserialize_private_key(&sender.private_key)?;
let actor_key_id = format!(
"{}#main-key",
get_actor_url(
&config.instance_url(),
&sender.profile.username,
),
);
let headers = create_http_signature(
&inbox_url,
&activity_json,
inbox_url,
activity_json,
actor_key,
actor_key_id,
)?;
// Send
match config.environment {
Environment::Development => {
log::info!(
@ -65,7 +58,7 @@ async fn send_activity(
.header("Digest", headers.digest)
.header("Signature", headers.signature)
.header("Content-Type", ACTIVITY_CONTENT_TYPE)
.body(activity_json)
.body(activity_json.to_owned())
.send()
.await?;
let response_status = response.status();
@ -82,16 +75,52 @@ async fn send_activity(
Ok(())
}
pub async fn deliver_activity(
async fn deliver_activity_worker(
config: Config,
sender: User,
activity: Activity,
recipients: Vec<Actor>,
) -> Result<(), DelivererError> {
let actor_key = deserialize_private_key(&sender.private_key)?;
let actor_key_id = format!(
"{}#main-key",
get_actor_url(
&config.instance_url(),
&sender.profile.username,
),
);
let activity_json = serde_json::to_string(&activity)?;
for recipient in recipients {
// TODO: retry on error
if let Err(err) = send_activity(
&config,
&actor_key,
&actor_key_id,
&activity_json,
&recipient.inbox,
).await {
log::error!("{}", err);
}
};
Ok(())
}
pub fn deliver_activity(
config: &Config,
sender: &User,
activity: Activity,
recipients: Vec<Actor>,
) -> () {
for actor in recipients {
// TODO: retry on error
if let Err(err) = send_activity(&config, &sender, &activity, &actor.inbox).await {
let config = config.clone();
let sender = sender.clone();
actix_rt::spawn(async move {
deliver_activity_worker(
config,
sender,
activity,
recipients,
).await.unwrap_or_else(|err| {
log::error!("{}", err);
}
};
});
});
}

View file

@ -265,15 +265,7 @@ pub async fn receive_activity(
// Send activity
let recipients = vec![source_actor];
let config_clone = config.clone();
actix_rt::spawn(async move {
deliver_activity(
&config_clone,
&target_user,
new_activity,
recipients,
).await;
});
deliver_activity(&config, &target_user, new_activity, recipients);
},
(UNDO, FOLLOW) => {
let object: Object = serde_json::from_value(activity.object)

View file

@ -24,8 +24,8 @@ pub enum SignatureError {
pub fn create_http_signature(
request_url: &str,
request_body: &str,
actor_key: RsaPrivateKey,
actor_key_id: String,
actor_key: &RsaPrivateKey,
actor_key_id: &str,
) -> Result<SignatureHeaders, SignatureError> {
let request_url_object = url::Url::parse(request_url)
.map_err(|_| SignatureError::UrlError)?;
@ -41,7 +41,7 @@ pub fn create_http_signature(
digest,
);
let headers_parameter = &["(request-target)", "host", "date", "digest"];
let signature_parameter = sign_message(&actor_key, &message)?;
let signature_parameter = sign_message(actor_key, &message)?;
let signature_header = format!(
r#"keyId="{}",headers="{}",signature="{}""#,
actor_key_id,
@ -72,8 +72,8 @@ mod tests {
let result = create_http_signature(
request_url,
request_body,
actor_key,
actor_key_id.to_string(),
&actor_key,
actor_key_id,
);
assert_eq!(result.is_ok(), true);

View file

@ -130,15 +130,7 @@ async fn follow(
&request.id,
&actor.id,
);
let activity_sender = current_user.clone();
actix_rt::spawn(async move {
deliver_activity(
&config,
&activity_sender,
activity,
vec![actor],
).await;
});
deliver_activity(&config, &current_user, activity, vec![actor]);
follows::get_relationship(db_client, &current_user.id, &profile.id).await?
} else {
follows::follow(db_client, &current_user.id, &profile.id).await?
@ -177,14 +169,7 @@ async fn unfollow(
&follow_request.id,
&actor.id,
);
actix_rt::spawn(async move {
deliver_activity(
&config,
&current_user,
activity,
vec![actor],
).await;
});
deliver_activity(&config, &current_user, activity, vec![actor]);
// TODO: uncouple unfollow and get_relationship
relationship
} else {

View file

@ -71,15 +71,7 @@ async fn create_status(
recipients.push(actor);
};
};
let config_clone = config.clone();
actix_rt::spawn(async move {
deliver_activity(
&config_clone,
&current_user,
activity,
recipients,
).await;
});
deliver_activity(&config, &current_user, activity, recipients);
let status = Status::from_post(post, &config.instance_url());
Ok(HttpResponse::Created().json(status))
}
@ -161,15 +153,7 @@ async fn favourite(
);
let recipient: Actor = serde_json::from_value(actor_value.clone())
.map_err(|_| HttpError::InternalError)?;
let config_clone = config.clone();
actix_rt::spawn(async move {
deliver_activity(
&config_clone,
&current_user,
activity,
vec![recipient],
).await;
});
deliver_activity(&config, &current_user, activity, vec![recipient]);
}
}