Merge pull request 'improvements to CLI and new debug features' (#448) from cli-improvements into main

Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/448
This commit is contained in:
Alex 2023-01-02 12:42:24 +00:00
commit 7f7d53cfa9
24 changed files with 725 additions and 190 deletions

View file

@ -90,6 +90,15 @@ pub struct BlockManager {
tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>, tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
} }
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BlockResyncErrorInfo {
pub hash: Hash,
pub refcount: u64,
pub error_count: u64,
pub last_try: u64,
pub next_try: u64,
}
// This custom struct contains functions that must only be ran // This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing // when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex. // it INSIDE a Mutex.
@ -114,7 +123,8 @@ impl BlockManager {
.netapp .netapp
.endpoint("garage_block/manager.rs/Rpc".to_string()); .endpoint("garage_block/manager.rs/Rpc".to_string());
let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone()); let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
let (scrub_tx, scrub_rx) = mpsc::channel(1); let (scrub_tx, scrub_rx) = mpsc::channel(1);
@ -309,11 +319,41 @@ impl BlockManager {
Ok(self.rc.rc.len()?) Ok(self.rc.rc.len()?)
} }
/// Get number of items in the refcount table
pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> {
Ok(self.rc.rc.fast_len()?)
}
/// Send command to start/stop/manager scrub worker /// Send command to start/stop/manager scrub worker
pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) { pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
let _ = self.tx_scrub_command.send(cmd).await; let _ = self.tx_scrub_command.send(cmd).await;
} }
/// Get the reference count of a block
pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
Ok(self.rc.get_block_rc(hash)?.as_u64())
}
/// List all resync errors
pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
let mut blocks = Vec::with_capacity(self.resync.errors.len());
for ent in self.resync.errors.iter()? {
let (hash, cnt) = ent?;
let cnt = ErrorCounter::decode(&cnt);
blocks.push(BlockResyncErrorInfo {
hash: Hash::try_from(&hash).unwrap(),
refcount: 0,
error_count: cnt.errors,
last_try: cnt.last_try,
next_try: cnt.next_try(),
});
}
for block in blocks.iter_mut() {
block.refcount = self.get_block_rc(&block.hash)?;
}
Ok(blocks)
}
//// ----- Managing the reference counter ---- //// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is /// Increment the number of time a block is used, putting it to resynchronization if it is

View file

