add comment

This commit is contained in:
Felix Ableitner 2024-03-19 16:41:55 +01:00
parent 05ab30a1b9
commit 73222a8c37
3 changed files with 10 additions and 9 deletions

View file

@ -11,6 +11,7 @@ use crate::{
FEDERATION_CONTENT_TYPE, FEDERATION_CONTENT_TYPE,
}; };
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::StreamExt; use futures::StreamExt;
use httpdate::fmt_http_date; use httpdate::fmt_http_date;
use itertools::Itertools; use itertools::Itertools;
@ -22,7 +23,6 @@ use std::{
fmt::{Debug, Display}, fmt::{Debug, Display},
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use chrono::{DateTime, Utc};
use tracing::debug; use tracing::debug;
use url::Url; use url::Url;
@ -133,10 +133,10 @@ where
let activity_id = activity.id(); let activity_id = activity.id();
let activity_serialized: Bytes = match published { let activity_serialized: Bytes = match published {
Some(published) => serde_json::to_vec(&WithPublished::new(activity, published)), Some(published) => serde_json::to_vec(&WithPublished::new(activity, published)),
None => serde_json::to_vec(activity) None => serde_json::to_vec(activity),
} }
.map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))? .map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))?
.into(); .into();
let private_key = get_pkey_cached(data, actor).await?; let private_key = get_pkey_cached(data, actor).await?;
Ok(futures::stream::iter( Ok(futures::stream::iter(
@ -228,10 +228,7 @@ struct WithPublished<T> {
impl<T> WithPublished<T> { impl<T> WithPublished<T> {
pub fn new(inner: T, published: DateTime<Utc>) -> WithPublished<T> { pub fn new(inner: T, published: DateTime<Utc>) -> WithPublished<T> {
Self { Self { published, inner }
published,
inner,
}
} }
} }

View file

@ -39,6 +39,10 @@ where
)?; )?;
debug!("Receiving activity {}", activity.id().to_string()); debug!("Receiving activity {}", activity.id().to_string());
// TODO: need to throw received activities in a queue ordered by published timestamp, then
// process in order. tricky due to generics. probably need one queue per sending instance.
activity.verify(data).await?; activity.verify(data).await?;
activity.receive(data).await?; activity.receive(data).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())

View file

@ -343,7 +343,7 @@ pub mod tests {
error::Error, error::Error,
fetch::object_id::ObjectId, fetch::object_id::ObjectId,
http_signatures::{generate_actor_keypair, Keypair}, http_signatures::{generate_actor_keypair, Keypair},
protocol::{verification::verify_domains_match}, protocol::verification::verify_domains_match,
}; };
use activitystreams_kinds::{activity::FollowType, actor::PersonType}; use activitystreams_kinds::{activity::FollowType, actor::PersonType};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;