mirror of
https://git.asonix.dog/asonix/pict-rs.git
synced 2024-05-14 17:52:42 +00:00
Compare commits
3 commits
d41fca5b6c
...
4bb3bad703
Author | SHA1 | Date | |
---|---|---|---|
4bb3bad703 | |||
4021458be8 | |||
eca3697410 |
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -1819,7 +1819,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "pict-rs"
|
||||
version = "0.5.11"
|
||||
version = "0.5.12"
|
||||
dependencies = [
|
||||
"actix-form-data",
|
||||
"actix-web",
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "pict-rs"
|
||||
description = "A simple image hosting service"
|
||||
version = "0.5.11"
|
||||
version = "0.5.12"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
license = "AGPL-3.0"
|
||||
readme = "README.md"
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
|
||||
rustPlatform.buildRustPackage {
|
||||
pname = "pict-rs";
|
||||
version = "0.5.11";
|
||||
version = "0.5.12";
|
||||
src = ./.;
|
||||
|
||||
cargoLock = {
|
||||
|
|
46
releases/0.5.12.md
Normal file
46
releases/0.5.12.md
Normal file
|
@ -0,0 +1,46 @@
|
|||
# pict-rs 0.5.12
|
||||
|
||||
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
|
||||
animations, and videos, as well as providing basic image processing functionality.
|
||||
|
||||
## Overview
|
||||
|
||||
pict-rs 0.5.12 is a bugfix release to remove two issues that, when compounded, would cause pict-rs
|
||||
to fail to process media.
|
||||
|
||||
### Fixes
|
||||
|
||||
- [Panic Handling in Background Jobs](#panic-handling-in-background-jobs)
|
||||
- [BytesStream Divide-by-Zero](#bytes-stream-divide-by-zero)
|
||||
|
||||
|
||||
## Upgrade Notes
|
||||
|
||||
There are no significant differences from 0.5.11. Upgrading should be as simple as pulling a new
|
||||
version of pict-rs.
|
||||
|
||||
|
||||
## Descriptions
|
||||
|
||||
### Panic Handling in Background Jobs
|
||||
|
||||
pict-rs makes an effort to never use explicitly panicking code, but since there's no static way to
|
||||
guarantee that a given function wont panic, pict-rs needs to be able to deal with that. pict-rs
|
||||
0.5.12 now wraps invocations of jobs in spawned tasks, which can catch and report panics that happen
|
||||
in background jobs.
|
||||
|
||||
Previously, a panic in a background job would bring down that thread's job processor, which resulted
|
||||
in future jobs never being processed. Now job processing should properly continue after panics
|
||||
occur.
|
||||
|
||||
|
||||
### BytesStream Divide-by-Zero
|
||||
|
||||
Part of my rework of BytesStream recently included adding debug logs around how many bytes chunks
|
||||
were in a given stream, and their average length. Unfortunately, if there were no bytes in the
|
||||
stream, this would cause the "average chunk length" calculation to divide by 0. In previous versions
|
||||
of pict-rs, this would generally result in a failed request for processed media, but in pict-rs
|
||||
0.5.11 this would end up killing the background jobs processor.
|
||||
|
||||
This specific panic has been fixed by ensuring we divide by the number of chunks or 1, whichever is
|
||||
greater.
|
|
@ -35,7 +35,7 @@ impl BytesStream {
|
|||
tracing::debug!(
|
||||
"BytesStream with {} chunks, avg length {}",
|
||||
bs.chunks_len(),
|
||||
bs.len() / bs.chunks_len()
|
||||
bs.len() / bs.chunks_len().max(1)
|
||||
);
|
||||
|
||||
Ok(bs)
|
||||
|
|
76
src/queue.rs
76
src/queue.rs
|
@ -11,9 +11,11 @@ use crate::{
|
|||
|
||||
use std::{
|
||||
ops::Deref,
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::task::JoinError;
|
||||
use tracing::Instrument;
|
||||
|
||||
pub(crate) mod cleanup;
|
||||
|
@ -297,54 +299,66 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn job_result(result: &JobResult) -> crate::repo::JobResult {
|
||||
fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
|
||||
match result {
|
||||
Ok(()) => crate::repo::JobResult::Success,
|
||||
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
|
||||
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
|
||||
Ok(Ok(())) => crate::repo::JobResult::Success,
|
||||
Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
|
||||
Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
|
||||
Err(_) => crate::repo::JobResult::Aborted,
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
|
||||
where
|
||||
S: Store,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
||||
S: Store + 'static,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
|
||||
{
|
||||
let worker_id = uuid::Uuid::new_v4();
|
||||
let state = Rc::new(state);
|
||||
|
||||
loop {
|
||||
tracing::trace!("process_jobs: looping");
|
||||
|
||||
crate::sync::cooperate().await;
|
||||
|
||||
let res = job_loop(&state, worker_id, queue, callback)
|
||||
.with_poll_timer("job-loop")
|
||||
.await;
|
||||
// add a panic boundary by spawning a task
|
||||
let res = crate::sync::spawn(
|
||||
"job-loop",
|
||||
job_loop(state.clone(), worker_id, queue, callback),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||
tracing::warn!("{}", format!("{e:?}"));
|
||||
match res {
|
||||
// clean exit
|
||||
Ok(Ok(())) => break,
|
||||
|
||||
if e.is_disconnected() {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
// job error
|
||||
Ok(Err(e)) => {
|
||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||
tracing::warn!("{}", format!("{e:?}"));
|
||||
|
||||
if e.is_disconnected() {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
// job panic
|
||||
Err(_) => {
|
||||
tracing::warn!("Panic while processing jobs");
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async fn job_loop<S, F>(
|
||||
state: &State<S>,
|
||||
state: Rc<State<S>>,
|
||||
worker_id: uuid::Uuid,
|
||||
queue: &'static str,
|
||||
callback: F,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
||||
S: Store + 'static,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
|
||||
{
|
||||
loop {
|
||||
tracing::trace!("job_loop: looping");
|
||||
|
@ -360,14 +374,18 @@ where
|
|||
|
||||
let guard = MetricsGuard::guard(worker_id, queue);
|
||||
|
||||
let res = heartbeat(
|
||||
&state.repo,
|
||||
queue,
|
||||
worker_id,
|
||||
job_id,
|
||||
(callback)(state, job),
|
||||
)
|
||||
.with_poll_timer("job-and-heartbeat")
|
||||
let state2 = state.clone();
|
||||
let res = crate::sync::spawn("job-and-heartbeat", async move {
|
||||
let state = state2;
|
||||
heartbeat(
|
||||
&state.repo,
|
||||
queue,
|
||||
worker_id,
|
||||
job_id,
|
||||
(callback)(&state, job),
|
||||
)
|
||||
.await
|
||||
})
|
||||
.await;
|
||||
|
||||
state
|
||||
|
@ -376,7 +394,7 @@ where
|
|||
.with_poll_timer("job-complete")
|
||||
.await?;
|
||||
|
||||
res?;
|
||||
res.map_err(|_| UploadError::Canceled)??;
|
||||
|
||||
guard.disarm();
|
||||
|
||||
|
|
Loading…
Reference in a new issue