pict-rs/src/backgrounded.rs

114 lines
3.2 KiB
Rust
Raw Permalink Normal View History

use std::sync::Arc;
use crate::{
error::Error,
2023-08-16 21:09:40 +00:00
repo::{ArcRepo, UploadId},
2024-02-04 00:18:13 +00:00
state::State,
store::Store,
};
use actix_web::web::Bytes;
2023-08-23 16:59:42 +00:00
use futures_core::Stream;
use mime::APPLICATION_OCTET_STREAM;
use tracing::{Instrument, Span};
pub(crate) struct Backgrounded {
repo: ArcRepo,
identifier: Option<Arc<str>>,
upload_id: Option<UploadId>,
}
impl Backgrounded {
pub(crate) fn disarm(mut self) {
let _ = self.identifier.take();
let _ = self.upload_id.take();
}
pub(crate) fn upload_id(&self) -> Option<UploadId> {
self.upload_id
}
pub(crate) fn identifier(&self) -> Option<&Arc<str>> {
self.identifier.as_ref()
}
2024-02-04 00:18:13 +00:00
pub(crate) async fn proxy<S, P>(state: &State<S>, stream: P) -> Result<Self, Error>
where
S: Store,
2024-02-28 02:41:39 +00:00
P: Stream<Item = Result<Bytes, Error>>,
{
let mut this = Self {
2024-02-04 00:18:13 +00:00
repo: state.repo.clone(),
identifier: None,
upload_id: None,
};
2024-02-04 00:18:13 +00:00
this.do_proxy(&state.store, stream).await?;
Ok(this)
}
2024-02-04 00:18:13 +00:00
async fn do_proxy<S, P>(&mut self, store: &S, stream: P) -> Result<(), Error>
where
S: Store,
2024-02-28 02:41:39 +00:00
P: Stream<Item = Result<Bytes, Error>>,
{
self.upload_id = Some(self.repo.create_upload().await?);
2024-02-23 00:05:04 +00:00
let stream = crate::stream::map_err(stream, |e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
2024-02-23 00:05:04 +00:00
});
// use octet-stream, we don't know the upload's real type yet
2024-02-28 02:41:25 +00:00
let identifier = store
.save_stream(stream, APPLICATION_OCTET_STREAM, None)
.await?;
2022-04-03 02:15:39 +00:00
self.identifier = Some(identifier);
Ok(())
}
}
impl Drop for Backgrounded {
fn drop(&mut self) {
2023-07-23 02:11:28 +00:00
let any_items = self.identifier.is_some() || self.upload_id.is_some();
2023-07-22 21:47:59 +00:00
2024-02-04 21:45:47 +00:00
metrics::counter!(crate::init_metrics::BACKGROUND_UPLOAD, "completed" => (!any_items).to_string())
.increment(1);
2023-07-23 02:11:28 +00:00
if any_items {
let cleanup_parent_span =
tracing::info_span!(parent: None, "Dropped backgrounded cleanup");
cleanup_parent_span.follows_from(Span::current());
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Identifier", identifier = ?identifier);
crate::sync::spawn(
"backgrounded-cleanup-identifier",
async move {
let _ = crate::queue::cleanup_identifier(&repo, &identifier).await;
}
.instrument(cleanup_span),
);
}
if let Some(upload_id) = self.upload_id {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
crate::sync::spawn(
"backgrounded-claim-upload",
async move {
let _ = repo.claim(upload_id).await;
}
.instrument(cleanup_span),
);
}
}
}
}