Revert back to one generic arg

This commit is contained in:
cetra3 2023-07-08 11:10:22 +09:30
parent 912f930122
commit 57d5b86370
29 changed files with 219 additions and 397 deletions

View file

@ -10,7 +10,7 @@ use activitypub_federation::{
fetch::object_id::ObjectId,
kinds::activity::CreateType,
protocol::{context::WithContext, helpers::deserialize_one_or_many},
queue::{send_activity, simple_queue::SimpleQueue},
queue::send_activity,
traits::{ActivityHandler, Object},
};
use serde::{Deserialize, Serialize};
@ -29,11 +29,7 @@ pub struct CreatePost {
}
impl CreatePost {
pub async fn send(
note: Note,
inbox: Url,
data: &Data<DatabaseHandle, SimpleQueue>,
) -> Result<(), Error> {
pub async fn send(note: Note, inbox: Url, data: &Data<DatabaseHandle>) -> Result<(), Error> {
print!("Sending reply to {}", &note.attributed_to);
let create = CreatePost {
actor: note.attributed_to.clone(),
@ -51,7 +47,6 @@ impl CreatePost {
#[async_trait::async_trait]
impl ActivityHandler for CreatePost {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Error = crate::error::Error;
fn id(&self) -> &Url {
@ -62,18 +57,12 @@ impl ActivityHandler for CreatePost {
self.actor.inner()
}
async fn verify(
&self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
DbPost::verify(&self.object, &self.id, data).await?;
Ok(())
}
async fn receive(
self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
DbPost::from_json(self.object, data).await?;
Ok(())
}

View file

@ -11,7 +11,6 @@ use activitypub_federation::{
config::Data,
fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger},
protocol::context::WithContext,
queue::simple_queue::SimpleQueue,
traits::Object,
};
use axum::{
@ -32,7 +31,7 @@ impl IntoResponse for Error {
#[debug_handler]
pub async fn http_get_user(
Path(name): Path<String>,
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
) -> Result<FederationJson<WithContext<Person>>, Error> {
let db_user = data.read_user(&name)?;
let json_user = db_user.into_json(&data).await?;
@ -41,10 +40,10 @@ pub async fn http_get_user(
#[debug_handler]
pub async fn http_post_user_inbox(
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
activity_data: ActivityData,
) -> impl IntoResponse {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle, SimpleQueue>(
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
activity_data,
&data,
)
@ -59,7 +58,7 @@ pub struct WebfingerQuery {
#[debug_handler]
pub async fn webfinger(
Query(query): Query<WebfingerQuery>,
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
) -> Result<Json<Webfinger>, Error> {
let name = extract_webfinger_name(&query.resource, &data)?;
let db_user = data.read_user(&name)?;

View file

@ -5,7 +5,6 @@ use activitypub_federation::{
http_signatures::generate_actor_keypair,
kinds::actor::PersonType,
protocol::{public_key::PublicKey, verification::verify_domains_match},
queue::simple_queue::SimpleQueue,
traits::{ActivityHandler, Actor, Object},
};
use chrono::{Local, NaiveDateTime};
@ -67,7 +66,6 @@ pub struct Person {
#[async_trait::async_trait]
impl Object for DbUser {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Kind = Person;
type Error = Error;
@ -77,7 +75,7 @@ impl Object for DbUser {
async fn read_from_id(
object_id: Url,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
let users = data.users.lock().unwrap();
let res = users
@ -87,10 +85,7 @@ impl Object for DbUser {
Ok(res)
}
async fn into_json(
self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self::Kind, Self::Error> {
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
Ok(Person {
preferred_username: self.name.clone(),
kind: Default::default(),
@ -103,7 +98,7 @@ impl Object for DbUser {
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
@ -111,7 +106,7 @@ impl Object for DbUser {
async fn from_json(
json: Self::Kind,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<Self, Self::Error> {
Ok(DbUser {
name: json.preferred_username,

View file

@ -10,7 +10,6 @@ use activitypub_federation::{
fetch::object_id::ObjectId,
kinds::{object::NoteType, public},
protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match},
queue::simple_queue::SimpleQueue,
traits::{Actor, Object},
};
use activitystreams_kinds::link::MentionType;
@ -49,37 +48,30 @@ pub struct Mention {
#[async_trait::async_trait]
impl Object for DbPost {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Kind = Note;
type Error = Error;
async fn read_from_id(
_object_id: Url,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
Ok(None)
}
async fn into_json(
self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self::Kind, Self::Error> {
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
unimplemented!()
}
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
}
async fn from_json(
json: Self::Kind,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self, Self::Error> {
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
println!(
"Received post with content {} and id {}",
&json.content, &json.id

View file

@ -3,7 +3,6 @@ use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::AcceptType,
queue::simple_queue::SimpleQueue,
traits::ActivityHandler,
};
use serde::{Deserialize, Serialize};
@ -33,7 +32,6 @@ impl Accept {
#[async_trait::async_trait]
impl ActivityHandler for Accept {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Error = crate::error::Error;
fn id(&self) -> &Url {
@ -44,17 +42,11 @@ impl ActivityHandler for Accept {
self.actor.inner()
}
async fn verify(
&self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
async fn receive(
self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
}

View file

@ -8,7 +8,6 @@ use activitypub_federation::{
fetch::object_id::ObjectId,
kinds::activity::CreateType,
protocol::helpers::deserialize_one_or_many,
queue::simple_queue::SimpleQueue,
traits::{ActivityHandler, Object},
};
use serde::{Deserialize, Serialize};
@ -41,7 +40,6 @@ impl CreatePost {
#[async_trait::async_trait]
impl ActivityHandler for CreatePost {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Error = crate::error::Error;
fn id(&self) -> &Url {
@ -52,18 +50,12 @@ impl ActivityHandler for CreatePost {
self.actor.inner()
}
async fn verify(
&self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
DbPost::verify(&self.object, &self.id, data).await?;
Ok(())
}
async fn receive(
self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
DbPost::from_json(self.object, data).await?;
Ok(())
}

View file

@ -8,7 +8,6 @@ use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::activity::FollowType,
queue::simple_queue::SimpleQueue,
traits::{ActivityHandler, Actor},
};
use serde::{Deserialize, Serialize};
@ -38,7 +37,6 @@ impl Follow {
#[async_trait::async_trait]
impl ActivityHandler for Follow {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Error = crate::error::Error;
fn id(&self) -> &Url {
@ -49,19 +47,13 @@ impl ActivityHandler for Follow {
self.actor.inner()
}
async fn verify(
&self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
// Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446
#[allow(clippy::await_holding_lock)]
async fn receive(
self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
// add to followers
let local_user = {
let mut users = data.users.lock().unwrap();

View file

@ -8,7 +8,6 @@ use activitypub_federation::{
config::{Data, FederationConfig, FederationMiddleware},
fetch::webfinger::{build_webfinger_response, extract_webfinger_name},
protocol::context::WithContext,
queue::simple_queue::SimpleQueue,
traits::{Actor, Object},
FEDERATION_CONTENT_TYPE,
};
@ -17,7 +16,7 @@ use anyhow::anyhow;
use serde::Deserialize;
use tracing::info;
pub fn listen(config: &FederationConfig<DatabaseHandle, SimpleQueue>) -> Result<(), Error> {
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = config.domain();
info!("Listening with actix-web on {hostname}");
let config = config.clone();
@ -36,9 +35,7 @@ pub fn listen(config: &FederationConfig<DatabaseHandle, SimpleQueue>) -> Result<
}
/// Handles requests to fetch system user json over HTTP
pub async fn http_get_system_user(
data: Data<DatabaseHandle, SimpleQueue>,
) -> Result<HttpResponse, Error> {
pub async fn http_get_system_user(data: Data<DatabaseHandle>) -> Result<HttpResponse, Error> {
let json_user = data.system_user.clone().into_json(&data).await?;
Ok(HttpResponse::Ok()
.content_type(FEDERATION_CONTENT_TYPE)
@ -49,7 +46,7 @@ pub async fn http_get_system_user(
pub async fn http_get_user(
request: HttpRequest,
user_name: web::Path<String>,
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let signed_by = signing_actor::<DbUser>(&request, None, &data).await?;
// here, checks can be made on the actor or the domain to which
@ -74,9 +71,9 @@ pub async fn http_get_user(
pub async fn http_post_user_inbox(
request: HttpRequest,
body: Bytes,
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle, SimpleQueue>(
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
request, body, &data,
)
.await
@ -89,7 +86,7 @@ pub struct WebfingerQuery {
pub async fn webfinger(
query: web::Query<WebfingerQuery>,
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let name = extract_webfinger_name(&query.resource, &data)?;
let db_user = data.read_user(&name)?;

View file

@ -11,7 +11,6 @@ use activitypub_federation::{
config::{Data, FederationConfig, FederationMiddleware},
fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger},
protocol::context::WithContext,
queue::simple_queue::SimpleQueue,
traits::Object,
};
use axum::{
@ -26,7 +25,7 @@ use serde::Deserialize;
use std::net::ToSocketAddrs;
use tracing::info;
pub fn listen(config: &FederationConfig<DatabaseHandle, SimpleQueue>) -> Result<(), Error> {
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = config.domain();
info!("Listening with axum on {hostname}");
let config = config.clone();
@ -49,7 +48,7 @@ pub fn listen(config: &FederationConfig<DatabaseHandle, SimpleQueue>) -> Result<
#[debug_handler]
async fn http_get_user(
Path(name): Path<String>,
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
) -> Result<FederationJson<WithContext<Person>>, Error> {
let db_user = data.read_user(&name)?;
let json_user = db_user.into_json(&data).await?;
@ -58,10 +57,10 @@ async fn http_get_user(
#[debug_handler]
async fn http_post_user_inbox(
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
activity_data: ActivityData,
) -> impl IntoResponse {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle, SimpleQueue>(
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
activity_data,
&data,
)
@ -76,7 +75,7 @@ struct WebfingerQuery {
#[debug_handler]
async fn webfinger(
Query(query): Query<WebfingerQuery>,
data: Data<DatabaseHandle, SimpleQueue>,
data: Data<DatabaseHandle>,
) -> Result<Json<Webfinger>, Error> {
let name = extract_webfinger_name(&query.resource, &data)?;
let db_user = data.read_user(&name)?;

View file

@ -2,10 +2,7 @@ use crate::{
objects::{person::DbUser, post::DbPost},
Error,
};
use activitypub_federation::{
config::{FederationConfig, UrlVerifier},
queue::simple_queue::SimpleQueue,
};
use activitypub_federation::config::{FederationConfig, UrlVerifier};
use anyhow::anyhow;
use async_trait::async_trait;
use std::{
@ -17,7 +14,7 @@ use url::Url;
pub async fn new_instance(
hostname: &str,
name: String,
) -> Result<FederationConfig<DatabaseHandle, SimpleQueue>, Error> {
) -> Result<FederationConfig<DatabaseHandle>, Error> {
let mut system_user = DbUser::new(hostname, "system".into())?;
system_user.ap_id = Url::parse(&format!("http://{}/", hostname))?.into();
@ -79,7 +76,7 @@ impl FromStr for Webserver {
}
pub fn listen(
config: &FederationConfig<DatabaseHandle, SimpleQueue>,
config: &FederationConfig<DatabaseHandle>,
webserver: &Webserver,
) -> Result<(), Error> {
match webserver {

View file

@ -11,7 +11,7 @@ use activitypub_federation::{
http_signatures::generate_actor_keypair,
kinds::actor::PersonType,
protocol::{context::WithContext, public_key::PublicKey, verification::verify_domains_match},
queue::{send_activity, simple_queue::SimpleQueue},
queue::send_activity,
traits::{ActivityHandler, Actor, Object},
};
use chrono::{Local, NaiveDateTime};
@ -81,11 +81,7 @@ impl DbUser {
Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?)
}
pub async fn follow(
&self,
other: &str,
data: &Data<DatabaseHandle, SimpleQueue>,
) -> Result<(), Error> {
pub async fn follow(&self, other: &str, data: &Data<DatabaseHandle>) -> Result<(), Error> {
let other: DbUser = webfinger_resolve_actor(other, data).await?;
let id = generate_object_id(data.domain())?;
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
@ -94,11 +90,7 @@ impl DbUser {
Ok(())
}
pub async fn post(
&self,
post: DbPost,
data: &Data<DatabaseHandle, SimpleQueue>,
) -> Result<(), Error> {
pub async fn post(&self, post: DbPost, data: &Data<DatabaseHandle>) -> Result<(), Error> {
let id = generate_object_id(data.domain())?;
let create = CreatePost::new(post.into_json(data).await?, id.clone());
let mut inboxes = vec![];
@ -114,7 +106,7 @@ impl DbUser {
&self,
activity: Activity,
recipients: Vec<Url>,
data: &Data<DatabaseHandle, SimpleQueue>,
data: &Data<DatabaseHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize + Debug + Send + Sync,
@ -129,7 +121,6 @@ impl DbUser {
#[async_trait::async_trait]
impl Object for DbUser {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Kind = Person;
type Error = Error;
@ -139,7 +130,7 @@ impl Object for DbUser {
async fn read_from_id(
object_id: Url,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
let users = data.users.lock().unwrap();
let res = users
@ -149,10 +140,7 @@ impl Object for DbUser {
Ok(res)
}
async fn into_json(
self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self::Kind, Self::Error> {
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
Ok(Person {
preferred_username: self.name.clone(),
kind: Default::default(),
@ -165,16 +153,13 @@ impl Object for DbUser {
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
}
async fn from_json(
json: Self::Kind,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self, Self::Error> {
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let user = DbUser {
name: json.preferred_username,
ap_id: json.id,

View file

@ -4,7 +4,6 @@ use activitypub_federation::{
fetch::object_id::ObjectId,
kinds::{object::NoteType, public},
protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match},
queue::simple_queue::SimpleQueue,
traits::Object,
};
use serde::{Deserialize, Serialize};
@ -45,13 +44,12 @@ pub struct Note {
#[async_trait::async_trait]
impl Object for DbPost {
type DataType = DatabaseHandle;
type QueueType = SimpleQueue;
type Kind = Note;
type Error = Error;
async fn read_from_id(
object_id: Url,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
let posts = data.posts.lock().unwrap();
let res = posts
@ -61,10 +59,7 @@ impl Object for DbPost {
Ok(res)
}
async fn into_json(
self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self::Kind, Self::Error> {
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
let creator = self.creator.dereference_local(data).await?;
Ok(Note {
kind: Default::default(),
@ -78,16 +73,13 @@ impl Object for DbPost {
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
}
async fn from_json(
json: Self::Kind,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self, Self::Error> {
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let post = DbPost {
text: json.content,
ap_id: json.id,

View file

@ -5,7 +5,6 @@ use crate::{
error::Error,
fetch::object_id::ObjectId,
http_signatures::{verify_body_hash, verify_signature},
queue::ActivityQueue,
traits::{ActivityHandler, Actor, Object},
};
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
@ -16,17 +15,14 @@ use tracing::debug;
/// Handles incoming activities, verifying HTTP signatures and other checks
///
/// After successful validation, activities are passed to respective [trait@ActivityHandler].
pub async fn receive_activity<Activity, ActorT, Datatype, Queuetype>(
pub async fn receive_activity<Activity, ActorT, Datatype>(
request: HttpRequest,
body: Bytes,
data: &Data<Datatype, Queuetype>,
data: &Data<Datatype>,
) -> Result<HttpResponse, <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler<DataType = Datatype, QueueType = Queuetype>
+ DeserializeOwned
+ Send
+ 'static,
ActorT: Object<DataType = Datatype, QueueType = Queuetype> + Actor + Send + 'static,
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
ActorT: Object<DataType = Datatype> + Actor + Send + 'static,
for<'de2> <ActorT as Object>::Kind: serde::Deserialize<'de2>,
<Activity as ActivityHandler>::Error: From<anyhow::Error>
+ From<Error>
@ -34,7 +30,6 @@ where
+ From<serde_json::Error>,
<ActorT as Object>::Error: From<Error> + From<anyhow::Error>,
Datatype: Clone,
Queuetype: ActivityQueue,
{
verify_body_hash(request.headers().get("Digest"), &body)?;
@ -64,7 +59,7 @@ mod test {
use crate::{
config::FederationConfig,
http_signatures::sign_request,
queue::{request::generate_request_headers, simple_queue::SimpleQueue},
queue::request::generate_request_headers,
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
};
use actix_web::test::TestRequest;
@ -75,7 +70,7 @@ mod test {
#[tokio::test]
async fn test_receive_activity() {
let (body, incoming_request, config) = setup_receive_test().await;
receive_activity::<Follow, DbUser, DbConnection, SimpleQueue>(
receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
@ -87,7 +82,7 @@ mod test {
#[tokio::test]
async fn test_receive_activity_invalid_body_signature() {
let (_, incoming_request, config) = setup_receive_test().await;
let err = receive_activity::<Follow, DbUser, DbConnection, SimpleQueue>(
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
"invalid".into(),
&config.to_request_data(),
@ -104,7 +99,7 @@ mod test {
async fn test_receive_activity_invalid_path() {
let (body, incoming_request, config) = setup_receive_test().await;
let incoming_request = incoming_request.uri("/wrong");
let err = receive_activity::<Follow, DbUser, DbConnection, SimpleQueue>(
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
@ -117,11 +112,7 @@ mod test {
assert_eq!(e, &Error::ActivitySignatureInvalid)
}
async fn setup_receive_test() -> (
Bytes,
TestRequest,
FederationConfig<DbConnection, SimpleQueue>,
) {
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
let inbox = "https://example.com/inbox";
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
let request_builder = ClientWithMiddleware::from(Client::default())

View file

@ -1,7 +1,4 @@
use crate::{
config::{Data, FederationConfig, FederationMiddleware},
queue::ActivityQueue,
};
use crate::config::{Data, FederationConfig, FederationMiddleware};
use actix_web::{
dev::{forward_ready, Payload, Service, ServiceRequest, ServiceResponse, Transform},
Error,
@ -11,47 +8,45 @@ use actix_web::{
};
use std::future::{ready, Ready};
impl<S, B, T, Q> Transform<S, ServiceRequest> for FederationMiddleware<T, Q>
impl<S, B, T> Transform<S, ServiceRequest> for FederationMiddleware<T>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
T: Clone + Sync + 'static,
Q: ActivityQueue + Sync + 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Transform = FederationService<S, T, Q>;
type Transform = FederationService<S, T>;
type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(FederationService {
service,
config: self.config.clone(),
config: self.0.clone(),
}))
}
}
/// Passes [FederationConfig] to HTTP handlers, converting it to [Data] in the process
#[doc(hidden)]
pub struct FederationService<S, T: Clone, Q: ActivityQueue>
pub struct FederationService<S, T: Clone>
where
S: Service<ServiceRequest, Error = Error>,
S::Future: 'static,
T: Sync,
{
service: S,
config: FederationConfig<T, Q>,
config: FederationConfig<T>,
}
impl<S, B, T, Q> Service<ServiceRequest> for FederationService<S, T, Q>
impl<S, B, T> Service<ServiceRequest> for FederationService<S, T>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
T: Clone + Sync + 'static,
Q: ActivityQueue + Sync + 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
@ -66,12 +61,12 @@ where
}
}
impl<T: Clone + 'static, Q: ActivityQueue + 'static> FromRequest for Data<T, Q> {
impl<T: Clone + 'static> FromRequest for Data<T> {
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
ready(match req.extensions().get::<FederationConfig<T, Q>>() {
ready(match req.extensions().get::<FederationConfig<T>>() {
Some(c) => Ok(c.to_request_data()),
None => Err(actix_web::error::ErrorBadRequest(
"Missing extension, did you register FederationMiddleware?",

View file

@ -18,7 +18,7 @@ use serde::Deserialize;
pub async fn signing_actor<A>(
request: &HttpRequest,
body: Option<Bytes>,
data: &Data<<A as Object>::DataType, <A as Object>::QueueType>,
data: &Data<<A as Object>::DataType>,
) -> Result<A, <A as Object>::Error>
where
A: Object + Actor,

View file

@ -7,7 +7,6 @@ use crate::{
error::Error,
fetch::object_id::ObjectId,
http_signatures::{verify_body_hash, verify_signature},
queue::ActivityQueue,
traits::{ActivityHandler, Actor, Object},
};
use axum::{
@ -22,16 +21,13 @@ use serde::de::DeserializeOwned;
use tracing::debug;
/// Handles incoming activities, verifying HTTP signatures and other checks
pub async fn receive_activity<Activity, ActorT, Datatype, Queuetype>(
pub async fn receive_activity<Activity, ActorT, Datatype>(
activity_data: ActivityData,
data: &Data<Datatype, Queuetype>,
data: &Data<Datatype>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler<DataType = Datatype, QueueType = Queuetype>
+ DeserializeOwned
+ Send
+ 'static,
ActorT: Object<DataType = Datatype, QueueType = Queuetype> + Actor + Send + 'static,
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
ActorT: Object<DataType = Datatype> + Actor + Send + 'static,
for<'de2> <ActorT as Object>::Kind: serde::Deserialize<'de2>,
<Activity as ActivityHandler>::Error: From<anyhow::Error>
+ From<Error>
@ -39,7 +35,6 @@ where
+ From<serde_json::Error>,
<ActorT as Object>::Error: From<Error> + From<anyhow::Error>,
Datatype: Clone,
Queuetype: ActivityQueue,
{
verify_body_hash(activity_data.headers.get("Digest"), &activity_data.body)?;

View file

@ -1,45 +1,33 @@
use crate::{
config::{Data, FederationConfig, FederationMiddleware},
queue::ActivityQueue,
};
use crate::config::{Data, FederationConfig, FederationMiddleware};
use axum::{async_trait, body::Body, extract::FromRequestParts, http::Request, response::Response};
use http::{request::Parts, StatusCode};
use std::task::{Context, Poll};
use tower::{Layer, Service};
impl<S, T: Clone, Q: ActivityQueue> Layer<S> for FederationMiddleware<T, Q> {
type Service = FederationService<S, T, Q>;
impl<S, T: Clone> Layer<S> for FederationMiddleware<T> {
type Service = FederationService<S, T>;
fn layer(&self, inner: S) -> Self::Service {
FederationService {
inner,
config: self.config.clone(),
config: self.0.clone(),
}
}
}
/// Passes [FederationConfig] to HTTP handlers, converting it to [Data] in the process
#[doc(hidden)]
pub struct FederationService<S, T: Clone, Q: ActivityQueue> {
#[derive(Clone)]
pub struct FederationService<S, T: Clone> {
inner: S,
config: FederationConfig<T, Q>,
config: FederationConfig<T>,
}
impl<S: Clone, T: Clone, Q: ActivityQueue> Clone for FederationService<S, T, Q> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
config: self.config.clone(),
}
}
}
impl<S, T, Q> Service<Request<Body>> for FederationService<S, T, Q>
impl<S, T> Service<Request<Body>> for FederationService<S, T>
where
S: Service<Request<Body>, Response = Response> + Send + 'static,
S::Future: Send + 'static,
T: Clone + Send + Sync + 'static,
Q: ActivityQueue + Send + Sync + 'static,
{
type Response = S::Response;
type Error = S::Error;
@ -56,16 +44,15 @@ where
}
#[async_trait]
impl<S, T: Clone + 'static, Q: ActivityQueue + 'static> FromRequestParts<S> for Data<T, Q>
impl<S, T: Clone + 'static> FromRequestParts<S> for Data<T>
where
S: Send + Sync,
T: Send + Sync,
Q: Send + Sync,
{
type Rejection = (StatusCode, &'static str);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
match parts.extensions.get::<FederationConfig<T, Q>>() {
match parts.extensions.get::<FederationConfig<T>>() {
Some(c) => Ok(c.to_request_data()),
None => Err((
StatusCode::INTERNAL_SERVER_ERROR,

View file

@ -18,10 +18,9 @@
use crate::{
error::Error,
protocol::verification::verify_domains_match,
queue::{simple_queue::SimpleQueue, ActivityQueue},
queue::{simple_queue::SimpleQueue, ActivityQueue, SendActivityTask},
traits::{ActivityHandler, Actor},
};
use anyhow::Context;
use async_trait::async_trait;
use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone};
@ -36,12 +35,14 @@ use std::{
},
time::Duration,
};
use tokio::sync::mpsc::{channel, Sender};
use tracing::error;
use url::Url;
/// Configuration for this library, with various federation related settings
#[derive(Builder)]
#[builder(build_fn(private, name = "partial_build"), pattern = "owned")]
pub struct FederationConfig<T: Clone, Q: ActivityQueue + ?Sized> {
#[derive(Builder, Clone)]
#[builder(build_fn(private, name = "partial_build"))]
pub struct FederationConfig<T: Clone> {
/// The domain where this federated instance is running
#[builder(setter(into))]
pub(crate) domain: String,
@ -94,34 +95,13 @@ pub struct FederationConfig<T: Clone, Q: ActivityQueue + ?Sized> {
pub(crate) signed_fetch_actor: Option<Arc<(Url, PKey<Private>)>>,
/// Queue for sending outgoing activities. Only optional to make builder work, its always
/// present once constructed.
#[builder(default = "None", setter(custom))]
pub(crate) activity_queue: Option<Arc<Q>>,
#[builder(setter(custom))]
pub(crate) activity_queue: Sender<SendActivityTask>,
}
// No clue why this can't be derived, so just implemented manually...
impl<T: Clone, Q: ActivityQueue> Clone for FederationConfig<T, Q> {
fn clone(&self) -> Self {
Self {
domain: self.domain.clone(),
app_data: self.app_data.clone(),
http_fetch_limit: self.http_fetch_limit.clone(),
client: self.client.clone(),
worker_count: self.worker_count.clone(),
retry_count: self.retry_count.clone(),
debug: self.debug.clone(),
allow_http_urls: self.allow_http_urls.clone(),
request_timeout: self.request_timeout.clone(),
url_verifier: self.url_verifier.clone(),
http_signature_compat: self.http_signature_compat.clone(),
signed_fetch_actor: self.signed_fetch_actor.clone(),
activity_queue: self.activity_queue.clone(),
}
}
}
impl<T: Clone, Q: ActivityQueue> FederationConfig<T, Q> {
impl<T: Clone> FederationConfig<T> {
/// Returns a new config builder with default values.
pub fn builder() -> FederationConfigBuilder<T, Q> {
pub fn builder() -> FederationConfigBuilder<T> {
FederationConfigBuilder::default()
}
@ -144,7 +124,7 @@ impl<T: Clone, Q: ActivityQueue> FederationConfig<T, Q> {
}
/// Create new [Data] from this. You should prefer to use a middleware if possible.
pub fn to_request_data(&self) -> Data<T, Q> {
pub fn to_request_data(&self) -> Data<T> {
Data {
config: self.clone(),
request_counter: Default::default(),
@ -207,9 +187,9 @@ impl<T: Clone, Q: ActivityQueue> FederationConfig<T, Q> {
}
}
impl<T: Clone, Q: ActivityQueue> FederationConfigBuilder<T, Q> {
impl<T: Clone> FederationConfigBuilder<T> {
/// Sets an actor to use to sign all federated fetch requests
pub fn signed_fetch_actor<A: Actor>(mut self, actor: &A) -> Self {
pub fn signed_fetch_actor<A: Actor>(&mut self, actor: &A) -> &mut Self {
let private_key_pem = actor
.private_key_pem()
.expect("actor does not have a private key to sign with");
@ -221,51 +201,75 @@ impl<T: Clone, Q: ActivityQueue> FederationConfigBuilder<T, Q> {
}
/// Sets an actor to use to sign all federated fetch requests
pub fn activity_queue(mut self, queue: Q) -> Self {
self.activity_queue = Some(Some(Arc::new(queue)));
pub async fn activity_queue<Q: ActivityQueue + Sync + Send + 'static>(
&mut self,
queue: Q,
) -> &mut Self {
let (sender, mut receiver) = channel(8192);
tokio::spawn(async move {
while let Some(message) = receiver.recv().await {
if let Err(err) = queue.queue(message).await {
error!("{err:?}");
}
}
});
self.activity_queue = Some(sender);
self
}
}
impl<T: Clone> FederationConfigBuilder<T, SimpleQueue> {
/// Constructs a new config instance with the values supplied to builder.
///
/// Values which are not explicitly specified use the defaults. Also initializes the
/// queue for outgoing activities, which is stored internally in the config struct.
/// Requires a tokio runtime for the background queue.
pub async fn build(
self,
) -> Result<FederationConfig<T, SimpleQueue>, FederationConfigBuilderError> {
let mut config = self.partial_build()?;
config.activity_queue = Some(Arc::new(SimpleQueue::from_config(&config)));
Ok(config)
pub async fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
if self.activity_queue.is_none() {
let queue = SimpleQueue::new(
self.client
.clone()
.unwrap_or_else(|| reqwest::Client::default().into()),
self.worker_count.unwrap_or_default(),
self.retry_count.unwrap_or_default(),
self.request_timeout.unwrap_or(Duration::from_secs(10)),
60,
self.http_signature_compat.unwrap_or_default(),
);
self.activity_queue(queue).await;
}
self.partial_build()
}
}
impl<T: Clone> FederationConfig<T, SimpleQueue> {
/// Shut down this federation, waiting for the outgoing queue to be sent.
/// If the activityqueue is still in use in other requests or was never constructed, returns an error.
/// If wait_retries is true, also wait for requests that have initially failed and are being retried.
/// Returns a stats object that can be printed for debugging (structure currently not part of the public interface).
///
/// Currently, this method does not work correctly if worker_count = 0 (unlimited)
pub async fn shutdown(mut self, wait_retries: bool) -> anyhow::Result<impl std::fmt::Debug> {
let q = self
.activity_queue
.take()
.context("ActivityQueue never constructed, build() not called?")?;
// Todo: use Arc::into_inner but is only part of rust 1.70.
let stats = Arc::<SimpleQueue>::try_unwrap(q)
.map_err(|_| {
anyhow::anyhow!(
"Could not cleanly shut down: activityqueue arc was still in use elsewhere "
)
})?
.shutdown(wait_retries)
.await?;
Ok(stats)
}
}
impl<T: Clone, Q: ActivityQueue> Deref for FederationConfig<T, Q> {
// impl<T: Clone> FederationConfig<T> {
// /// Shut down this federation, waiting for the outgoing queue to be sent.
// /// If the activityqueue is still in use in other requests or was never constructed, returns an error.
// /// If wait_retries is true, also wait for requests that have initially failed and are being retried.
// /// Returns a stats object that can be printed for debugging (structure currently not part of the public interface).
// ///
// /// Currently, this method does not work correctly if worker_count = 0 (unlimited)
// pub async fn shutdown(mut self, wait_retries: bool) -> anyhow::Result<impl std::fmt::Debug> {
// let q = self
// .activity_queue
// .take()
// .context("ActivityQueue never constructed, build() not called?")?;
// // Todo: use Arc::into_inner but is only part of rust 1.70.
// let stats = Arc::<SimpleQueue>::try_unwrap(q)
// .map_err(|_| {
// anyhow::anyhow!(
// "Could not cleanly shut down: activityqueue arc was still in use elsewhere "
// )
// })?
// .shutdown(wait_retries)
// .await?;
// Ok(stats)
// }
// }
impl<T: Clone> Deref for FederationConfig<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
@ -334,12 +338,12 @@ clone_trait_object!(UrlVerifier);
/// prevent denial of service attacks, where an attacker triggers fetching of recursive objects.
///
/// <https://www.w3.org/TR/activitypub/#security-recursive-objects>
pub struct Data<T: Clone, Q: ActivityQueue> {
pub(crate) config: FederationConfig<T, Q>,
pub struct Data<T: Clone> {
pub(crate) config: FederationConfig<T>,
pub(crate) request_counter: AtomicU32,
}
impl<T: Clone, Q: ActivityQueue> Data<T, Q> {
impl<T: Clone> Data<T> {
/// Returns the data which was stored in [FederationConfigBuilder::app_data]
pub fn app_data(&self) -> &T {
&self.config.app_data
@ -363,7 +367,7 @@ impl<T: Clone, Q: ActivityQueue> Data<T, Q> {
}
}
impl<T: Clone, Q: ActivityQueue> Deref for Data<T, Q> {
impl<T: Clone> Deref for Data<T> {
type Target = T;
fn deref(&self) -> &T {
@ -372,21 +376,12 @@ impl<T: Clone, Q: ActivityQueue> Deref for Data<T, Q> {
}
/// Middleware for HTTP handlers which provides access to [Data]
pub struct FederationMiddleware<T: Clone, Q: ActivityQueue> {
pub(crate) config: FederationConfig<T, Q>,
}
#[derive(Clone)]
pub struct FederationMiddleware<T: Clone>(pub(crate) FederationConfig<T>);
impl<T: Clone, Q: ActivityQueue> FederationMiddleware<T, Q> {
impl<T: Clone> FederationMiddleware<T> {
/// Construct a new middleware instance
pub fn new(config: FederationConfig<T, Q>) -> Self {
FederationMiddleware { config }
}
}
impl<T: Clone, Q: ActivityQueue> Clone for FederationMiddleware<T, Q> {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
}
pub fn new(config: FederationConfig<T>) -> Self {
FederationMiddleware(config)
}
}

View file

@ -35,7 +35,7 @@ where
pub async fn dereference(
&self,
owner: &<Kind as Collection>::Owner,
data: &Data<<Kind as Collection>::DataType, <Kind as Collection>::QueueType>,
data: &Data<<Kind as Collection>::DataType>,
) -> Result<Kind, <Kind as Collection>::Error>
where
<Kind as Collection>::Error: From<Error>,

View file

@ -6,7 +6,6 @@ use crate::{
config::Data,
error::Error,
http_signatures::sign_request,
queue::ActivityQueue,
reqwest_shim::ResponseExt,
FEDERATION_CONTENT_TYPE,
};
@ -34,9 +33,9 @@ pub mod webfinger;
/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with
/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers
/// infinite, recursive fetching of data.
pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned, Q: ActivityQueue>(
pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
url: &Url,
data: &Data<T, Q>,
data: &Data<T>,
) -> Result<Kind, Error> {
let config = &data.config;
// dont fetch local objects this way

View file

@ -87,7 +87,7 @@ where
/// Fetches an activitypub object, either from local database (if possible), or over http.
pub async fn dereference(
&self,
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
data: &Data<<Kind as Object>::DataType>,
) -> Result<Kind, <Kind as Object>::Error>
where
<Kind as Object>::Error: From<Error> + From<anyhow::Error>,
@ -121,7 +121,7 @@ where
/// the object is not found in the database.
pub async fn dereference_local(
&self,
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
data: &Data<<Kind as Object>::DataType>,
) -> Result<Kind, <Kind as Object>::Error>
where
<Kind as Object>::Error: From<Error>,
@ -133,7 +133,7 @@ where
/// returning none means the object was not found in local db
async fn dereference_from_db(
&self,
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
data: &Data<<Kind as Object>::DataType>,
) -> Result<Option<Kind>, <Kind as Object>::Error> {
let id = self.0.clone();
Object::read_from_id(*id, data).await
@ -141,7 +141,7 @@ where
async fn dereference_from_http(
&self,
data: &Data<<Kind as Object>::DataType, <Kind as Object>::QueueType>,
data: &Data<<Kind as Object>::DataType>,
db_object: Option<Kind>,
) -> Result<Kind, <Kind as Object>::Error>
where

View file

@ -2,7 +2,6 @@ use crate::{
config::Data,
error::{Error, Error::WebfingerResolveFailed},
fetch::{fetch_object_http, object_id::ObjectId},
queue::ActivityQueue,
traits::{Actor, Object},
FEDERATION_CONTENT_TYPE,
};
@ -20,7 +19,7 @@ use url::Url;
/// is then fetched using [ObjectId::dereference], and the result returned.
pub async fn webfinger_resolve_actor<T: Clone, Kind>(
identifier: &str,
data: &Data<T, <Kind as Object>::QueueType>,
data: &Data<T>,
) -> Result<Kind, <Kind as Object>::Error>
where
Kind: Object + Actor + Send + 'static + Object<DataType = T>,
@ -86,10 +85,9 @@ where
/// # Ok::<(), anyhow::Error>(())
/// }).unwrap();
///```
pub fn extract_webfinger_name<T, Q>(query: &str, data: &Data<T, Q>) -> Result<String, Error>
pub fn extract_webfinger_name<T>(query: &str, data: &Data<T>) -> Result<String, Error>
where
T: Clone,
Q: ActivityQueue,
{
// TODO: would be nice if we could implement this without regex and remove the dependency
// Regex taken from Mastodon -

View file

@ -148,7 +148,7 @@ pub(crate) async fn signing_actor<'a, A, H>(
headers: H,
method: &Method,
uri: &Uri,
data: &Data<<A as Object>::DataType, <A as Object>::QueueType>,
data: &Data<<A as Object>::DataType>,
) -> Result<A, <A as Object>::Error>
where
A: Object + Actor,

View file

@ -61,7 +61,6 @@ where
T: ActivityHandler + Send + Sync,
{
type DataType = <T as ActivityHandler>::DataType;
type QueueType = <T as ActivityHandler>::QueueType;
type Error = <T as ActivityHandler>::Error;
fn id(&self) -> &Url {
@ -72,17 +71,11 @@ where
self.inner.actor()
}
async fn verify(
&self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
self.inner.verify(data).await
}
async fn receive(
self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
self.inner.receive(data).await
}
}

View file

@ -25,7 +25,7 @@ use std::{
};
use uuid::Uuid;
use tracing::{debug, info, warn};
use tracing::{debug, warn};
use url::Url;
use self::request::sign_and_send;
@ -34,7 +34,7 @@ use self::request::sign_and_send;
/// Anything that can enqueue outgoing activitypub requests
pub trait ActivityQueue {
/// The errors that can be returned when queuing
type Error;
type Error: Debug;
/// Retrieve the queue stats
fn stats(&self) -> &Stats;
@ -44,26 +44,20 @@ pub trait ActivityQueue {
}
/// Sends an activity with an outbound activity queue
pub async fn send_activity<Q: ActivityQueue, Activity, Datatype, ActorType>(
pub async fn send_activity<Activity, Datatype, ActorType>(
activity: Activity,
actor: &ActorType,
inboxes: Vec<Url>,
data: &Data<Datatype, Q>,
data: &Data<Datatype>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize,
<Activity as ActivityHandler>::Error: From<anyhow::Error>
+ From<serde_json::Error>
+ std::convert::From<<Q as ActivityQueue>::Error>,
<Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
Datatype: Clone,
ActorType: Actor,
{
let config = &data.config;
let queue = data
.config
.activity_queue
.clone()
.expect("Should have a queue configured");
let queue = data.config.activity_queue.clone();
let actor_id = activity.actor();
let activity_id = activity.id();
let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
@ -114,15 +108,18 @@ where
warn!("{err}");
}
} else {
queue.queue(message).await?;
let stats = queue.stats();
let running = stats.running.load(Ordering::Relaxed);
if running == config.worker_count && config.worker_count != 0 {
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
warn!("{:?}", stats);
} else {
info!("{:?}", stats);
}
queue
.send(message)
.await
.map_err(|err| anyhow!("Error sending to queue:{err:?}"))?;
//let stats = queue.stats();
//let running = stats.running.load(Ordering::Relaxed);
//if running == config.worker_count && config.worker_count != 0 {
// warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
// warn!("{:?}", stats);
//} else {
// info!("{:?}", stats);
//}
}
}
@ -184,6 +181,7 @@ mod tests {
};
use crate::{http_signatures::generate_actor_keypair, queue::simple_queue::SimpleQueue};
use tracing::{debug, info};
use super::*;

View file

@ -14,7 +14,7 @@ use crate::{
FEDERATION_CONTENT_TYPE,
};
use anyhow::{anyhow, Context};
use tracing::*;
use tracing::debug;
use super::{util::RetryStrategy, SendActivityTask};

View file

@ -50,7 +50,7 @@ impl ActivityQueue for SimpleQueue {
impl SimpleQueue {
/// Construct a queue from federation config
pub fn from_config<T: Clone>(config: &FederationConfig<T, SimpleQueue>) -> Self {
pub fn from_config<T: Clone>(config: &FederationConfig<T>) -> Self {
Self::new(
config.client.clone(),
config.worker_count,

View file

@ -3,7 +3,7 @@ use std::{
fmt::{Debug, Display},
time::Duration,
};
use tracing::*;
use tracing::warn;
#[derive(Clone, Copy, Default)]
pub(crate) struct RetryStrategy {

View file

@ -1,10 +1,6 @@
//! Traits which need to be implemented for federated data types
use crate::{
config::Data,
protocol::public_key::PublicKey,
queue::{simple_queue::SimpleQueue, ActivityQueue},
};
use crate::{config::Data, protocol::public_key::PublicKey};
use async_trait::async_trait;
use chrono::NaiveDateTime;
use serde::Deserialize;
@ -101,8 +97,6 @@ pub trait Object: Sized + Debug {
/// App data type passed to handlers. Must be identical to
/// [crate::config::FederationConfigBuilder::app_data] type.
type DataType: Clone + Send + Sync;
/// The queue type to use with this object
type QueueType: ActivityQueue + Send + Sync;
/// The type of protocol struct which gets sent over network to federate this database struct.
type Kind;
/// Error type returned by handler methods
@ -126,16 +120,13 @@ pub trait Object: Sized + Debug {
/// Should return `Ok(None)` if not found.
async fn read_from_id(
object_id: Url,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error>;
/// Mark remote object as deleted in local database.
///
/// Called when a `Delete` activity is received, or if fetch returns a `Tombstone` object.
async fn delete(
self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn delete(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
@ -143,10 +134,7 @@ pub trait Object: Sized + Debug {
///
/// Called when a local object gets fetched by another instance over HTTP, or when an object
/// gets sent in an activity.
async fn into_json(
self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self::Kind, Self::Error>;
async fn into_json(self, data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error>;
/// Verifies that the received object is valid.
///
@ -158,7 +146,7 @@ pub trait Object: Sized + Debug {
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<(), Self::Error>;
/// Convert object from ActivityPub type to database type.
@ -166,10 +154,7 @@ pub trait Object: Sized + Debug {
/// Called when an object is received from HTTP fetch or as part of an activity. This method
/// should write the received object to database. Note that there is no distinction between
/// create and update, so an `upsert` operation should be used.
async fn from_json(
json: Self::Kind,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self, Self::Error>;
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error>;
}
/// Handler for receiving incoming activities.
@ -221,10 +206,6 @@ pub trait ActivityHandler {
/// App data type passed to handlers. Must be identical to
/// [crate::config::FederationConfigBuilder::app_data] type.
type DataType: Clone + Send + Sync;
/// The queue type to use with this object
type QueueType: ActivityQueue + Send + Sync;
/// Error type returned by handler methods
type Error;
@ -238,15 +219,13 @@ pub trait ActivityHandler {
///
/// This needs to be a separate method, because it might be used for activities
/// like `Undo/Follow`, which shouldn't perform any database write for the inner `Follow`.
async fn verify(&self, data: &Data<Self::DataType, Self::QueueType>)
-> Result<(), Self::Error>;
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error>;
/// Called when an activity is received.
///
/// Should perform validation and possibly write action to the database. In case the activity
/// has a nested `object` field, must call `object.from_json` handler.
async fn receive(self, data: &Data<Self::DataType, Self::QueueType>)
-> Result<(), Self::Error>;
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error>;
}
/// Trait to allow retrieving common Actor data.
@ -292,7 +271,6 @@ where
T: ActivityHandler + Send + Sync,
{
type DataType = T::DataType;
type QueueType = T::QueueType;
type Error = T::Error;
fn id(&self) -> &Url {
@ -303,17 +281,11 @@ where
self.deref().actor()
}
async fn verify(
&self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
self.deref().verify(data).await
}
async fn receive(
self,
data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
(*self).receive(data).await
}
}
@ -326,10 +298,6 @@ pub trait Collection: Sized {
/// App data type passed to handlers. Must be identical to
/// [crate::config::FederationConfigBuilder::app_data] type.
type DataType: Clone + Send + Sync;
/// The queue type to use with this object
type QueueType: ActivityQueue + Send + Sync;
/// The type of protocol struct which gets sent over network to federate this database struct.
type Kind: for<'de2> Deserialize<'de2>;
/// Error type returned by handler methods
@ -338,7 +306,7 @@ pub trait Collection: Sized {
/// Reads local collection from database and returns it as Activitypub JSON.
async fn read_local(
owner: &Self::Owner,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<Self::Kind, Self::Error>;
/// Verifies that the received object is valid.
@ -348,7 +316,7 @@ pub trait Collection: Sized {
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<(), Self::Error>;
/// Convert object from ActivityPub type to database type.
@ -359,7 +327,7 @@ pub trait Collection: Sized {
async fn from_json(
json: Self::Kind,
owner: &Self::Owner,
data: &Data<Self::DataType, Self::QueueType>,
data: &Data<Self::DataType>,
) -> Result<Self, Self::Error>;
}
@ -435,21 +403,17 @@ pub mod tests {
#[async_trait]
impl Object for DbUser {
type DataType = DbConnection;
type QueueType = SimpleQueue;
type Kind = Person;
type Error = Error;
async fn read_from_id(
_object_id: Url,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
Ok(Some(DB_USER.clone()))
}
async fn into_json(
self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self::Kind, Self::Error> {
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
Ok(Person {
preferred_username: self.name.clone(),
kind: Default::default(),
@ -462,7 +426,7 @@ pub mod tests {
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
@ -470,7 +434,7 @@ pub mod tests {
async fn from_json(
json: Self::Kind,
_data: &Data<Self::DataType, Self::QueueType>,
_data: &Data<Self::DataType>,
) -> Result<Self, Self::Error> {
Ok(DbUser {
name: json.preferred_username,
@ -515,7 +479,6 @@ pub mod tests {
#[async_trait]
impl ActivityHandler for Follow {
type DataType = DbConnection;
type QueueType = SimpleQueue;
type Error = Error;
fn id(&self) -> &Url {
@ -526,17 +489,11 @@ pub mod tests {
self.actor.inner()
}
async fn verify(
&self,
_: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn verify(&self, _: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
async fn receive(
self,
_data: &Data<Self::DataType, Self::QueueType>,
) -> Result<(), Self::Error> {
async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
}
@ -550,36 +507,29 @@ pub mod tests {
#[async_trait]
impl Object for DbPost {
type DataType = DbConnection;
type QueueType = SimpleQueue;
type Kind = Note;
type Error = Error;
async fn read_from_id(
_: Url,
_: &Data<Self::DataType, Self::QueueType>,
_: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
todo!()
}
async fn into_json(
self,
_: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self::Kind, Self::Error> {
async fn into_json(self, _: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
todo!()
}
async fn verify(
_: &Self::Kind,
_: &Url,
_: &Data<Self::DataType, Self::QueueType>,
_: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
todo!()
}
async fn from_json(
_: Self::Kind,
_: &Data<Self::DataType, Self::QueueType>,
) -> Result<Self, Self::Error> {
async fn from_json(_: Self::Kind, _: &Data<Self::DataType>) -> Result<Self, Self::Error> {
todo!()
}
}