@ -1,9 +1,11 @@
use opentelemetry::{global, metrics::*}; use opentelemetry::{global, metrics::*};
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree; use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics { pub struct BlockManagerMetrics {
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>, pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>, pub(crate) _resync_errored_blocks: ValueObserver<u64>,
@ -23,9 +25,17 @@ pub struct BlockManagerMetrics {
} }
impl BlockManagerMetrics { impl BlockManagerMetrics {
pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self { pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
let meter = global::meter("garage_model/block"); let meter = global::meter("garage_model/block");
Self { Self {
_rc_size: meter
.u64_value_observer("block.rc_size", move |observer| {
if let Ok(Some(v)) = rc_tree.fast_len() {
observer.observe(v as u64, &[])
}
})
.with_description("Number of blocks known to the reference counter")
.init(),
_resync_queue_len: meter _resync_queue_len: meter
.u64_value_observer("block.resync_queue_length", move |observer| { .u64_value_observer("block.resync_queue_length", move |observer| {
observer.observe(resync_queue.len() as u64, &[]) observer.observe(resync_queue.len() as u64, &[])

View file

@ -169,4 +169,11 @@ impl RcEntry {
pub(crate) fn is_needed(&self) -> bool { pub(crate) fn is_needed(&self) -> bool {
!self.is_deletable() !self.is_deletable()
} }
pub(crate) fn as_u64(&self) -> u64 {
match self {
RcEntry::Present { count } => *count,
_ => 0,
}
}
} }

View file

@ -53,7 +53,7 @@ impl Worker for RepairWorker {
"Block repair worker".into() "Block repair worker".into()
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
match self.block_iter.as_ref() { match self.block_iter.as_ref() {
None => { None => {
let idx_bytes = self let idx_bytes = self
@ -66,9 +66,20 @@ impl Worker for RepairWorker {
} else { } else {
idx_bytes idx_bytes
}; };
Some(format!("Phase 1: {}", hex::encode(idx_bytes))) WorkerStatus {
progress: Some("0.00%".into()),
freeform: vec![format!(
"Currently in phase 1, iterator position: {}",
hex::encode(idx_bytes)
)],
..Default::default()
} }
Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)), }
Some(bi) => WorkerStatus {
progress: Some(format!("{:.2}%", bi.progress() * 100.)),
freeform: vec!["Currently in phase 2".into()],
..Default::default()
},
} }
} }
@ -271,29 +282,28 @@ impl Worker for ScrubWorker {
"Block scrub worker".into() "Block scrub worker".into()
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
let s = match &self.work { let mut s = WorkerStatus {
ScrubWorkerState::Running(bsi) => format!( persistent_errors: Some(self.persisted.corruptions_detected),
"{:.2}% done (tranquility = {})", tranquility: Some(self.persisted.tranquility),
bsi.progress() * 100., ..Default::default()
self.persisted.tranquility
),
ScrubWorkerState::Paused(bsi, rt) => {
format!(
"Paused, {:.2}% done, resumes at {}",
bsi.progress() * 100.,
msec_to_rfc3339(*rt)
)
}
ScrubWorkerState::Finished => format!(
"Last completed scrub: {}",
msec_to_rfc3339(self.persisted.time_last_complete_scrub)
),
}; };
Some(format!( match &self.work {
"{} ; corruptions detected: {}", ScrubWorkerState::Running(bsi) => {
s, self.persisted.corruptions_detected s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
)) }
ScrubWorkerState::Paused(bsi, rt) => {
s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
}
ScrubWorkerState::Finished => {
s.freeform = vec![format!(
"Last scrub completed at {}",
msec_to_rfc3339(self.persisted.time_last_complete_scrub)
)];
}
}
s
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {

View file

@ -123,6 +123,24 @@ impl BlockResyncManager {
Ok(self.errors.len()) Ok(self.errors.len())
} }
/// Clear the error counter for a block and put it in queue immediately
pub fn clear_backoff(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
if let Some(ec) = self.errors.get(hash)? {
let mut ec = ErrorCounter::decode(&ec);
if ec.errors > 0 {
ec.last_try = now - ec.delay_msec();
self.errors.insert(hash, ec.encode())?;
self.put_to_resync_at(hash, now)?;
return Ok(());
}
}
Err(Error::Message(format!(
"Block {:?} was not in an errored state",
hash
)))
}
// ---- Resync loop ---- // ---- Resync loop ----
// This part manages a queue of blocks that need to be // This part manages a queue of blocks that need to be
@ -257,7 +275,7 @@ impl BlockResyncManager {
if let Err(e) = &res { if let Err(e) = &res {
manager.metrics.resync_error_counter.add(1); manager.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e); error!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.errors.get(hash.as_slice())? { let err_counter = match self.errors.get(hash.as_slice())? {
Some(ec) => ErrorCounter::decode(&ec).add1(now + 1), Some(ec) => ErrorCounter::decode(&ec).add1(now + 1),
@ -477,27 +495,22 @@ impl Worker for ResyncWorker {
format!("Block resync worker #{}", self.index + 1) format!("Block resync worker #{}", self.index + 1)
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
let persisted = self.manager.resync.persisted.load(); let persisted = self.manager.resync.persisted.load();
if self.index >= persisted.n_workers { if self.index >= persisted.n_workers {
return Some("(unused)".into()); return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()],
..Default::default()
};
} }
let mut ret = vec![]; WorkerStatus {
ret.push(format!("tranquility = {}", persisted.tranquility)); queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
tranquility: Some(persisted.tranquility),
let qlen = self.manager.resync.queue_len().unwrap_or(0); persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
if qlen > 0 { ..Default::default()
ret.push(format!("{} blocks in queue", qlen));
} }
let elen = self.manager.resync.errors_len().unwrap_or(0);
if elen > 0 {
ret.push(format!("{} blocks in error state", elen));
}
Some(ret.join(", "))
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@ -545,9 +558,9 @@ impl Worker for ResyncWorker {
/// and the time of the last try. /// and the time of the last try.
/// Used to implement exponential backoff. /// Used to implement exponential backoff.
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
struct ErrorCounter { pub(crate) struct ErrorCounter {
errors: u64, pub(crate) errors: u64,
last_try: u64, pub(crate) last_try: u64,
} }
impl ErrorCounter { impl ErrorCounter {
@ -558,12 +571,13 @@ impl ErrorCounter {
} }
} }
fn decode(data: &[u8]) -> Self { pub(crate) fn decode(data: &[u8]) -> Self {
Self { Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
} }
} }
fn encode(&self) -> Vec<u8> { fn encode(&self) -> Vec<u8> {
[ [
u64::to_be_bytes(self.errors), u64::to_be_bytes(self.errors),
@ -583,7 +597,8 @@ impl ErrorCounter {
(RESYNC_RETRY_DELAY.as_millis() as u64) (RESYNC_RETRY_DELAY.as_millis() as u64)
<< std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER) << std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER)
} }
fn next_try(&self) -> u64 {
pub(crate) fn next_try(&self) -> u64 {
self.last_try + self.delay_msec() self.last_try + self.delay_msec()
} }
} }

View file

@ -181,6 +181,10 @@ impl Tree {
pub fn len(&self) -> Result<usize> { pub fn len(&self) -> Result<usize> {
self.0.len(self.1) self.0.len(self.1)
} }
#[inline]
pub fn fast_len(&self) -> Result<Option<usize>> {
self.0.fast_len(self.1)
}
#[inline] #[inline]
pub fn first(&self) -> Result<Option<(Value, Value)>> { pub fn first(&self) -> Result<Option<(Value, Value)>> {
@ -323,6 +327,9 @@ pub(crate) trait IDb: Send + Sync {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
fn len(&self, tree: usize) -> Result<usize>; fn len(&self, tree: usize) -> Result<usize>;
fn fast_len(&self, _tree: usize) -> Result<Option<usize>> {
Ok(None)
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>; fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;

View file

@ -121,6 +121,10 @@ impl IDb for LmdbDb {
Ok(tree.len(&tx)?.try_into().unwrap()) Ok(tree.len(&tx)?.try_into().unwrap())
} }
fn fast_len(&self, tree: usize) -> Result<Option<usize>> {
Ok(Some(self.len(tree)?))
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let mut tx = self.db.write_txn()?; let mut tx = self.db.write_txn()?;

View file

@ -144,6 +144,10 @@ impl IDb for SqliteDb {
} }
} }
fn fast_len(&self, tree: usize) -> Result<Option<usize>> {
Ok(Some(self.len(tree)?))
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
trace!("insert {}: lock db", tree); trace!("insert {}: lock db", tree);
let this = self.0.lock().unwrap(); let this = self.0.lock().unwrap();

View file

@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::formater::format_table_to_string;
use garage_util::time::*; use garage_util::time::*;
use garage_table::replication::*; use garage_table::replication::*;
@ -15,6 +16,7 @@ use garage_table::*;
use garage_rpc::*; use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand; use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*; use garage_model::bucket_alias_table::*;
@ -24,6 +26,8 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::migrate::Migrate; use garage_model::migrate::Migrate;
use garage_model::permission::*; use garage_model::permission::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::Version;
use crate::cli::*; use crate::cli::*;
use crate::repair::online::launch_online_repair; use crate::repair::online::launch_online_repair;
@ -38,7 +42,8 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt), LaunchRepair(RepairOpt),
Migrate(MigrateOpt), Migrate(MigrateOpt),
Stats(StatsOpt), Stats(StatsOpt),
Worker(WorkerOpt), Worker(WorkerOperation),
BlockOperation(BlockOperation),
// Replies // Replies
Ok(String), Ok(String),
@ -54,6 +59,13 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>, HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt, WorkerListOpt,
), ),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
},
} }
impl Rpc for AdminRpc { impl Rpc for AdminRpc {
@ -73,6 +85,8 @@ impl AdminRpcHandler {
admin admin
} }
// ================ BUCKET COMMANDS ====================
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> { async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd { match cmd {
BucketOperation::List => self.handle_list_buckets().await, BucketOperation::List => self.handle_list_buckets().await,
@ -551,6 +565,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(ret)) Ok(AdminRpc::Ok(ret))
} }
// ================ KEY COMMANDS ====================
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd { match cmd {
KeyOperation::List => self.handle_list_keys().await, KeyOperation::List => self.handle_list_keys().await,
@ -688,6 +704,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key, relevant_buckets)) Ok(AdminRpc::KeyInfo(key, relevant_buckets))
} }
// ================ MIGRATION COMMANDS ====================
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> { async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
if !opt.yes { if !opt.yes {
return Err(Error::BadRequest( return Err(Error::BadRequest(
@ -704,6 +722,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok("Migration successfull.".into())) Ok(AdminRpc::Ok("Migration successfull.".into()))
} }
// ================ REPAIR COMMANDS ====================
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> { async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes { if !opt.yes {
return Err(Error::BadRequest( return Err(Error::BadRequest(
@ -747,6 +767,8 @@ impl AdminRpcHandler {
} }
} }
// ================ STATS COMMANDS ====================
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> { async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes { if opt.all_nodes {
let mut ret = String::new(); let mut ret = String::new();
@ -763,11 +785,12 @@ impl AdminRpcHandler {
match self match self
.endpoint .endpoint
.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.await? .await
{ {
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(), Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
} }
} }
Ok(AdminRpc::Ok(ret)) Ok(AdminRpc::Ok(ret))
@ -787,6 +810,7 @@ impl AdminRpcHandler {
.unwrap_or_else(|| "(unknown)".into()), .unwrap_or_else(|| "(unknown)".into()),
) )
.unwrap(); .unwrap();
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather ring statistics // Gather ring statistics
@ -805,21 +829,38 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap(); writeln!(&mut ret, " {:?} {}", n, c).unwrap();
} }
self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?; // Gather table statistics
self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?; let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?; table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?; table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?; table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
write!(
&mut ret,
"\nTable stats:\n{}",
format_table_to_string(table)
)
.unwrap();
// Gather block manager statistics
writeln!(&mut ret, "\nBlock manager stats:").unwrap(); writeln!(&mut ret, "\nBlock manager stats:").unwrap();
if opt.detailed { let rc_len = if opt.detailed {
self.garage.block_manager.rc_len()?.to_string()
} else {
self.garage
.block_manager
.rc_fast_len()?
.map(|x| x.to_string())
.unwrap_or_else(|| "NC".into())
};
writeln!( writeln!(
&mut ret, &mut ret,
" number of RC entries (~= number of blocks): {}", " number of RC entries (~= number of blocks): {}",
self.garage.block_manager.rc_len()? rc_len
) )
.unwrap(); .unwrap();
}
writeln!( writeln!(
&mut ret, &mut ret,
" resync queue length: {}", " resync queue length: {}",
@ -833,67 +874,84 @@ impl AdminRpcHandler {
) )
.unwrap(); .unwrap();
if !opt.detailed {
writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap();
}
Ok(ret) Ok(ret)
} }
fn gather_table_stats<F, R>( fn gather_table_stats<F, R>(
&self, &self,
to: &mut String,
t: &Arc<Table<F, R>>, t: &Arc<Table<F, R>>,
opt: &StatsOpt, detailed: bool,
) -> Result<(), Error> ) -> Result<String, Error>
where where
F: TableSchema + 'static, F: TableSchema + 'static,
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap(); let (data_len, mkl_len) = if detailed {
if opt.detailed { (
writeln!( t.data.store.len().map_err(GarageError::from)?.to_string(),
to, t.merkle_updater.merkle_tree_len()?.to_string(),
" number of items: {}",
t.data.store.len().map_err(GarageError::from)?
) )
.unwrap(); } else {
writeln!( (
to, t.data
" Merkle tree size: {}", .store
t.merkle_updater.merkle_tree_len()? .fast_len()
.map_err(GarageError::from)?
.map(|x| x.to_string())
.unwrap_or_else(|| "NC".into()),
t.merkle_updater
.merkle_tree_fast_len()?
.map(|x| x.to_string())
.unwrap_or_else(|| "NC".into()),
) )
.unwrap(); };
}
writeln!(
to,
" Merkle updater todo queue length: {}",
t.merkle_updater.todo_len()?
)
.unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
Ok(()) Ok(format!(
" {}\t{}\t{}\t{}\t{}",
F::TABLE_NAME,
data_len,
mkl_len,
t.merkle_updater.todo_len()?,
t.data.gc_todo_len()?
))
} }
// ---- // ================ WORKER COMMANDS ====================
async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> { async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
match opt.cmd { match cmd {
WorkerCmd::List { opt } => { WorkerOperation::List { opt } => {
let workers = self.garage.background.get_worker_info(); let workers = self.garage.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, opt)) Ok(AdminRpc::WorkerList(workers, *opt))
} }
WorkerCmd::Set { opt } => match opt { WorkerOperation::Info { tid } => {
let info = self
.garage
.background
.get_worker_info()
.get(tid)
.ok_or_bad_request(format!("No worker with TID {}", tid))?
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
WorkerOperation::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => { WorkerSetCmd::ScrubTranquility { tranquility } => {
let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility); let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
self.garage self.garage
.block_manager .block_manager
.send_scrub_command(scrub_command) .send_scrub_command(scrub_command)
.await; .await;
Ok(AdminRpc::Ok("Scrub tranquility updated".into())) Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
} }
WorkerSetCmd::ResyncNWorkers { n_workers } => { WorkerSetCmd::ResyncWorkerCount { worker_count } => {
self.garage self.garage
.block_manager .block_manager
.resync .resync
.set_n_workers(n_workers) .set_n_workers(*worker_count)
.await?; .await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into())) Ok(AdminRpc::Ok("Number of resync workers updated".into()))
} }
@ -901,13 +959,154 @@ impl AdminRpcHandler {
self.garage self.garage
.block_manager .block_manager
.resync .resync
.set_tranquility(tranquility) .set_tranquility(*tranquility)
.await?; .await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into())) Ok(AdminRpc::Ok("Resync tranquility updated".into()))
} }
}, },
} }
} }
// ================ BLOCK COMMANDS ====================
async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
match cmd {
BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
self.garage.block_manager.list_resync_errors()?,
)),
BlockOperation::Info { hash } => {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let refcount = self.garage.block_manager.get_block_rc(&hash)?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
for br in block_refs {
if let Some(v) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
versions.push(Ok(v));
} else {
versions.push(Err(br.version));
}
}
Ok(AdminRpc::BlockInfo {
hash,
refcount,
versions,
})
}
BlockOperation::RetryNow { all, blocks } => {
if *all {
if !blocks.is_empty() {
return Err(Error::BadRequest(
"--all was specified, cannot also specify blocks".into(),
));
}
let blocks = self.garage.block_manager.list_resync_errors()?;
for b in blocks.iter() {
self.garage.block_manager.resync.clear_backoff(&b.hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
} else {
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
self.garage.block_manager.resync.clear_backoff(&hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
}
}
BlockOperation::Purge { yes, blocks } => {
if !yes {
return Err(Error::BadRequest(
"Pass the --yes flag to confirm block purge operation.".into(),
));
}
let mut obj_dels = 0;
let mut ver_dels = 0;
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
for br in block_refs {
let version = match self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
Some(v) => v,
None => continue,
};
if let Some(object) = self
.garage
.object_table
.get(&version.bucket_id, &version.key)
.await?
{
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == br.version {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
version.bucket_id,
version.key.clone(),
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(
ObjectVersionData::DeleteMarker,
),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
obj_dels += 1;
}
}
}
if !version.deleted.get() {
let deleted_version = Version::new(
version.uuid,
version.bucket_id,
version.key.clone(),
true,
);
self.garage.version_table.insert(&deleted_version).await?;
ver_dels += 1;
}
}
}
Ok(AdminRpc::Ok(format!(
"{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
blocks.len(),
obj_dels,
ver_dels
)))
}
}
}
} }
#[async_trait] #[async_trait]
@ -923,7 +1122,8 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await, AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await, AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()), m => Err(GarageError::unexpected_rpc_message(m).into()),
} }
} }

