mirror of
synced 2025-03-13 22:12:40 +00:00
Prettier worker list table; remove useless CLI log messages
This commit is contained in:
11 changed files with 139 additions and 116 deletions
@ -53,7 +53,7 @@ impl Worker for RepairWorker {
"Block repair worker".into()
fn info(&self) -> Option<String> {
fn status(&self) -> WorkerStatus {
match self.block_iter.as_ref() {
None => {
let idx_bytes = self
@ -66,9 +66,17 @@ impl Worker for RepairWorker {
} else {
Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
WorkerStatus {
progress: Some("Phase 1".into()),
freeform: vec![format!("Now at: {}", hex::encode(idx_bytes))],
Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
Some(bi) => WorkerStatus {
progress: Some(format!("{:.2}%", bi.progress() * 100.)),
freeform: vec!["Phase 2".into()],
@ -271,29 +279,28 @@ impl Worker for ScrubWorker {
"Block scrub worker".into()
fn info(&self) -> Option<String> {
let s = match &self.work {
ScrubWorkerState::Running(bsi) => format!(
"{:.2}% done (tranquility = {})",
bsi.progress() * 100.,
ScrubWorkerState::Paused(bsi, rt) => {
"Paused, {:.2}% done, resumes at {}",
bsi.progress() * 100.,
ScrubWorkerState::Finished => format!(
"Last completed scrub: {}",
fn status(&self) -> WorkerStatus {
let mut s = WorkerStatus {
persistent_errors: Some(self.persisted.corruptions_detected),
tranquility: Some(self.persisted.tranquility),
"{} ; corruptions detected: {}",
s, self.persisted.corruptions_detected
match &self.work {
ScrubWorkerState::Running(bsi) => {
s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
ScrubWorkerState::Paused(bsi, rt) => {
s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
s.freeform = vec![format!("Paused, resumes at {}", msec_to_rfc3339(*rt))];
ScrubWorkerState::Finished => {
s.freeform = vec![format!(
"Completed {}",
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@ -477,27 +477,22 @@ impl Worker for ResyncWorker {
format!("Block resync worker #{}", self.index + 1)
fn info(&self) -> Option<String> {
fn status(&self) -> WorkerStatus {
let persisted = self.manager.resync.persisted.load();
if self.index >= persisted.n_workers {
return Some("(unused)".into());
return WorkerStatus {
freeform: vec!["(unused)".into()],
let mut ret = vec![];
ret.push(format!("tranquility = {}", persisted.tranquility));
let qlen = self.manager.resync.queue_len().unwrap_or(0);
if qlen > 0 {
ret.push(format!("{} blocks in queue", qlen));
WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
tranquility: Some(persisted.tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
let elen = self.manager.resync.errors_len().unwrap_or(0);
if elen > 0 {
ret.push(format!("{} blocks in error state", elen));
Some(ret.join(", "))
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@ -254,7 +254,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut table = vec![];
let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
@ -263,33 +263,38 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
if let Some(i) = &info.info {
table.push(format!("\t\t {}", i));
let tf = timeago::Formatter::new();
let (err_ago, err_msg) = info
let err_ago = info
.map(|(m, t)| {
tf.convert(Duration::from_millis(now_msec() - t)),
.unwrap_or(("(?) ago".into(), "(?)"));
if info.consecutive_errors > 0 {
"\t\t {} consecutive errors ({} total), last {}",
info.consecutive_errors, info.errors, err_ago,
table.push(format!("\t\t {}", err_msg));
} else if info.errors > 0 {
table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
if wlo.errors {
table.push(format!("\t\t {}", err_msg));
.map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
let (total_err, consec_err) = if info.errors > 0 {
(info.errors.to_string(), info.consecutive_errors.to_string())
} else {
("-".into(), "-".into())
@ -127,9 +127,16 @@ async fn main() {
// Parse arguments and dispatch command line
let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
// Initialize logging as well as other libraries used in Garage
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "netapp=info,garage=info")
let default_log = match &opt.cmd {
Command::Server => "netapp=info,garage=info",
_ => "netapp=warn,garage=warn",
std::env::set_var("RUST_LOG", default_log)
@ -137,9 +144,6 @@ async fn main() {
sodiumoxide::init().expect("Unable to init sodiumoxide");
// Parse arguments and dispatch command line
let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file).await,
Command::OfflineRepair(repair_opt) => {
@ -182,9 +186,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host
let (id, addr) = if let Some(h) = opt.rpc_host {
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?;
(id, addrs[0])
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
@ -195,24 +199,26 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
(node_id, a)
(node_id, a, false)
} else {
let default_addr = SocketAddr::new(
"Trying to contact Garage node at default address {}",
warn!("If this doesn't work, consider adding rpc_public_addr in your config file or specifying the -h command line parameter.");
(node_id, default_addr)
(node_id, default_addr, true)
// Connect to target host
netapp.clone().try_connect(addr, id).await
.err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
if let Err(e) = netapp.clone().try_connect(addr, id).await {
if is_default_addr {
"Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.",
Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
@ -85,8 +85,11 @@ impl Worker for RepairVersionsWorker {
"Version repair worker".into()
fn info(&self) -> Option<String> {
Some(format!("{} items done", self.counter))
fn status(&self) -> WorkerStatus {
WorkerStatus {
progress: Some(self.counter.to_string()),
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@ -163,8 +166,11 @@ impl Worker for RepairBlockrefsWorker {
"Block refs repair worker".into()
fn info(&self) -> Option<String> {
Some(format!("{} items done", self.counter))
fn status(&self) -> WorkerStatus {
WorkerStatus {
progress: Some(self.counter.to_string()),
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@ -404,14 +404,13 @@ impl<T: CountedItem> IndexPropagatorWorker<T> {
impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
fn name(&self) -> String {
format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
format!("{} counter", T::COUNTER_TABLE_NAME)
fn info(&self) -> Option<String> {
if !self.buf.is_empty() {
Some(format!("{} items in queue", self.buf.len()))
} else {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.buf.len() as u64),
@ -330,12 +330,10 @@ where
format!("{} GC", F::TABLE_NAME)
fn info(&self) -> Option<String> {
let l = self.gc.data.gc_todo_len().unwrap_or(0);
if l > 0 {
Some(format!("{} items in queue", l))
} else {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
@ -310,15 +310,13 @@ where
R: TableReplication + 'static,
fn name(&self) -> String {
format!("{} Merkle tree updater", F::TABLE_NAME)
format!("{} Merkle", F::TABLE_NAME)
fn info(&self) -> Option<String> {
let l = self.0.todo_len().unwrap_or(0);
if l > 0 {
Some(format!("{} items in queue", l))
} else {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
@ -570,12 +570,10 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
format!("{} sync", F::TABLE_NAME)
fn info(&self) -> Option<String> {
let l = self.todo.len();
if l > 0 {
Some(format!("{} partitions remaining", l))
} else {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.todo.len() as u64),
@ -29,13 +29,24 @@ pub struct BackgroundRunner {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo {
pub name: String,
pub info: Option<String>,
pub status: WorkerStatus,
pub state: WorkerState,
pub errors: usize,
pub consecutive_errors: usize,
pub last_error: Option<(String, u64)>,
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
/// fields to indicate their status to CLI users. All fields are optional.
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
pub struct WorkerStatus {
pub tranquility: Option<u32>,
pub progress: Option<String>,
pub queue_length: Option<u64>,
pub persistent_errors: Option<u64>,
pub freeform: Vec<String>,
impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new(
@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
use crate::background::WorkerInfo;
use crate::background::{WorkerInfo, WorkerStatus};
use crate::error::Error;
use crate::time::now_msec;
@ -26,7 +26,7 @@ impl std::fmt::Display for WorkerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkerState::Busy => write!(f, "Busy"),
WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
WorkerState::Throttled(_) => write!(f, "Busy*"),
WorkerState::Idle => write!(f, "Idle"),
WorkerState::Done => write!(f, "Done"),
@ -37,8 +37,8 @@ impl std::fmt::Display for WorkerState {
pub trait Worker: Send {
fn name(&self) -> String;
fn info(&self) -> Option<String> {
fn status(&self) -> WorkerStatus {
/// Work: do a basic unit of work, if one is available (otherwise, should return
@ -119,7 +119,7 @@ impl WorkerProcessor {
match wi.get_mut(&worker.task_id) {
Some(i) => {
i.state = worker.state;
i.info = worker.worker.info();
i.status = worker.worker.status();
i.errors = worker.errors;
i.consecutive_errors = worker.consecutive_errors;
if worker.last_error.is_some() {
@ -130,7 +130,7 @@ impl WorkerProcessor {
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
state: worker.state,
info: worker.worker.info(),
status: worker.worker.status(),
errors: worker.errors,
consecutive_errors: worker.consecutive_errors,
last_error: worker.last_error.take(),
Reference in a new issue