lifecycle worker: mitigate potential bugs + refactoring

This commit is contained in:
Alex Auvolat 2023-08-31 00:28:37 +02:00
parent be03a4610f
commit 1cfcc61de8

View file

@ -197,16 +197,21 @@ impl Worker for LifecycleWorker {
async fn wait_for_work(&mut self) -> WorkerState {
match &self.state {
State::Completed(d) => {
let now = now_msec();
let next_start = midnight_ts(d.succ_opt().expect("no next day"));
if now < next_start {
tokio::time::sleep_until(
(Instant::now() + Duration::from_millis(next_start - now)).into(),
)
.await;
let next_day = d.succ_opt().expect("no next day");
let next_start = midnight_ts(next_day);
loop {
let now = now_msec();
if now < next_start {
tokio::time::sleep_until(
(Instant::now() + Duration::from_millis(next_start - now)).into(),
)
.await;
} else {
break;
}
}
self.state = State::Running {
date: today(),
date: std::cmp::max(next_day, today()),
pos: vec![],
counter: 0,
objects_expired: 0,
@ -228,6 +233,14 @@ async fn process_object(
mpu_aborted: &mut usize,
last_bucket: &mut Option<Bucket>,
) -> Result<Skip, Error> {
if !object
.versions()
.iter()
.any(|x| x.is_data() || x.is_uploading(None))
{
return Ok(Skip::NextObject);
}
let bucket = match last_bucket.take() {
Some(b) if b.id == object.bucket_id => b,
_ => garage
@ -276,7 +289,7 @@ async fn process_object(
if let Ok(exp_date) = parse_lifecycle_date(&exp_date) {
now_date >= exp_date
} else {
warn!("Invalid expiraiton date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
false
}
}
@ -309,17 +322,15 @@ async fn process_object(
.iter()
.filter_map(|v| {
let version_date = next_date(v.timestamp);
match &v.state {
ObjectVersionState::Uploading { .. }
if (now_date - version_date)
>= chrono::Duration::days(*abort_mpu_days as i64) =>
{
Some(ObjectVersion {
state: ObjectVersionState::Aborted,
..*v
})
}
_ => None,
if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64)
&& matches!(&v.state, ObjectVersionState::Uploading { .. })
{
Some(ObjectVersion {
state: ObjectVersionState::Aborted,
..*v
})
} else {
None
}
})
.collect::<Vec<_>>();