View file

@ -41,6 +41,9 @@ pub async fn cli_command_dispatch(
} }
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
Command::Block(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
}
_ => unreachable!(), _ => unreachable!(),
} }
} }
@ -186,7 +189,20 @@ pub async fn cmd_admin(
print_key_info(&key, &rb); print_key_info(&key, &rb);
} }
AdminRpc::WorkerList(wi, wlo) => { AdminRpc::WorkerList(wi, wlo) => {
print_worker_info(wi, wlo); print_worker_list(wi, wlo);
}
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
AdminRpc::BlockErrorList(el) => {
print_block_error_list(el);
}
AdminRpc::BlockInfo {
hash,
refcount,
versions,
} => {
print_block_info(hash, refcount, versions);
} }
r => { r => {
error!("Unexpected response: {:?}", r); error!("Unexpected response: {:?}", r);

View file

@ -49,7 +49,11 @@ pub enum Command {
/// Manage background workers /// Manage background workers
#[structopt(name = "worker", version = garage_version())] #[structopt(name = "worker", version = garage_version())]
Worker(WorkerOpt), Worker(WorkerOperation),
/// Low-level debug operations on data blocks
#[structopt(name = "block", version = garage_version())]
Block(BlockOperation),
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
@ -502,20 +506,17 @@ pub struct StatsOpt {
pub detailed: bool, pub detailed: bool,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct WorkerOpt {
#[structopt(subcommand)]
pub cmd: WorkerCmd,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerCmd { pub enum WorkerOperation {
/// List all workers on Garage node /// List all workers on Garage node
#[structopt(name = "list", version = garage_version())] #[structopt(name = "list", version = garage_version())]
List { List {
#[structopt(flatten)] #[structopt(flatten)]
opt: WorkerListOpt, opt: WorkerListOpt,
}, },
/// Get detailed information about a worker
#[structopt(name = "info", version = garage_version())]
Info { tid: usize },
/// Set worker parameter /// Set worker parameter
#[structopt(name = "set", version = garage_version())] #[structopt(name = "set", version = garage_version())]
Set { Set {
@ -540,9 +541,41 @@ pub enum WorkerSetCmd {
#[structopt(name = "scrub-tranquility", version = garage_version())] #[structopt(name = "scrub-tranquility", version = garage_version())]
ScrubTranquility { tranquility: u32 }, ScrubTranquility { tranquility: u32 },
/// Set number of concurrent block resync workers /// Set number of concurrent block resync workers
#[structopt(name = "resync-n-workers", version = garage_version())] #[structopt(name = "resync-worker-count", version = garage_version())]
ResyncNWorkers { n_workers: usize }, ResyncWorkerCount { worker_count: usize },
/// Set tranquility of block resync operations /// Set tranquility of block resync operations
#[structopt(name = "resync-tranquility", version = garage_version())] #[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 }, ResyncTranquility { tranquility: u32 },
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
ListErrors,
/// Get detailed information about a single block
#[structopt(name = "info", version = garage_version())]
Info {
/// Hash of the block for which to retrieve information
hash: String,
},
/// Retry now the resync of one or many blocks
#[structopt(name = "retry-now", version = garage_version())]
RetryNow {
/// Retry all blocks that have a resync error
#[structopt(long = "all")]
all: bool,
/// Hashes of the block to retry to resync now
blocks: Vec<String>,
},
/// Delete all objects referencing a missing block
#[structopt(name = "purge", version = garage_version())]
Purge {
/// Mandatory to confirm this operation
#[structopt(long = "yes")]
yes: bool,
/// Hashes of the block to purge
#[structopt(required = true)]
blocks: Vec<String>,
},
}

View file

@ -3,14 +3,17 @@ use std::time::Duration;
use garage_util::background::*; use garage_util::background::*;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::Uuid; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::formater::format_table; use garage_util::formater::format_table;
use garage_util::time::*; use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
use garage_model::s3::version_table::Version;
use crate::cli::structs::WorkerListOpt; use crate::cli::structs::WorkerListOpt;
@ -241,7 +244,7 @@ pub fn find_matching_node(
} }
} }
pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>(); let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| { wi.sort_by_key(|(tid, info)| {
( (
@ -254,7 +257,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
) )
}); });
let mut table = vec![]; let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
for (tid, info) in wi.iter() { for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
continue; continue;
@ -263,33 +266,147 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
continue; continue;
} }
table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
if let Some(i) = &info.info {
table.push(format!("\t\t {}", i));
}
let tf = timeago::Formatter::new(); let tf = timeago::Formatter::new();
let (err_ago, err_msg) = info let err_ago = info
.last_error .last_error
.as_ref() .as_ref()
.map(|(m, t)| { .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
( .unwrap_or_default();
tf.convert(Duration::from_millis(now_msec() - t)), let (total_err, consec_err) = if info.errors > 0 {
m.as_str(), (info.errors.to_string(), info.consecutive_errors.to_string())
) } else {
}) ("-".into(), "-".into())
.unwrap_or(("(?) ago".into(), "(?)")); };
if info.consecutive_errors > 0 {
table.push(format!( table.push(format!(
"\t\t {} consecutive errors ({} total), last {}", "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
info.consecutive_errors, info.errors, err_ago, tid,
info.state,
info.name,
info.status
.tranquility
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "-".into()),
info.status.progress.as_deref().unwrap_or("-"),
info.status
.queue_length
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "-".into()),
total_err,
consec_err,
err_ago,
)); ));
table.push(format!("\t\t {}", err_msg));
} else if info.errors > 0 {
table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
if wlo.errors {
table.push(format!("\t\t {}", err_msg));
} }
format_table(table);
}
pub fn print_worker_info(tid: usize, info: WorkerInfo) {
let mut table = vec![];
table.push(format!("Task id:\t{}", tid));
table.push(format!("Worker name:\t{}", info.name));
match info.state {
WorkerState::Throttled(t) => {
table.push(format!(
"Worker state:\tBusy (throttled, paused for {:.3}s)",
t
));
}
s => {
table.push(format!("Worker state:\t{}", s));
}
};
if let Some(tql) = info.status.tranquility {
table.push(format!("Tranquility:\t{}", tql));
}
table.push("".into());
table.push(format!("Total errors:\t{}", info.errors));
table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
if let Some((s, t)) = info.last_error {
table.push(format!("Last error:\t{}", s));
let tf = timeago::Formatter::new();
table.push(format!(
"Last error time:\t{}",
tf.convert(Duration::from_millis(now_msec() - t))
));
}
table.push("".into());
if let Some(p) = info.status.progress {
table.push(format!("Progress:\t{}", p));
}
if let Some(ql) = info.status.queue_length {
table.push(format!("Queue length:\t{}", ql));
}
if let Some(pe) = info.status.persistent_errors {
table.push(format!("Persistent errors:\t{}", pe));
}
for (i, s) in info.status.freeform.iter().enumerate() {
if i == 0 {
if table.last() != Some(&"".into()) {
table.push("".into());
}
table.push(format!("Message:\t{}", s));
} else {
table.push(format!("\t{}", s));
} }
} }
format_table(table); format_table(table);
} }
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
let mut tf2 = timeago::Formatter::new();
tf2.ago("");
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el {
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()),
e.refcount,
e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)),
tf2.convert(Duration::from_millis(e.next_try - now))
));
}
format_table(table);
}
pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount);
println!();
let mut table = vec!["Version\tBucket\tKey\tDeleted".into()];
let mut nondeleted_count = 0;
for v in versions.iter() {
match v {
Ok(ver) => {
table.push(format!(
"{:?}\t{:?}\t{}\t{:?}",
ver.uuid,
ver.bucket_id,
ver.key,
ver.deleted.get()
));
if !ver.deleted.get() {
nondeleted_count += 1;
}
}
Err(vh) => {
table.push(format!("{:?}\t\t\tyes", vh));
}
}
}
format_table(table);
if refcount != nondeleted_count {
println!();
println!("Warning: refcount does not match number of non-deleted versions");
}
}

