Merge remote-tracking branch 'upstream/main' into dev

This commit is contained in:
Tangel 2024-01-15 08:13:24 +00:00 committed by GitHub
commit a9934ace30
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 61 additions and 89 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "activitypub_federation" name = "activitypub_federation"
version = "0.5.0-beta.6" version = "0.5.1-beta.1"
edition = "2021" edition = "2021"
description = "High-level Activitypub framework" description = "High-level Activitypub framework"
keywords = ["activitypub", "activitystreams", "federation", "fediverse"] keywords = ["activitypub", "activitystreams", "federation", "fediverse"]
@ -11,24 +11,24 @@ documentation = "https://docs.rs/activitypub_federation/"
[features] [features]
default = ["actix-web", "axum"] default = ["actix-web", "axum"]
actix-web = ["dep:actix-web"] actix-web = ["dep:actix-web"]
axum = ["dep:axum", "dep:tower", "dep:hyper"] axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"]
diesel = ["dep:diesel"] diesel = ["dep:diesel"]
[dependencies] [dependencies]
chrono = { version = "0.4.31", features = ["clock"], default-features = false } chrono = { version = "0.4.31", features = ["clock"], default-features = false }
serde = { version = "1.0.193", features = ["derive"] } serde = { version = "1.0.194", features = ["derive"] }
async-trait = "0.1.74" async-trait = "0.1.77"
url = { version = "2.5.0", features = ["serde"] } url = { version = "2.5.0", features = ["serde"] }
serde_json = { version = "1.0.108", features = ["preserve_order"] } serde_json = { version = "1.0.110", features = ["preserve_order"] }
reqwest = { version = "0.11.22", features = ["json", "stream"] } reqwest = { version = "0.11.23", features = ["json", "stream"] }
reqwest-middleware = "0.2.4" reqwest-middleware = "0.2.4"
tracing = "0.1.40" tracing = "0.1.40"
base64 = "0.21.5" base64 = "0.21.5"
openssl = "0.10.61" openssl = "0.10.62"
once_cell = "1.19.0" once_cell = "1.19.0"
http = "0.2.11" http = "0.2.11"
sha2 = "0.10.8" sha2 = "0.10.8"
thiserror = "1.0.50" thiserror = "1.0.56"
derive_builder = "0.12.0" derive_builder = "0.12.0"
itertools = "0.12.0" itertools = "0.12.0"
dyn-clone = "1.0.16" dyn-clone = "1.0.16"
@ -42,47 +42,43 @@ http-signature-normalization-reqwest = { version = "0.10.0", default-features =
] } ] }
http-signature-normalization = "0.7.0" http-signature-normalization = "0.7.0"
bytes = "1.5.0" bytes = "1.5.0"
futures-core = { version = "0.3.29", default-features = false } futures-core = { version = "0.3.30", default-features = false }
pin-project-lite = "0.2.13" pin-project-lite = "0.2.13"
activitystreams-kinds = "0.3.0" activitystreams-kinds = "0.3.0"
regex = { version = "1.10.2", default-features = false, features = [ regex = { version = "1.10.2", default-features = false, features = ["std", "unicode-case"] }
"std", tokio = { version = "1.35.1", features = [
"unicode-case",
] }
tokio = { version = "1.34.0", features = [
"sync", "sync",
"rt", "rt",
"rt-multi-thread", "rt-multi-thread",
"time", "time",
] } ] }
diesel = { version = "2.1.4", features = [ diesel = { version = "2.1.4", features = ["postgres"], default-features = false, optional = true }
"postgres", futures = "0.3.30"
], default-features = false, optional = true } moka = { version = "0.12.2", features = ["future"] }
futures = "0.3.29"
moka = { version = "0.12.1", features = ["future"] }
# Actix-web # Actix-web
actix-web = { version = "4.4.0", default-features = false, optional = true } actix-web = { version = "4.4.1", default-features = false, optional = true }
# Axum # Axum
axum = { git = "https://github.com/tokio-rs/axum.git", rev = "30afe97e99303fffc4bf2f411a93022b5bc1ba35", features = [ axum = { git = "https://github.com/tokio-rs/axum.git", rev = "30afe97e99303fffc4bf2f411a93022b5bc1ba35", features = [
"json", "json",
], default-features = false, optional = true } ], default-features = false, optional = true }
tower = { version = "*", optional = true } tower = { version = "0.4.13", optional = true }
hyper = { version = "*", optional = true } hyper = { version = "0.14", optional = true }
http-body-util = {version = "0.1.0", optional = true }
[dev-dependencies] [dev-dependencies]
anyhow = "1.0.75" anyhow = "1.0.79"
rand = "0.8.5" rand = "0.8.5"
env_logger = "0.10.1" env_logger = "0.10.1"
tower-http = { version = "*", features = ["map-request-body", "util"] } tower-http = { version = "0.5.0", features = ["map-request-body", "util"] }
axum = { git = "https://github.com/tokio-rs/axum.git", rev = "30afe97e99303fffc4bf2f411a93022b5bc1ba35", features = [ axum = { version = "0.6.20", features = [
"http1", "http1",
"tokio", "tokio",
"query", "query",
], default-features = false } ], default-features = false }
axum-macros = { git = "https://github.com/tokio-rs/axum.git", rev = "30afe97e99303fffc4bf2f411a93022b5bc1ba35" } axum-macros = "0.3.8"
tokio = { version = "*", features = ["full"] } tokio = { version = "1.35.1", features = ["full"] }
[profile.dev] [profile.dev]
strip = "symbols" strip = "symbols"

