diff --git a/src/data/media.rs b/src/data/media.rs index 4a12f41..20136d6 100644 --- a/src/data/media.rs +++ b/src/data/media.rs @@ -1,14 +1,7 @@ -use crate::{ - db::{Db, MediaMeta}, - error::Error, -}; +use crate::{db::Db, error::Error}; use activitystreams::iri_string::types::IriString; -use actix_web::web::Bytes; -use std::time::{Duration, SystemTime}; use uuid::Uuid; -static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2); - #[derive(Clone, Debug)] pub struct MediaCache { db: Db, @@ -29,32 +22,6 @@ impl MediaCache { self.db.media_url(uuid).await } - #[tracing::instrument(level = "debug", name = "Is media outdated", skip(self))] - pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result { - if let Some(meta) = self.db.media_meta(uuid).await? { - if meta.saved_at + MEDIA_DURATION > SystemTime::now() { - return Ok(false); - } - } - - Ok(true) - } - - #[tracing::instrument(level = "debug", name = "Get media bytes", skip(self))] - pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result, Error> { - if let Some(meta) = self.db.media_meta(uuid).await? { - if meta.saved_at + MEDIA_DURATION > SystemTime::now() { - return self - .db - .media_bytes(uuid) - .await - .map(|opt| opt.map(|bytes| (meta.media_type, bytes))); - } - } - - Ok(None) - } - #[tracing::instrument(name = "Store media url", skip_all, fields(url = url.to_string().as_str()))] pub(crate) async fn store_url(&self, url: IriString) -> Result { let uuid = Uuid::new_v4(); @@ -63,23 +30,4 @@ impl MediaCache { Ok(uuid) } - - #[tracing::instrument(name = "store media bytes", skip(self, bytes))] - pub(crate) async fn store_bytes( - &self, - uuid: Uuid, - media_type: String, - bytes: Bytes, - ) -> Result<(), Error> { - self.db - .save_bytes( - uuid, - MediaMeta { - media_type, - saved_at: SystemTime::now(), - }, - bytes, - ) - .await - } } diff --git a/src/db.rs b/src/db.rs index 0aaf0ac..685405f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,7 +3,6 @@ use crate::{ error::{Error, ErrorKind}, }; use activitystreams::iri_string::types::IriString; -use actix_web::web::Bytes; use rsa::{ pkcs8::{DecodePrivateKey, EncodePrivateKey}, RsaPrivateKey, @@ -26,8 +25,6 @@ struct Inner { settings: Tree, media_url_media_id: Tree, media_id_media_url: Tree, - media_id_media_bytes: Tree, - media_id_media_meta: Tree, actor_id_info: Tree, actor_id_instance: Tree, actor_id_contact: Tree, @@ -63,12 +60,6 @@ impl std::fmt::Debug for Actor { } } -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] -pub(crate) struct MediaMeta { - pub(crate) media_type: String, - pub(crate) saved_at: SystemTime, -} - #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Info { pub(crate) software: String, @@ -253,8 +244,6 @@ impl Db { settings: db.open_tree("settings")?, media_url_media_id: db.open_tree("media-url-media-id")?, media_id_media_url: db.open_tree("media-id-media-url")?, - media_id_media_bytes: db.open_tree("media-id-media-bytes")?, - media_id_media_meta: db.open_tree("media-id-media-meta")?, actor_id_info: db.open_tree("actor-id-info")?, actor_id_instance: db.open_tree("actor-id-instance")?, actor_id_contact: db.open_tree("actor-id-contact")?, @@ -392,25 +381,6 @@ impl Db { .await } - pub(crate) async fn save_bytes( - &self, - id: Uuid, - meta: MediaMeta, - bytes: Bytes, - ) -> Result<(), Error> { - self.unblock(move |inner| { - let vec = serde_json::to_vec(&meta)?; - - inner - .media_id_media_bytes - .insert(id.as_bytes(), bytes.as_ref())?; - inner.media_id_media_meta.insert(id.as_bytes(), vec)?; - - Ok(()) - }) - .await - } - pub(crate) async fn media_id(&self, url: IriString) -> Result, Error> { self.unblock(move |inner| { if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? { @@ -433,29 +403,6 @@ impl Db { .await } - pub(crate) async fn media_bytes(&self, id: Uuid) -> Result, Error> { - self.unblock(move |inner| { - if let Some(ivec) = inner.media_id_media_bytes.get(id.as_bytes())? { - Ok(Some(Bytes::copy_from_slice(&ivec))) - } else { - Ok(None) - } - }) - .await - } - - pub(crate) async fn media_meta(&self, id: Uuid) -> Result, Error> { - self.unblock(move |inner| { - if let Some(ivec) = inner.media_id_media_meta.get(id.as_bytes())? { - let meta = serde_json::from_slice(&ivec)?; - Ok(Some(meta)) - } else { - Ok(None) - } - }) - .await - } - pub(crate) async fn blocks(&self) -> Result, Error> { self.unblock(|inner| Ok(inner.blocks().collect())).await } diff --git a/src/error.rs b/src/error.rs index 1c1c4d1..93c3a29 100644 --- a/src/error.rs +++ b/src/error.rs @@ -130,9 +130,6 @@ pub(crate) enum ErrorKind { #[error("{0}")] HostMismatch(#[from] CheckError), - #[error("Invalid or missing content type")] - ContentType, - #[error("Couldn't flush buffer")] FlushBuffer, diff --git a/src/jobs/cache_media.rs b/src/jobs/cache_media.rs deleted file mode 100644 index 718a059..0000000 --- a/src/jobs/cache_media.rs +++ /dev/null @@ -1,44 +0,0 @@ -use crate::{error::Error, jobs::JobState}; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; -use uuid::Uuid; - -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] -pub(crate) struct CacheMedia { - uuid: Uuid, -} - -impl CacheMedia { - pub(crate) fn new(uuid: Uuid) -> Self { - CacheMedia { uuid } - } - - #[tracing::instrument(name = "Cache media", skip(state))] - async fn perform(self, state: JobState) -> Result<(), Error> { - if !state.media.is_outdated(self.uuid).await? { - return Ok(()); - } - - if let Some(url) = state.media.get_url(self.uuid).await? { - let (content_type, bytes) = state.requests.fetch_bytes(url.as_str()).await?; - - state - .media - .store_bytes(self.uuid, content_type, bytes) - .await?; - } - - Ok(()) - } -} - -impl ActixJob for CacheMedia { - type State = JobState; - type Future = Pin>>>; - - const NAME: &'static str = "relay::jobs::CacheMedia"; - - fn run(self, state: Self::State) -> Self::Future { - Box::pin(async move { self.perform(state).await.map_err(Into::into) }) - } -} diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 6c9fa14..d3e2719 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -1,7 +1,7 @@ use crate::{ config::UrlKind, error::{Error, ErrorKind}, - jobs::{cache_media::CacheMedia, Boolish, JobState}, + jobs::{Boolish, JobState}, }; use activitystreams::{iri, iri_string::types::IriString}; use background_jobs::ActixJob; @@ -75,8 +75,6 @@ impl QueryInstance { let avatar = state.config.generate_url(UrlKind::Media(uuid)); - state.job_server.queue(CacheMedia::new(uuid)).await?; - state .node_cache .set_contact( diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 15d2023..6dfdbe9 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -1,5 +1,4 @@ pub mod apub; -mod cache_media; mod contact; mod deliver; mod deliver_many; @@ -8,7 +7,7 @@ mod nodeinfo; mod process_listeners; pub(crate) use self::{ - cache_media::CacheMedia, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, + contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo, }; @@ -59,7 +58,6 @@ pub(crate) fn create_workers( .register::() .register::() .register::() - .register::() .register::() .register::() .register::() diff --git a/src/requests.rs b/src/requests.rs index c278d60..332024a 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,6 +1,6 @@ use crate::error::{Error, ErrorKind}; use activitystreams::iri_string::types::IriString; -use actix_web::{http::header::Date, web::Bytes}; +use actix_web::http::header::Date; use awc::{error::SendRequestError, Client, ClientResponse}; use dashmap::DashMap; use http_signature_normalization_actix::prelude::*; @@ -294,11 +294,9 @@ impl Requests { Ok(serde_json::from_slice(body.as_ref())?) } - #[tracing::instrument(name = "Fetch Bytes", skip(self), fields(signing_string))] - pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> { - let parsed_url = url.parse::()?; - - if !self.breakers.should_try(&parsed_url) { + #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))] + pub(crate) async fn fetch_response(&self, url: IriString) -> Result { + if !self.breakers.should_try(&url) { return Err(ErrorKind::Breaker.into()); } @@ -307,9 +305,10 @@ impl Requests { let client: Client = self.client.borrow().clone(); let res = client - .get(url) + .get(url.as_str()) .insert_header(("Accept", "*/*")) .insert_header(Date(SystemTime::now().into())) + .no_decompress() .signature( self.config.clone(), self.key_id.clone(), @@ -322,26 +321,9 @@ impl Requests { .send() .await; - let mut res = self.check_response(&parsed_url, res).await?; + let res = self.check_response(&url, res).await?; - let content_type = if let Some(content_type) = res.headers().get("content-type") { - if let Ok(s) = content_type.to_str() { - s.to_owned() - } else { - return Err(ErrorKind::ContentType.into()); - } - } else { - return Err(ErrorKind::ContentType.into()); - }; - - let bytes = match res.body().limit(1024 * 1024 * 4).await { - Err(e) => { - return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into()); - } - Ok(bytes) => bytes, - }; - - Ok((content_type, bytes)) + Ok(res) } #[tracing::instrument( diff --git a/src/routes/media.rs b/src/routes/media.rs index d0d86c1..8a9de62 100644 --- a/src/routes/media.rs +++ b/src/routes/media.rs @@ -1,8 +1,5 @@ use crate::{data::MediaCache, error::Error, requests::Requests}; -use actix_web::{ - http::header::{CacheControl, CacheDirective}, - web, HttpResponse, -}; +use actix_web::{body::BodyStream, web, HttpResponse}; use uuid::Uuid; #[tracing::instrument(name = "Media", skip(media, requests))] @@ -13,30 +10,17 @@ pub(crate) async fn route( ) -> Result { let uuid = uuid.into_inner(); - if let Some((content_type, bytes)) = media.get_bytes(uuid).await? { - return Ok(cached(content_type, bytes)); - } - if let Some(url) = media.get_url(uuid).await? { - let (content_type, bytes) = requests.fetch_bytes(url.as_str()).await?; + let res = requests.fetch_response(url).await?; - media - .store_bytes(uuid, content_type.clone(), bytes.clone()) - .await?; + let mut response = HttpResponse::build(res.status()); - return Ok(cached(content_type, bytes)); + for (name, value) in res.headers().iter().filter(|(h, _)| *h != "connection") { + response.insert_header((name.clone(), value.clone())); + } + + return Ok(response.body(BodyStream::new(res))); } Ok(HttpResponse::NotFound().finish()) } - -fn cached(content_type: String, bytes: web::Bytes) -> HttpResponse { - HttpResponse::Ok() - .insert_header(CacheControl(vec![ - CacheDirective::Public, - CacheDirective::MaxAge(60 * 60 * 24), - CacheDirective::Extension("immutable".to_owned(), None), - ])) - .content_type(content_type) - .body(bytes) -}