View file

@ -127,9 +127,16 @@ async fn main() {
std::process::abort(); std::process::abort();
})); }));
// Parse arguments and dispatch command line
let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
// Initialize logging as well as other libraries used in Garage // Initialize logging as well as other libraries used in Garage
if std::env::var("RUST_LOG").is_err() { if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "netapp=info,garage=info") let default_log = match &opt.cmd {
Command::Server => "netapp=info,garage=info",
_ => "netapp=warn,garage=warn",
};
std::env::set_var("RUST_LOG", default_log)
} }
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_writer(std::io::stderr) .with_writer(std::io::stderr)
@ -137,9 +144,6 @@ async fn main() {
.init(); .init();
sodiumoxide::init().expect("Unable to init sodiumoxide"); sodiumoxide::init().expect("Unable to init sodiumoxide");
// Parse arguments and dispatch command line
let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
let res = match opt.cmd { let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file).await, Command::Server => server::run_server(opt.config_file).await,
Command::OfflineRepair(repair_opt) => { Command::OfflineRepair(repair_opt) => {
@ -182,9 +186,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk); let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host // Find and parse the address of the target host
let (id, addr) = if let Some(h) = opt.rpc_host { let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?; let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?;
(id, addrs[0]) (id, addrs[0], false)
} else { } else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?; .err_context(READ_KEY_ERROR)?;
@ -195,24 +199,26 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
.ok_or_message("unable to resolve rpc_public_addr specified in config file")? .ok_or_message("unable to resolve rpc_public_addr specified in config file")?
.next() .next()
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?; .ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
(node_id, a) (node_id, a, false)
} else { } else {
let default_addr = SocketAddr::new( let default_addr = SocketAddr::new(
"127.0.0.1".parse().unwrap(), "127.0.0.1".parse().unwrap(),
config.as_ref().unwrap().rpc_bind_addr.port(), config.as_ref().unwrap().rpc_bind_addr.port(),
); );
warn!( (node_id, default_addr, true)
"Trying to contact Garage node at default address {}",
default_addr
);
warn!("If this doesn't work, consider adding rpc_public_addr in your config file or specifying the -h command line parameter.");
(node_id, default_addr)
} }
}; };
// Connect to target host // Connect to target host
netapp.clone().try_connect(addr, id).await if let Err(e) = netapp.clone().try_connect(addr, id).await {
.err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; if is_default_addr {
warn!(
"Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.",
addr
);
}
Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into()); let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into()); let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());