View file

@ -56,14 +56,16 @@ impl SendActivityTask<'_> {
data: &Data<Datatype>, data: &Data<Datatype>,
) -> Result<Vec<SendActivityTask<'a>>, Error> ) -> Result<Vec<SendActivityTask<'a>>, Error>
where where
Activity: ActivityHandler + Serialize, Activity: ActivityHandler + Serialize + Debug,
Datatype: Clone, Datatype: Clone,
ActorType: Actor, ActorType: Actor,
{ {
let config = &data.config; let config = &data.config;
let actor_id = activity.actor(); let actor_id = activity.actor();
let activity_id = activity.id(); let activity_id = activity.id();
let activity_serialized: Bytes = serde_json::to_vec(&activity).map_err(Error::Json)?.into(); let activity_serialized: Bytes = serde_json::to_vec(&activity)
.map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))?
.into();
let private_key = get_pkey_cached(data, actor).await?; let private_key = get_pkey_cached(data, actor).await?;
Ok(futures::stream::iter( Ok(futures::stream::iter(

View file

@ -130,8 +130,8 @@ mod test {
.await; .await;
match res { match res {
Err(Error::ParseReceivedActivity(url, _)) => { Err(Error::ParseReceivedActivity(_, url)) => {
assert_eq!(id, url.as_str()); assert_eq!(id, url.expect("has url").as_str());
} }
_ => unreachable!(), _ => unreachable!(),
} }

View file

@ -35,12 +35,18 @@ pub enum Error {
/// Failed to resolve actor via webfinger /// Failed to resolve actor via webfinger
#[error("Failed to resolve actor via webfinger")] #[error("Failed to resolve actor via webfinger")]
WebfingerResolveFailed(#[from] WebFingerError), WebfingerResolveFailed(#[from] WebFingerError),
/// JSON Error /// Failed to serialize outgoing activity
#[error(transparent)] #[error("Failed to serialize outgoing activity {1}: {0}")]
Json(#[from] serde_json::Error), SerializeOutgoingActivity(serde_json::Error, String),
/// Failed to parse an object fetched from url
#[error("Failed to parse object {1} with content {2}: {0}")]
ParseFetchedObject(serde_json::Error, Url, String),
/// Failed to parse an activity received from another instance /// Failed to parse an activity received from another instance
#[error("Failed to parse incoming activity with id {0}: {1}")] #[error("Failed to parse incoming activity {}: {0}", match .1 {
ParseReceivedActivity(Url, serde_json::Error), Some(t) => format!("with id {t}"),
None => String::new(),
})]
ParseReceivedActivity(serde_json::Error, Option<Url>),
/// Reqwest Middleware Error /// Reqwest Middleware Error
#[error(transparent)] #[error(transparent)]
ReqwestMiddleware(#[from] reqwest_middleware::Error), ReqwestMiddleware(#[from] reqwest_middleware::Error),

View file

@ -4,7 +4,7 @@
use crate::{ use crate::{
config::Data, config::Data,
error::Error, error::{Error, Error::ParseFetchedObject},
http_signatures::sign_request, http_signatures::sign_request,
reqwest_shim::ResponseExt, reqwest_shim::ResponseExt,
FEDERATION_CONTENT_TYPE, FEDERATION_CONTENT_TYPE,
@ -93,8 +93,13 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
} }
let url = res.url().clone(); let url = res.url().clone();
Ok(FetchObjectResponse { let text = res.bytes_limited().await?;
object: res.json_limited().await?, match serde_json::from_slice(&text) {
url, Ok(object) => Ok(FetchObjectResponse { object, url }),
}) Err(e) => Err(ParseFetchedObject(
e,
url,
String::from_utf8(Vec::from(text))?,
)),
}
} }

View file

@ -57,10 +57,8 @@ where
struct Id { struct Id {
id: Url, id: Url,
} }
match serde_json::from_slice::<Id>(body) { let id = serde_json::from_slice::<Id>(body).ok();
Ok(id) => Error::ParseReceivedActivity(id.id, e), Error::ParseReceivedActivity(e, id.map(|i| i.id))
Err(e) => Error::Json(e),
}
})?; })?;
data.config.verify_url_and_domain(&activity).await?; data.config.verify_url_and_domain(&activity).await?;
let actor = ObjectId::<ActorT>::from(activity.actor().clone()) let actor = ObjectId::<ActorT>::from(activity.actor().clone())

