Outbound interface written

This commit is contained in:
golfinq 2023-06-23 04:09:51 -04:00
parent 7300940e10
commit e32b407085
2 changed files with 38 additions and 1 deletions

View file

@ -13,6 +13,7 @@ use crate::{
use anyhow::anyhow;
use bytes::Bytes;
use dyn_clone::{DynClone, clone_trait_object};
use futures_core::Future;
use http::{header::HeaderName, HeaderMap, HeaderValue};
use httpdate::fmt_http_date;
@ -96,6 +97,7 @@ where
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
};
let _ = config.outbound_storage.store_task(&message);
// Don't use the activity queue if this is in debug mode, send and wait directly
if config.debug {
@ -259,6 +261,36 @@ pub(crate) struct ActivityQueue {
retry_sender_task: JoinHandle<()>,
}
/// A trait for manipulating a cache of SendActivityTask
pub trait StorageInterface: DynClone + Send {
/// Store an SendActivityTask to disk
fn store_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error>;
/// read SendActivityTask from disk
fn read_task(&self)->Result<Option<Vec<SendActivityTask>>, anyhow::Error>;
/// Delete the SendActivityTask on disk
fn delete_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error>;
}
#[derive(Clone)]
pub struct DefaultStorageHandler();
impl StorageInterface for DefaultStorageHandler {
fn store_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error> {
Ok(())
}
fn read_task(&self)->Result<Option<Vec<SendActivityTask>>, anyhow::Error> {
Ok(None)
}
fn delete_task(&self, task: &SendActivityTask) -> Result<(), anyhow::Error> {
Ok(())
}
}
clone_trait_object!(StorageInterface);
/// Simple stat counter to show where we're up to with sending messages
/// This is a lock-free way to share things between tasks
/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning

View file

@ -16,7 +16,7 @@
//! ```
use crate::{
activity_queue::{create_activity_queue, ActivityQueue},
activity_queue::{create_activity_queue, ActivityQueue, DefaultStorageHandler, StorageInterface},
error::Error,
protocol::verification::verify_domains_match,
traits::{ActivityHandler, Actor},
@ -92,6 +92,11 @@ pub struct FederationConfig<T: Clone> {
/// present once constructed.
#[builder(setter(skip))]
pub(crate) activity_queue: Option<Arc<ActivityQueue>>,
/// Implements the StorageInterface trait which provides an interface for
/// storing objects and removing objects when appropiate
#[builder(default = "Box::new(DefaultStorageHandler())")]
pub(crate) outbound_storage: Box<dyn StorageInterface + Sync>,
}
impl<T: Clone> FederationConfig<T> {