Better instrument drops, jobs. Properly disarm backgrounded downloads

This commit is contained in:
Aode (lion) 2022-04-08 12:51:33 -05:00
parent 300fcdac05
commit 31c5a36c77
4 changed files with 59 additions and 20 deletions

View file

@ -6,6 +6,7 @@ use crate::{
use actix_web::web::Bytes;
use futures_util::{Stream, TryStreamExt};
use tokio_util::io::StreamReader;
use tracing::{Instrument, Span};
pub(crate) struct Backgrounded<R, S>
where
@ -72,24 +73,37 @@ where
R: FullRepo + 'static,
S: Store,
{
#[tracing::instrument(name = "Drop Backgrounded", skip(self), fields(identifier = ?self.identifier, upload_id = ?self.upload_id))]
fn drop(&mut self) {
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: None, "Backgrounded cleanup Identifier", identifier = ?identifier);
cleanup_span.follows_from(Span::current());
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
})
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
}
.instrument(cleanup_span),
)
});
}
if let Some(upload_id) = self.upload_id {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: None, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
cleanup_span.follows_from(Span::current());
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move {
let _ = repo.claim(upload_id).await;
})
actix_rt::spawn(
async move {
let _ = repo.claim(upload_id).await;
}
.instrument(cleanup_span),
)
});
}
}

View file

@ -8,6 +8,7 @@ use crate::{
use actix_web::web::{Bytes, BytesMut};
use futures_util::{Stream, StreamExt};
use sha2::{Digest, Sha256};
use tracing::{Instrument, Span};
mod hasher;
use hasher::Hasher;
@ -219,40 +220,62 @@ where
R: FullRepo + 'static,
S: Store,
{
#[tracing::instrument(name = "Drop Session", skip(self), fields(hash = ?self.hash, alias = ?self.alias, identifier = ?self.identifier))]
fn drop(&mut self) {
if let Some(hash) = self.hash.take() {
let repo = self.repo.clone();
let cleanup_span =
tracing::info_span!(parent: None, "Session cleanup hash", hash = ?hash);
cleanup_span.follows_from(Span::current());
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move {
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
})
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
}
.instrument(cleanup_span),
)
});
}
if let Some(alias) = self.alias.take() {
let repo = self.repo.clone();
let cleanup_span =
tracing::info_span!(parent: None, "Session cleanup alias", alias = ?alias);
cleanup_span.follows_from(Span::current());
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move {
if let Ok(token) = repo.delete_token(&alias).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
actix_rt::spawn(
async move {
if let Ok(token) = repo.delete_token(&alias).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
}
}
}
})
.instrument(cleanup_span),
)
});
}
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: None, "Session cleanup identifier", identifier = ?identifier);
cleanup_span.follows_from(Span::current());
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
})
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
}
.instrument(cleanup_span),
)
});
}
}

View file

@ -322,6 +322,8 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
queue::queue_ingest(&**repo, identifier, upload_id, None, true, is_cached).await?;
backgrounded.disarm();
Ok(HttpResponse::Accepted().json(&serde_json::json!({
"msg": "ok",
"uploads": [{

View file

@ -170,7 +170,7 @@ where
loop {
let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
let span = tracing::info_span!("Running Job", worker_id = ?worker_id, job = ?String::from_utf8_lossy(bytes.as_ref()));
span.in_scope(|| (callback)(repo, store, bytes.as_ref()))
.instrument(span)