View file

@ -15,11 +15,11 @@
//! }; //! };
//! let note_with_context = WithContext::new_default(note); //! let note_with_context = WithContext::new_default(note);
//! let serialized = serde_json::to_string(&note_with_context)?; //! let serialized = serde_json::to_string(&note_with_context)?;
//! assert_eq!(serialized, r#"{"@context":["https://www.w3.org/ns/activitystreams"],"content":"Hello world"}"#); //! assert_eq!(serialized, r#"{"@context":"https://www.w3.org/ns/activitystreams","content":"Hello world"}"#);
//! Ok::<(), serde_json::error::Error>(()) //! Ok::<(), serde_json::error::Error>(())
//! ``` //! ```
use crate::{config::Data, protocol::helpers::deserialize_one_or_many, traits::ActivityHandler}; use crate::{config::Data, traits::ActivityHandler};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use url::Url; use url::Url;
@ -32,8 +32,7 @@ const DEFAULT_SECURITY_CONTEXT: &str = "https://w3id.org/security/v1";
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct WithContext<T> { pub struct WithContext<T> {
#[serde(rename = "@context")] #[serde(rename = "@context")]
#[serde(deserialize_with = "deserialize_one_or_many")] context: Value,
context: Vec<Value>,
#[serde(flatten)] #[serde(flatten)]
inner: T, inner: T,
} }
@ -49,7 +48,7 @@ impl<T> WithContext<T> {
} }
/// Create new wrapper with custom context. Use this in case you are implementing extensions. /// Create new wrapper with custom context. Use this in case you are implementing extensions.
pub fn new(inner: T, context: Vec<Value>) -> WithContext<T> { pub fn new(inner: T, context: Value) -> WithContext<T> {
WithContext { context, inner } WithContext { context, inner }
} }

View file

