Mirror of https://git.deuxfleurs.fr/Deuxfleurs/garage.git, synced 2025-04-09 10:54:10 +00:00
admin api: move functions to their correct location
Parent: 85a07c87d7 / Commit: 576d0d950e
4 changed files with 513 additions and 501 deletions
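
All four files in this diff shuffle admin-API request handlers between modules, and every handler follows the same RequestHandler pattern. As a rough sketch only (the actual trait is defined elsewhere in the admin crate, and Admin, Garage, and Error come from the surrounding code), the impls below all conform to a shape like:

    // Sketch inferred from the impl blocks in this diff, not the real
    // definition; the real trait may use #[async_trait] or extra bounds.
    trait RequestHandler {
        type Response;
        async fn handle(self, garage: &Arc<Garage>, admin: &Admin)
            -> Result<Self::Response, Error>;
    }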

@@ -1,11 +1,13 @@
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;

use garage_util::crdt::*;
use format_table::format_table_to_string;

use garage_util::data::*;
use garage_util::error::Error as GarageError;

use garage_rpc::layout;
use garage_rpc::layout::PARTITION_BITS;

use garage_model::garage::Garage;

@@ -140,6 +142,108 @@ impl RequestHandler for GetClusterHealthRequest {
    }
}

impl RequestHandler for GetClusterStatisticsRequest {
    type Response = GetClusterStatisticsResponse;

    // FIXME: return this as a JSON struct instead of text
    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<GetClusterStatisticsResponse, Error> {
        let mut ret = String::new();

        // Gather storage node and free space statistics for current nodes
        let layout = &garage.system.cluster_layout();
        let mut node_partition_count = HashMap::<Uuid, u64>::new();
        for short_id in layout.current().ring_assignment_data.iter() {
            let id = layout.current().node_id_vec[*short_id as usize];
            *node_partition_count.entry(id).or_default() += 1;
        }
        let node_info = garage
            .system
            .get_known_nodes()
            .into_iter()
            .map(|n| (n.id, n))
            .collect::<HashMap<_, _>>();

        let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
        for (id, parts) in node_partition_count.iter() {
            let info = node_info.get(id);
            let status = info.map(|x| &x.status);
            let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
            let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
            let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
            let capacity = role
                .map(|x| x.capacity_string())
                .unwrap_or_else(|| "?".into());
            let avail_str = |x| match x {
                Some((avail, total)) => {
                    let pct = (avail as f64) / (total as f64) * 100.;
                    let avail = bytesize::ByteSize::b(avail);
                    let total = bytesize::ByteSize::b(total);
                    format!("{}/{} ({:.1}%)", avail, total, pct)
                }
                None => "?".into(),
            };
            let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
            let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
            table.push(format!(
                " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
                id, hostname, zone, capacity, parts, data_avail, meta_avail
            ));
        }
        write!(
            &mut ret,
            "Storage nodes:\n{}",
            format_table_to_string(table)
        )
        .unwrap();

        let meta_part_avail = node_partition_count
            .iter()
            .filter_map(|(id, parts)| {
                node_info
                    .get(id)
                    .and_then(|x| x.status.meta_disk_avail)
                    .map(|c| c.0 / *parts)
            })
            .collect::<Vec<_>>();
        let data_part_avail = node_partition_count
            .iter()
            .filter_map(|(id, parts)| {
                node_info
                    .get(id)
                    .and_then(|x| x.status.data_disk_avail)
                    .map(|c| c.0 / *parts)
            })
            .collect::<Vec<_>>();
        if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
            let meta_avail =
                bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
            let data_avail =
                bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
            writeln!(
                &mut ret,
                "\nEstimated available storage space cluster-wide (might be lower in practice):"
            )
            .unwrap();
            if meta_part_avail.len() < node_partition_count.len()
                || data_part_avail.len() < node_partition_count.len()
            {
                writeln!(&mut ret, " data: < {}", data_avail).unwrap();
                writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
                writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
            } else {
                writeln!(&mut ret, " data: {}", data_avail).unwrap();
                writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
            }
        }

        Ok(GetClusterStatisticsResponse { freeform: ret })
    }
}
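
The cluster-wide estimate above takes, across all nodes, the minimum free space per stored partition, and multiplies it by the total partition count, 1 << PARTITION_BITS. A toy version of that arithmetic, with an assumed partition_bits value (the real constant comes from garage_rpc::layout):

    // If the tightest node can still fit `min_avail_per_partition` bytes in
    // each partition it stores, the whole cluster can absorb about that much
    // per partition; with partition_bits = 8 there are 256 partitions.
    fn estimated_cluster_avail(min_avail_per_partition: u64, partition_bits: u32) -> u64 {
        min_avail_per_partition * (1u64 << partition_bits)
    }
    // estimated_cluster_avail(1 << 30, 8) == 256 GiB in bytes.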

impl RequestHandler for ConnectClusterNodesRequest {
    type Response = ConnectClusterNodesResponse;

@@ -165,396 +269,3 @@ impl RequestHandler for ConnectClusterNodesRequest {
        Ok(ConnectClusterNodesResponse(res))
    }
}

(The 393 lines removed here, from GetClusterLayoutRequest down to the ZoneRedundancy and LayoutParameters conversions, reappear verbatim, under a fresh import block, in the new file src/api/admin/layout.rs below.)

src/api/admin/layout.rs (new file, 406 lines)

@@ -0,0 +1,406 @@
use std::sync::Arc;

use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;

use garage_rpc::layout;

use garage_model::garage::Garage;

use crate::api::*;
use crate::error::*;
use crate::{Admin, RequestHandler};

impl RequestHandler for GetClusterLayoutRequest {
    type Response = GetClusterLayoutResponse;

    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<GetClusterLayoutResponse, Error> {
        Ok(format_cluster_layout(
            garage.system.cluster_layout().inner(),
        ))
    }
}

fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
    let current = layout.current();

    let roles = current
        .roles
        .items()
        .iter()
        .filter_map(|(k, _, v)| v.0.clone().map(|x| (k, x)))
        .map(|(k, v)| {
            let stored_partitions = current.get_node_usage(k).ok().map(|x| x as u64);
            LayoutNodeRole {
                id: hex::encode(k),
                zone: v.zone.clone(),
                capacity: v.capacity,
                stored_partitions,
                usable_capacity: stored_partitions.map(|x| x * current.partition_size),
                tags: v.tags.clone(),
            }
        })
        .collect::<Vec<_>>();

    let staged_role_changes = layout
        .staging
        .get()
        .roles
        .items()
        .iter()
        .filter(|(k, _, v)| current.roles.get(k) != Some(v))
        .map(|(k, _, v)| match &v.0 {
            None => NodeRoleChange {
                id: hex::encode(k),
                action: NodeRoleChangeEnum::Remove { remove: true },
            },
            Some(r) => NodeRoleChange {
                id: hex::encode(k),
                action: NodeRoleChangeEnum::Update(NodeAssignedRole {
                    zone: r.zone.clone(),
                    capacity: r.capacity,
                    tags: r.tags.clone(),
                }),
            },
        })
        .collect::<Vec<_>>();

    let staged_parameters = if *layout.staging.get().parameters.get() != current.parameters {
        Some((*layout.staging.get().parameters.get()).into())
    } else {
        None
    };

    GetClusterLayoutResponse {
        version: current.version,
        roles,
        partition_size: current.partition_size,
        parameters: current.parameters.into(),
        staged_role_changes,
        staged_parameters,
    }
}
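
In format_cluster_layout, usable_capacity is simply stored_partitions times partition_size. A minimal check of that relation, with assumed values:

    // Assumed illustrative values, mirroring the usable_capacity computation above.
    let partition_size: u64 = 1 << 30; // 1 GiB per partition
    let stored_partitions: Option<u64> = Some(64);
    let usable_capacity = stored_partitions.map(|x| x * partition_size);
    assert_eq!(usable_capacity, Some(64 << 30)); // 64 GiB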

impl RequestHandler for GetClusterLayoutHistoryRequest {
    type Response = GetClusterLayoutHistoryResponse;

    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<GetClusterLayoutHistoryResponse, Error> {
        let layout_helper = garage.system.cluster_layout();
        let layout = layout_helper.inner();
        let min_stored = layout.min_stored();

        let versions = layout
            .versions
            .iter()
            .rev()
            .chain(layout.old_versions.iter().rev())
            .map(|ver| {
                let status = if ver.version == layout.current().version {
                    ClusterLayoutVersionStatus::Current
                } else if ver.version >= min_stored {
                    ClusterLayoutVersionStatus::Draining
                } else {
                    ClusterLayoutVersionStatus::Historical
                };
                ClusterLayoutVersion {
                    version: ver.version,
                    status,
                    storage_nodes: ver
                        .roles
                        .items()
                        .iter()
                        .filter(
                            |(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_some()),
                        )
                        .count() as u64,
                    gateway_nodes: ver
                        .roles
                        .items()
                        .iter()
                        .filter(
                            |(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_none()),
                        )
                        .count() as u64,
                }
            })
            .collect::<Vec<_>>();

        let all_nodes = layout.get_all_nodes();
        let min_ack = layout_helper.ack_map_min();

        let update_trackers = if layout.versions.len() > 1 {
            Some(
                all_nodes
                    .iter()
                    .map(|node| {
                        (
                            hex::encode(&node),
                            NodeUpdateTrackers {
                                ack: layout.update_trackers.ack_map.get(node, min_stored),
                                sync: layout.update_trackers.sync_map.get(node, min_stored),
                                sync_ack: layout.update_trackers.sync_ack_map.get(node, min_stored),
                            },
                        )
                    })
                    .collect(),
            )
        } else {
            None
        };

        Ok(GetClusterLayoutHistoryResponse {
            current_version: layout.current().version,
            min_ack,
            versions,
            update_trackers,
        })
    }
}
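
The status classification above reads: the newest version is Current, any other version still stored (at least min_stored()) is Draining, and everything older is Historical. Pulled out as a standalone helper for illustration (not part of the commit):

    fn classify(ver: u64, current: u64, min_stored: u64) -> &'static str {
        if ver == current {
            "Current"
        } else if ver >= min_stored {
            "Draining"
        } else {
            "Historical"
        }
    }
    // classify(12, 12, 10) == "Current"
    // classify(11, 12, 10) == "Draining"
    // classify(9, 12, 10) == "Historical"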

// ----

// ---- update functions ----

impl RequestHandler for UpdateClusterLayoutRequest {
    type Response = UpdateClusterLayoutResponse;

    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<UpdateClusterLayoutResponse, Error> {
        let mut layout = garage.system.cluster_layout().inner().clone();

        let mut roles = layout.current().roles.clone();
        roles.merge(&layout.staging.get().roles);

        for change in self.roles {
            let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
            let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;

            let new_role = match change.action {
                NodeRoleChangeEnum::Remove { remove: true } => None,
                NodeRoleChangeEnum::Update(NodeAssignedRole {
                    zone,
                    capacity,
                    tags,
                }) => {
                    if matches!(capacity, Some(cap) if cap < 1024) {
                        return Err(Error::bad_request("Capacity should be at least 1K (1024)"));
                    }
                    Some(layout::NodeRole {
                        zone,
                        capacity,
                        tags,
                    })
                }
                _ => return Err(Error::bad_request("Invalid layout change")),
            };

            layout
                .staging
                .get_mut()
                .roles
                .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
        }

        if let Some(param) = self.parameters {
            if let ZoneRedundancy::AtLeast(r_int) = param.zone_redundancy {
                if r_int > layout.current().replication_factor {
                    return Err(Error::bad_request(format!(
                        "The zone redundancy must be smaller or equal to the replication factor ({}).",
                        layout.current().replication_factor
                    )));
                } else if r_int < 1 {
                    return Err(Error::bad_request(
                        "The zone redundancy must be at least 1.",
                    ));
                }
            }
            layout.staging.get_mut().parameters.update(param.into());
        }

        garage
            .system
            .layout_manager
            .update_cluster_layout(&layout)
            .await?;

        let res = format_cluster_layout(&layout);
        Ok(UpdateClusterLayoutResponse(res))
    }
}
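
For a sense of what this handler consumes, here is a hypothetical request; the field names are inferred from self.roles and self.parameters above, and the node id and values are made up:

    // Hypothetical example request (illustrative values only).
    let req = UpdateClusterLayoutRequest {
        roles: vec![NodeRoleChange {
            id: "1d4a...".into(), // hex-encoded node identifier
            action: NodeRoleChangeEnum::Update(NodeAssignedRole {
                zone: "dc1".into(),
                capacity: Some(100_000_000_000), // bytes; must be >= 1024 per the check above
                tags: vec!["ssd".into()],
            }),
        }],
        parameters: None,
    };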

impl RequestHandler for PreviewClusterLayoutChangesRequest {
    type Response = PreviewClusterLayoutChangesResponse;

    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<PreviewClusterLayoutChangesResponse, Error> {
        let layout = garage.system.cluster_layout().inner().clone();
        let new_ver = layout.current().version + 1;
        match layout.apply_staged_changes(new_ver) {
            Err(GarageError::Message(error)) => {
                Ok(PreviewClusterLayoutChangesResponse::Error { error })
            }
            Err(e) => Err(e.into()),
            Ok((new_layout, msg)) => Ok(PreviewClusterLayoutChangesResponse::Success {
                message: msg,
                new_layout: format_cluster_layout(&new_layout),
            }),
        }
    }
}

impl RequestHandler for ApplyClusterLayoutRequest {
    type Response = ApplyClusterLayoutResponse;

    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<ApplyClusterLayoutResponse, Error> {
        let layout = garage.system.cluster_layout().inner().clone();
        let (layout, msg) = layout.apply_staged_changes(self.version)?;

        garage
            .system
            .layout_manager
            .update_cluster_layout(&layout)
            .await?;

        Ok(ApplyClusterLayoutResponse {
            message: msg,
            layout: format_cluster_layout(&layout),
        })
    }
}

impl RequestHandler for RevertClusterLayoutRequest {
    type Response = RevertClusterLayoutResponse;

    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<RevertClusterLayoutResponse, Error> {
        let layout = garage.system.cluster_layout().inner().clone();
        let layout = layout.revert_staged_changes()?;
        garage
            .system
            .layout_manager
            .update_cluster_layout(&layout)
            .await?;

        let res = format_cluster_layout(&layout);
        Ok(RevertClusterLayoutResponse(res))
    }
}

impl RequestHandler for ClusterLayoutSkipDeadNodesRequest {
    type Response = ClusterLayoutSkipDeadNodesResponse;

    async fn handle(
        self,
        garage: &Arc<Garage>,
        _admin: &Admin,
    ) -> Result<ClusterLayoutSkipDeadNodesResponse, Error> {
        let status = garage.system.get_known_nodes();

        let mut layout = garage.system.cluster_layout().inner().clone();
        let mut ack_updated = vec![];
        let mut sync_updated = vec![];

        if layout.versions.len() == 1 {
            return Err(Error::bad_request(
                "This command cannot be called when there is only one live cluster layout version",
            ));
        }

        let min_v = layout.min_stored();
        if self.version <= min_v || self.version > layout.current().version {
            return Err(Error::bad_request(format!(
                "Invalid version, you may use the following version numbers: {}",
                (min_v + 1..=layout.current().version)
                    .map(|x| x.to_string())
                    .collect::<Vec<_>>()
                    .join(" ")
            )));
        }

        let all_nodes = layout.get_all_nodes();
        for node in all_nodes.iter() {
            // Update ACK tracker for dead nodes or for all nodes if --allow-missing-data
            if self.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) {
                if layout.update_trackers.ack_map.set_max(*node, self.version) {
                    ack_updated.push(hex::encode(node));
                }
            }

            // If --allow-missing-data, update SYNC tracker for all nodes.
            if self.allow_missing_data {
                if layout.update_trackers.sync_map.set_max(*node, self.version) {
                    sync_updated.push(hex::encode(node));
                }
            }
        }

        garage
            .system
            .layout_manager
            .update_cluster_layout(&layout)
            .await?;

        Ok(ClusterLayoutSkipDeadNodesResponse {
            ack_updated,
            sync_updated,
        })
    }
}
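
As the error message above spells out, the accepted versions for this endpoint run from min_stored() + 1 up to the current version. A quick check of the range construction, with assumed numbers:

    // With min_stored() == 2 and current version 4, only 3 and 4 are accepted.
    let (min_v, current) = (2u64, 4u64);
    let valid = (min_v + 1..=current)
        .map(|x| x.to_string())
        .collect::<Vec<_>>()
        .join(" ");
    assert_eq!(valid, "3 4");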

// ----

impl From<layout::ZoneRedundancy> for ZoneRedundancy {
    fn from(x: layout::ZoneRedundancy) -> Self {
        match x {
            layout::ZoneRedundancy::Maximum => ZoneRedundancy::Maximum,
            layout::ZoneRedundancy::AtLeast(x) => ZoneRedundancy::AtLeast(x),
        }
    }
}

impl Into<layout::ZoneRedundancy> for ZoneRedundancy {
    fn into(self) -> layout::ZoneRedundancy {
        match self {
            ZoneRedundancy::Maximum => layout::ZoneRedundancy::Maximum,
            ZoneRedundancy::AtLeast(x) => layout::ZoneRedundancy::AtLeast(x),
        }
    }
}

impl From<layout::LayoutParameters> for LayoutParameters {
    fn from(x: layout::LayoutParameters) -> Self {
        LayoutParameters {
            zone_redundancy: x.zone_redundancy.into(),
        }
    }
}

impl Into<layout::LayoutParameters> for LayoutParameters {
    fn into(self) -> layout::LayoutParameters {
        layout::LayoutParameters {
            zone_redundancy: self.zone_redundancy.into(),
        }
    }
}
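
These four conversions make the API-level and internal layout parameter types interchangeable. A minimal round-trip, assuming the two ZoneRedundancy enums as defined above:

    // Round-trip between the admin API type and the internal layout type.
    let api = ZoneRedundancy::AtLeast(2);
    let internal: layout::ZoneRedundancy = api.into();
    let back: ZoneRedundancy = internal.into();
    assert!(matches!(back, ZoneRedundancy::AtLeast(2)));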

@@ -14,6 +14,7 @@ mod router_v2;
mod bucket;
mod cluster;
mod key;
mod layout;
mod special;

mod block;

@@ -1,17 +1,13 @@
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;

use format_table::format_table_to_string;

use garage_util::data::*;
use garage_util::error::Error as GarageError;

use garage_table::replication::*;
use garage_table::*;

use garage_rpc::layout::PARTITION_BITS;

use garage_model::garage::Garage;

use crate::api::*;

@@ -114,108 +110,6 @@ impl RequestHandler for LocalGetNodeStatisticsRequest {
    }
}

(The 102 lines removed here are the GetClusterStatisticsRequest handler, identical to the code added under the @@ -140,6 +142,108 @@ hunk earlier in this diff.)

fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
where
    F: TableSchema + 'static,