View file

@ -85,8 +85,11 @@ impl Worker for RepairVersionsWorker {
"Version repair worker".into() "Version repair worker".into()
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
Some(format!("{} items done", self.counter)) WorkerStatus {
progress: Some(self.counter.to_string()),
..Default::default()
}
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@ -163,8 +166,11 @@ impl Worker for RepairBlockrefsWorker {
"Block refs repair worker".into() "Block refs repair worker".into()
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
Some(format!("{} items done", self.counter)) WorkerStatus {
progress: Some(self.counter.to_string()),
..Default::default()
}
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {

View file

@ -404,14 +404,13 @@ impl<T: CountedItem> IndexPropagatorWorker<T> {
#[async_trait] #[async_trait]
impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
fn name(&self) -> String { fn name(&self) -> String {
format!("{} index counter propagator", T::COUNTER_TABLE_NAME) format!("{} counter", T::COUNTER_TABLE_NAME)
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
if !self.buf.is_empty() { WorkerStatus {
Some(format!("{} items in queue", self.buf.len())) queue_length: Some(self.buf.len() as u64),
} else { ..Default::default()
None
} }
} }

View file

@ -58,7 +58,13 @@ where
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); let metrics = TableMetrics::new(
F::TABLE_NAME,
store.clone(),
merkle_tree.clone(),
merkle_todo.clone(),
gc_todo.clone(),
);
Arc::new(Self { Arc::new(Self {
system, system,

View file

@ -330,12 +330,10 @@ where
format!("{} GC", F::TABLE_NAME) format!("{} GC", F::TABLE_NAME)
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
let l = self.gc.data.gc_todo_len().unwrap_or(0); WorkerStatus {
if l > 0 { queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
Some(format!("{} items in queue", l)) ..Default::default()
} else {
None
} }
} }

View file

@ -293,6 +293,10 @@ where
Ok(self.data.merkle_tree.len()?) Ok(self.data.merkle_tree.len()?)
} }
pub fn merkle_tree_fast_len(&self) -> Result<Option<usize>, Error> {
Ok(self.data.merkle_tree.fast_len()?)
}
pub fn todo_len(&self) -> Result<usize, Error> { pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?) Ok(self.data.merkle_todo.len()?)
} }
@ -310,15 +314,13 @@ where
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
fn name(&self) -> String { fn name(&self) -> String {
format!("{} Merkle tree updater", F::TABLE_NAME) format!("{} Merkle", F::TABLE_NAME)
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
let l = self.0.todo_len().unwrap_or(0); WorkerStatus {
if l > 0 { queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
Some(format!("{} items in queue", l)) ..Default::default()
} else {
None
} }
} }

View file

@ -5,6 +5,8 @@ use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct TableMetrics { pub struct TableMetrics {
pub(crate) _table_size: ValueObserver<u64>,
pub(crate) _merkle_tree_size: ValueObserver<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>, pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>, pub(crate) _gc_todo_len: ValueObserver<u64>,
@ -20,9 +22,43 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>, pub(crate) sync_items_received: Counter<u64>,
} }
impl TableMetrics { impl TableMetrics {
pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self { pub fn new(
table_name: &'static str,
store: db::Tree,
merkle_tree: db::Tree,
merkle_todo: db::Tree,
gc_todo: CountedTree,
) -> Self {
let meter = global::meter(table_name); let meter = global::meter(table_name);
TableMetrics { TableMetrics {
_table_size: meter
.u64_value_observer(
"table.size",
move |observer| {
if let Ok(Some(v)) = store.fast_len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Number of items in table")
.init(),
_merkle_tree_size: meter
.u64_value_observer(
"table.merkle_tree_size",
move |observer| {
if let Ok(Some(v)) = merkle_tree.fast_len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Number of nodes in table's Merkle tree")
.init(),
_merkle_todo_len: meter _merkle_todo_len: meter
.u64_value_observer( .u64_value_observer(
"table.merkle_updater_todo_queue_length", "table.merkle_updater_todo_queue_length",

View file

@ -570,12 +570,10 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
format!("{} sync", F::TABLE_NAME) format!("{} sync", F::TABLE_NAME)
} }
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
let l = self.todo.len(); WorkerStatus {
if l > 0 { queue_length: Some(self.todo.len() as u64),
Some(format!("{} partitions remaining", l)) ..Default::default()
} else {
None
} }
} }

View file

@ -49,3 +49,9 @@ impl EnumerationOrder {
} }
} }
} }
impl Default for EnumerationOrder {
fn default() -> Self {
EnumerationOrder::Forward
}
}

View file

@ -29,13 +29,24 @@ pub struct BackgroundRunner {
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo { pub struct WorkerInfo {
pub name: String, pub name: String,
pub info: Option<String>, pub status: WorkerStatus,
pub state: WorkerState, pub state: WorkerState,
pub errors: usize, pub errors: usize,
pub consecutive_errors: usize, pub consecutive_errors: usize,
pub last_error: Option<(String, u64)>, pub last_error: Option<(String, u64)>,
} }
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
/// fields to indicate their status to CLI users. All fields are optional.
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
pub struct WorkerStatus {
pub tranquility: Option<u32>,
pub progress: Option<String>,
pub queue_length: Option<u64>,
pub persistent_errors: Option<u64>,
pub freeform: Vec<String>,
}
impl BackgroundRunner { impl BackgroundRunner {
/// Create a new BackgroundRunner /// Create a new BackgroundRunner
pub fn new( pub fn new(

View file

@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use crate::background::WorkerInfo; use crate::background::{WorkerInfo, WorkerStatus};
use crate::error::Error; use crate::error::Error;
use crate::time::now_msec; use crate::time::now_msec;
@ -26,7 +26,7 @@ impl std::fmt::Display for WorkerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
WorkerState::Busy => write!(f, "Busy"), WorkerState::Busy => write!(f, "Busy"),
WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t), WorkerState::Throttled(_) => write!(f, "Busy*"),
WorkerState::Idle => write!(f, "Idle"), WorkerState::Idle => write!(f, "Idle"),
WorkerState::Done => write!(f, "Done"), WorkerState::Done => write!(f, "Done"),
} }
@ -37,8 +37,8 @@ impl std::fmt::Display for WorkerState {
pub trait Worker: Send { pub trait Worker: Send {
fn name(&self) -> String; fn name(&self) -> String;
fn info(&self) -> Option<String> { fn status(&self) -> WorkerStatus {
None Default::default()
} }
/// Work: do a basic unit of work, if one is available (otherwise, should return /// Work: do a basic unit of work, if one is available (otherwise, should return
@ -119,7 +119,7 @@ impl WorkerProcessor {
match wi.get_mut(&worker.task_id) { match wi.get_mut(&worker.task_id) {
Some(i) => { Some(i) => {
i.state = worker.state; i.state = worker.state;
i.info = worker.worker.info(); i.status = worker.worker.status();
i.errors = worker.errors; i.errors = worker.errors;
i.consecutive_errors = worker.consecutive_errors; i.consecutive_errors = worker.consecutive_errors;
if worker.last_error.is_some() { if worker.last_error.is_some() {
@ -130,7 +130,7 @@ impl WorkerProcessor {
wi.insert(worker.task_id, WorkerInfo { wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(), name: worker.worker.name(),
state: worker.state, state: worker.state,
info: worker.worker.info(), status: worker.worker.status(),
errors: worker.errors, errors: worker.errors,
consecutive_errors: worker.consecutive_errors, consecutive_errors: worker.consecutive_errors,
last_error: worker.last_error.take(), last_error: worker.last_error.take(),

View file

@ -1,4 +1,4 @@
pub fn format_table(data: Vec<String>) { pub fn format_table_to_string(data: Vec<String>) -> String {
let data = data let data = data
.iter() .iter()
.map(|s| s.split('\t').collect::<Vec<_>>()) .map(|s| s.split('\t').collect::<Vec<_>>())
@ -24,5 +24,9 @@ pub fn format_table(data: Vec<String>) {
out.push('\n'); out.push('\n');
} }
print!("{}", out); out
}
pub fn format_table(data: Vec<String>) {
print!("{}", format_table_to_string(data));
} }