@ -56,12 +56,12 @@ where
/// #[derive(serde::Deserialize)] /// #[derive(serde::Deserialize)]
/// struct Note { /// struct Note {
/// #[serde(deserialize_with = "deserialize_one")] /// #[serde(deserialize_with = "deserialize_one")]
/// to: Url /// to: [Url; 1]
/// } /// }
/// ///
/// let note = serde_json::from_str::<Note>(r#"{"to": ["https://example.com/u/alice"] }"#); /// let note = serde_json::from_str::<Note>(r#"{"to": ["https://example.com/u/alice"] }"#);
/// assert!(note.is_ok()); /// assert!(note.is_ok());
pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<T, D::Error> pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<[T; 1], D::Error>
where where
T: Deserialize<'de>, T: Deserialize<'de>,
D: Deserializer<'de>, D: Deserializer<'de>,
@ -75,8 +75,8 @@ where
let result: MaybeArray<T> = Deserialize::deserialize(deserializer)?; let result: MaybeArray<T> = Deserialize::deserialize(deserializer)?;
Ok(match result { Ok(match result {
MaybeArray::Simple(value) => value, MaybeArray::Simple(value) => [value],
MaybeArray::Array([value]) => value, MaybeArray::Array([value]) => [value],
}) })
} }
@ -125,7 +125,7 @@ mod tests {
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct Note { struct Note {
#[serde(deserialize_with = "deserialize_one")] #[serde(deserialize_with = "deserialize_one")]
_to: Url, _to: [Url; 1],
} }
let note = serde_json::from_str::<Note>( let note = serde_json::from_str::<Note>(

View file

@ -3,10 +3,8 @@ use bytes::{BufMut, Bytes, BytesMut};
use futures_core::{ready, stream::BoxStream, Stream}; use futures_core::{ready, stream::BoxStream, Stream};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use reqwest::Response; use reqwest::Response;
use serde::de::DeserializeOwned;
use std::{ use std::{
future::Future, future::Future,
marker::PhantomData,
mem, mem,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -46,27 +44,6 @@ impl Future for BytesFuture {
} }
} }
pin_project! {
pub struct JsonFuture<T> {
_t: PhantomData<T>,
#[pin]
future: BytesFuture,
}
}
impl<T> Future for JsonFuture<T>
where
T: DeserializeOwned,
{
type Output = Result<T, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let bytes = ready!(this.future.poll(cx))?;
Poll::Ready(serde_json::from_slice(&bytes).map_err(Error::Json))
}
}
pin_project! { pin_project! {
pub struct TextFuture { pub struct TextFuture {
#[pin] #[pin]
@ -94,20 +71,16 @@ impl Future for TextFuture {
/// TODO: Remove this shim as soon as reqwest gets support for size-limited bodies. /// TODO: Remove this shim as soon as reqwest gets support for size-limited bodies.
pub trait ResponseExt { pub trait ResponseExt {
type BytesFuture; type BytesFuture;
type JsonFuture<T>;
type TextFuture; type TextFuture;
/// Size limited version of `bytes` to work around a reqwest issue. Check [`ResponseExt`] docs for details. /// Size limited version of `bytes` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
fn bytes_limited(self) -> Self::BytesFuture; fn bytes_limited(self) -> Self::BytesFuture;
/// Size limited version of `json` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
fn json_limited<T>(self) -> Self::JsonFuture<T>;
/// Size limited version of `text` to work around a reqwest issue. Check [`ResponseExt`] docs for details. /// Size limited version of `text` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
fn text_limited(self) -> Self::TextFuture; fn text_limited(self) -> Self::TextFuture;
} }
impl ResponseExt for Response { impl ResponseExt for Response {
type BytesFuture = BytesFuture; type BytesFuture = BytesFuture;
type JsonFuture<T> = JsonFuture<T>;
type TextFuture = TextFuture; type TextFuture = TextFuture;
fn bytes_limited(self) -> Self::BytesFuture { fn bytes_limited(self) -> Self::BytesFuture {
@ -118,13 +91,6 @@ impl ResponseExt for Response {
} }
} }
fn json_limited<T>(self) -> Self::JsonFuture<T> {
JsonFuture {
_t: PhantomData,
future: self.bytes_limited(),
}
}
fn text_limited(self) -> Self::TextFuture { fn text_limited(self) -> Self::TextFuture {
TextFuture { TextFuture {
future: self.bytes_limited(), future: self.bytes_limited(),