Ability to have up to 4 concurrently working resync workers

This commit is contained in:
Alex Auvolat 2022-09-02 17:18:13 +02:00
parent 5e8baa433d
commit 5d4b937a00
No known key found for this signature in database
GPG key ID: 0E496D15096376BE
4 changed files with 95 additions and 22 deletions

View file

@ -125,13 +125,11 @@ impl BlockManager {
}); });
block_manager.endpoint.set_handler(block_manager.clone()); block_manager.endpoint.set_handler(block_manager.clone());
// Spawn one resync worker // Spawn a bunch of resync workers
let background = block_manager.system.background.clone(); for index in 0..MAX_RESYNC_WORKERS {
let worker = ResyncWorker::new(block_manager.clone()); let worker = ResyncWorker::new(index, block_manager.clone());
tokio::spawn(async move { block_manager.system.background.spawn_worker(worker);
tokio::time::sleep(Duration::from_secs(10)).await; }
background.spawn_worker(worker);
});
// Spawn scrub worker // Spawn scrub worker
let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx); let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx);

View file

@ -1,5 +1,6 @@
use std::collections::HashSet;
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
@ -44,6 +45,9 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The minimum retry delay is 60 seconds = 1 minute // The minimum retry delay is 60 seconds = 1 minute
// The maximum retry delay is 60 seconds * 2^6 = 60 seconds << 6 = 64 minutes (~1 hour) // The maximum retry delay is 60 seconds * 2^6 = 60 seconds << 6 = 64 minutes (~1 hour)
pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6; pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// No more than 4 resync workers can be running in the system
pub(crate) const MAX_RESYNC_WORKERS: usize = 4;
// Resync tranquility is initially set to 2, but can be changed in the CLI // Resync tranquility is initially set to 2, but can be changed in the CLI
// and the updated version is persisted over Garage restarts // and the updated version is persisted over Garage restarts
const INITIAL_RESYNC_TRANQUILITY: u32 = 2; const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
@ -53,12 +57,15 @@ pub struct BlockResyncManager {
pub(crate) notify: Notify, pub(crate) notify: Notify,
pub(crate) errors: CountedTree, pub(crate) errors: CountedTree,
busy_set: BusySet,
persister: Persister<ResyncPersistedConfig>, persister: Persister<ResyncPersistedConfig>,
persisted: ArcSwap<ResyncPersistedConfig>, persisted: ArcSwap<ResyncPersistedConfig>,
} }
#[derive(Serialize, Deserialize, Clone, Copy)] #[derive(Serialize, Deserialize, Clone, Copy)]
struct ResyncPersistedConfig { struct ResyncPersistedConfig {
n_workers: usize,
tranquility: u32, tranquility: u32,
} }
@ -68,6 +75,14 @@ enum ResyncIterResult {
IdleFor(Duration), IdleFor(Duration),
} }
type BusySet = Arc<Mutex<HashSet<Vec<u8>>>>;
struct BusyBlock {
time_bytes: Vec<u8>,
hash_bytes: Vec<u8>,
busy_set: BusySet,
}
impl BlockResyncManager { impl BlockResyncManager {
pub(crate) fn new(db: &db::Db, system: &System) -> Self { pub(crate) fn new(db: &db::Db, system: &System) -> Self {
let queue = db let queue = db
@ -84,6 +99,7 @@ impl BlockResyncManager {
let persisted = match persister.load() { let persisted = match persister.load() {
Ok(v) => v, Ok(v) => v,
Err(_) => ResyncPersistedConfig { Err(_) => ResyncPersistedConfig {
n_workers: 1,
tranquility: INITIAL_RESYNC_TRANQUILITY, tranquility: INITIAL_RESYNC_TRANQUILITY,
}, },
}; };
@ -92,6 +108,7 @@ impl BlockResyncManager {
queue, queue,
notify: Notify::new(), notify: Notify::new(),
errors, errors,
busy_set: Arc::new(Mutex::new(HashSet::new())),
persister, persister,
persisted: ArcSwap::new(Arc::new(persisted)), persisted: ArcSwap::new(Arc::new(persisted)),
} }
@ -199,12 +216,12 @@ impl BlockResyncManager {
} }
async fn resync_iter(&self, manager: &BlockManager) -> Result<ResyncIterResult, db::Error> { async fn resync_iter(&self, manager: &BlockManager) -> Result<ResyncIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.queue.first()? { if let Some(block) = self.get_block_to_resync()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let time_msec = u64::from_be_bytes(block.time_bytes[0..8].try_into().unwrap());
let now = now_msec(); let now = now_msec();
if now >= time_msec { if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap(); let hash = Hash::try_from(&block.hash_bytes[..]).unwrap();
if let Some(ec) = self.errors.get(hash.as_slice())? { if let Some(ec) = self.errors.get(hash.as_slice())? {
let ec = ErrorCounter::decode(&ec); let ec = ErrorCounter::decode(&ec);
@ -217,7 +234,7 @@ impl BlockResyncManager {
// is not removing the one we added just above // is not removing the one we added just above
// (we want to do the remove after the insert to ensure // (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between) // that the item is not lost if we crash in-between)
self.queue.remove(time_bytes)?; self.queue.remove(&block.time_bytes)?;
return Ok(ResyncIterResult::BusyDidNothing); return Ok(ResyncIterResult::BusyDidNothing);
} }
} }
@ -258,10 +275,10 @@ impl BlockResyncManager {
// err_counter.next_try() >= now + 1 > now, // err_counter.next_try() >= now + 1 > now,
// the entry we remove from the queue is not // the entry we remove from the queue is not
// the entry we inserted with put_to_resync_at // the entry we inserted with put_to_resync_at
self.queue.remove(time_bytes)?; self.queue.remove(&block.time_bytes)?;
} else { } else {
self.errors.remove(hash.as_slice())?; self.errors.remove(hash.as_slice())?;
self.queue.remove(time_bytes)?; self.queue.remove(&block.time_bytes)?;
} }
Ok(ResyncIterResult::BusyDidSomething) Ok(ResyncIterResult::BusyDidSomething)
@ -281,6 +298,22 @@ impl BlockResyncManager {
} }
} }
fn get_block_to_resync(&self) -> Result<Option<BusyBlock>, db::Error> {
let mut busy = self.busy_set.lock().unwrap();
for it in self.queue.iter()? {
let (time_bytes, hash_bytes) = it?;
if !busy.contains(&time_bytes) {
busy.insert(time_bytes.clone());
return Ok(Some(BusyBlock {
time_bytes,
hash_bytes,
busy_set: self.busy_set.clone(),
}));
}
}
return Ok(None);
}
async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> { async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
let BlockStatus { exists, needed } = manager.check_block_status(hash).await?; let BlockStatus { exists, needed } = manager.check_block_status(hash).await?;
@ -394,25 +427,44 @@ impl BlockResyncManager {
update(&mut cfg); update(&mut cfg);
self.persister.save_async(&cfg).await?; self.persister.save_async(&cfg).await?;
self.persisted.store(Arc::new(cfg)); self.persisted.store(Arc::new(cfg));
self.notify.notify_one(); self.notify.notify_waiters();
Ok(()) Ok(())
} }
pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
if n_workers < 1 || n_workers > MAX_RESYNC_WORKERS {
return Err(Error::Message(format!(
"Invalid number of resync workers, must be between 1 and {}",
MAX_RESYNC_WORKERS
)));
}
self.update_persisted(|cfg| cfg.n_workers = n_workers).await
}
pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> { pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
self.update_persisted(|cfg| cfg.tranquility = tranquility) self.update_persisted(|cfg| cfg.tranquility = tranquility)
.await .await
} }
} }
impl Drop for BusyBlock {
fn drop(&mut self) {
let mut busy = self.busy_set.lock().unwrap();
busy.remove(&self.time_bytes);
}
}
pub(crate) struct ResyncWorker { pub(crate) struct ResyncWorker {
index: usize,
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
next_delay: Duration, next_delay: Duration,
} }
impl ResyncWorker { impl ResyncWorker {
pub(crate) fn new(manager: Arc<BlockManager>) -> Self { pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
Self { Self {
index,
manager, manager,
tranquilizer: Tranquilizer::new(30), tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10), next_delay: Duration::from_secs(10),
@ -423,15 +475,18 @@ impl ResyncWorker {
#[async_trait] #[async_trait]
impl Worker for ResyncWorker { impl Worker for ResyncWorker {
fn name(&self) -> String { fn name(&self) -> String {
"Block resync worker".into() format!("Block resync worker #{}", self.index + 1)
} }
fn info(&self) -> Option<String> { fn info(&self) -> Option<String> {
let persisted = self.manager.resync.persisted.load();
if self.index >= persisted.n_workers {
return Some("(unused)".into());
}
let mut ret = vec![]; let mut ret = vec![];
ret.push(format!( ret.push(format!("tranquility = {}", persisted.tranquility));
"tranquility = {}",
self.manager.resync.persisted.load().tranquility
));
let qlen = self.manager.resync.queue_len().unwrap_or(0); let qlen = self.manager.resync.queue_len().unwrap_or(0);
if qlen > 0 { if qlen > 0 {
@ -447,6 +502,10 @@ impl Worker for ResyncWorker {
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if self.index >= self.manager.resync.persisted.load().n_workers {
return Ok(WorkerState::Idle);
}
self.tranquilizer.reset(); self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await { match self.manager.resync.resync_iter(&self.manager).await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self Ok(ResyncIterResult::BusyDidSomething) => Ok(self
@ -470,10 +529,15 @@ impl Worker for ResyncWorker {
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
while self.index >= self.manager.resync.persisted.load().n_workers {
self.manager.resync.notify.notified().await
}
select! { select! {
_ = tokio::time::sleep(self.next_delay) => (), _ = tokio::time::sleep(self.next_delay) => (),
_ = self.manager.resync.notify.notified() => (), _ = self.manager.resync.notify.notified() => (),
}; };
WorkerState::Busy WorkerState::Busy
} }
} }

View file

@ -847,6 +847,14 @@ impl AdminRpcHandler {
.await; .await;
Ok(AdminRpc::Ok("Scrub tranquility updated".into())) Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
} }
WorkerSetCmd::ResyncNWorkers { n_workers } => {
self.garage
.block_manager
.resync
.set_n_workers(n_workers)
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
WorkerSetCmd::ResyncTranquility { tranquility } => { WorkerSetCmd::ResyncTranquility { tranquility } => {
self.garage self.garage
.block_manager .block_manager

View file

@ -524,7 +524,10 @@ pub enum WorkerSetCmd {
/// Set tranquility of scrub operations /// Set tranquility of scrub operations
#[structopt(name = "scrub-tranquility", version = version::garage())] #[structopt(name = "scrub-tranquility", version = version::garage())]
ScrubTranquility { tranquility: u32 }, ScrubTranquility { tranquility: u32 },
/// Set tranquility of resync operations /// Set number of concurrent block resync workers
#[structopt(name = "resync-n-workers", version = version::garage())]
ResyncNWorkers { n_workers: usize },
/// Set tranquility of block resync operations
#[structopt(name = "resync-tranquility", version = version::garage())] #[structopt(name = "resync-tranquility", version = version::garage())]
ResyncTranquility { tranquility: u32 }, ResyncTranquility { tranquility: u32 },
} }