layout: some refactoring of nongateway nodes

This commit is contained in:
Alex Auvolat 2023-11-14 12:48:38 +01:00
parent 9a491fa137
commit 8e292e06b3
No known key found for this signature in database
GPG key ID: 0E496D15096376BE
5 changed files with 95 additions and 61 deletions

View file

@ -25,8 +25,11 @@ pub async fn handle_read_index(
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let reverse = reverse.unwrap_or(false); let reverse = reverse.unwrap_or(false);
// TODO: not only current let node_id_vec = garage
let node_id_vec = garage.system.cluster_layout().current().node_ids().to_vec(); .system
.cluster_layout()
.all_nongateway_nodes()
.into_owned();
let (partition_keys, more, next_start) = read_range( let (partition_keys, more, next_start) = read_range(
&garage.k2v.counter_table.table, &garage.k2v.counter_table.table,

View file

@ -84,8 +84,8 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
impl<T: CountedItem> CounterEntry<T> { impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap<String, i64> { pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap<String, i64> {
let nodes = &layout.current().node_id_vec[..]; let nodes = layout.all_nongateway_nodes();
self.filtered_values_with_nodes(nodes) self.filtered_values_with_nodes(&nodes)
} }
pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> { pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {

View file

@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::collections::HashSet; use std::collections::HashSet;
use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::crdt::{Crdt, Lww, LwwMap};
@ -59,13 +60,19 @@ impl LayoutHistory {
(self.current().version, self.all_ack(), self.min_stored()) (self.current().version, self.all_ack(), self.min_stored())
} }
pub fn all_nongateway_nodes(&self) -> HashSet<Uuid> { pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> {
// TODO: cache this // TODO: cache this
self.versions if self.versions.len() == 1 {
.iter() self.versions[0].nongateway_nodes().into()
.map(|x| x.nongateway_nodes()) } else {
.flatten() let set = self
.collect::<HashSet<_>>() .versions
.iter()
.map(|x| x.nongateway_nodes())
.flatten()
.collect::<HashSet<_>>();
set.into_iter().copied().collect::<Vec<_>>().into()
}
} }
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
@ -202,14 +209,11 @@ To know the correct value of the new layout version, invoke `garage layout show`
} }
// Compute new version and add it to history // Compute new version and add it to history
let mut new_version = self.current().clone(); let (new_version, msg) = self
new_version.version += 1; .current()
.clone()
.calculate_next_version(&self.staging.get())?;
new_version.roles.merge(&self.staging.get().roles);
new_version.roles.retain(|(_, _, v)| v.0.is_some());
new_version.parameters = *self.staging.get().parameters.get();
let msg = new_version.calculate_partition_assignment()?;
self.versions.push(new_version); self.versions.push(new_version);
if self.current().check().is_ok() { if self.current().check().is_ok() {
while self.versions.first().unwrap().check().is_err() { while self.versions.first().unwrap().check().is_err() {

View file

@ -212,6 +212,8 @@ mod v010 {
/// see comment in v08::ClusterLayout /// see comment in v08::ClusterLayout
pub node_id_vec: Vec<Uuid>, pub node_id_vec: Vec<Uuid>,
/// number of non-gateway nodes, which are the first ids in node_id_vec
pub nongateway_node_count: usize,
/// see comment in v08::ClusterLayout /// see comment in v08::ClusterLayout
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub ring_assignment_data: Vec<CompactNodeType>, pub ring_assignment_data: Vec<CompactNodeType>,
@ -265,6 +267,18 @@ mod v010 {
type Previous = v09::ClusterLayout; type Previous = v09::ClusterLayout;
fn migrate(previous: Self::Previous) -> Self { fn migrate(previous: Self::Previous) -> Self {
let nongateway_node_count = previous
.node_id_vec
.iter()
.enumerate()
.filter(|(_, uuid)| {
let role = previous.roles.get(uuid);
matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
})
.map(|(i, _)| i)
.max()
.unwrap_or(0);
let version = LayoutVersion { let version = LayoutVersion {
version: previous.version, version: previous.version,
replication_factor: previous.replication_factor, replication_factor: previous.replication_factor,
@ -272,11 +286,14 @@ mod v010 {
parameters: previous.parameters, parameters: previous.parameters,
roles: previous.roles, roles: previous.roles,
node_id_vec: previous.node_id_vec, node_id_vec: previous.node_id_vec,
nongateway_node_count,
ring_assignment_data: previous.ring_assignment_data, ring_assignment_data: previous.ring_assignment_data,
}; };
let update_tracker = UpdateTracker( let update_tracker = UpdateTracker(
version version
.nongateway_nodes() .nongateway_nodes()
.iter()
.copied()
.map(|x| (x, version.version)) .map(|x| (x, version.version))
.collect::<BTreeMap<Uuid, u64>>(), .collect::<BTreeMap<Uuid, u64>>(),
); );

View file

@ -5,7 +5,7 @@ use std::convert::TryInto;
use bytesize::ByteSize; use bytesize::ByteSize;
use itertools::Itertools; use itertools::Itertools;
use garage_util::crdt::LwwMap; use garage_util::crdt::{Crdt, LwwMap};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
@ -30,6 +30,7 @@ impl LayoutVersion {
partition_size: 0, partition_size: 0,
roles: LwwMap::new(), roles: LwwMap::new(),
node_id_vec: Vec::new(), node_id_vec: Vec::new(),
nongateway_node_count: 0,
ring_assignment_data: Vec::new(), ring_assignment_data: Vec::new(),
parameters, parameters,
} }
@ -43,6 +44,11 @@ impl LayoutVersion {
&self.node_id_vec[..] &self.node_id_vec[..]
} }
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
pub fn nongateway_nodes(&self) -> &[Uuid] {
&self.node_id_vec[..self.nongateway_node_count]
}
pub fn num_nodes(&self) -> usize { pub fn num_nodes(&self) -> usize {
self.node_id_vec.len() self.node_id_vec.len()
} }
@ -56,18 +62,14 @@ impl LayoutVersion {
} }
/// Given a node uuids, this function returns its capacity or fails if it does not have any /// Given a node uuids, this function returns its capacity or fails if it does not have any
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> { pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> {
match self.node_role(uuid) { match self.node_role(uuid) {
Some(NodeRole { Some(NodeRole {
capacity: Some(cap), capacity: Some(cap),
zone: _, zone: _,
tags: _, tags: _,
}) => Ok(*cap), }) => Some(*cap),
_ => Err(Error::Message( _ => None,
"The Uuid does not correspond to a node present in the \
cluster or this node does not have a positive capacity."
.into(),
)),
} }
} }
@ -131,17 +133,6 @@ impl LayoutVersion {
// ===================== internal information extractors ====================== // ===================== internal information extractors ======================
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
pub(crate) fn nongateway_nodes(&self) -> impl Iterator<Item = Uuid> + '_ {
self.node_id_vec
.iter()
.copied()
.filter(move |uuid| match self.node_role(uuid) {
Some(role) if role.capacity.is_some() => true,
_ => false,
})
}
/// Given a node uuids, this function returns the label of its zone /// Given a node uuids, this function returns the label of its zone
pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> {
match self.node_role(uuid) { match self.node_role(uuid) {
@ -152,11 +143,16 @@ impl LayoutVersion {
} }
} }
fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 {
self.get_node_capacity(&uuid)
.expect("non-gateway node with zero capacity")
}
/// Returns the sum of capacities of non gateway nodes in the cluster /// Returns the sum of capacities of non gateway nodes in the cluster
fn get_total_capacity(&self) -> Result<u64, Error> { fn get_total_capacity(&self) -> Result<u64, Error> {
let mut total_capacity = 0; let mut total_capacity = 0;
for uuid in self.nongateway_nodes() { for uuid in self.nongateway_nodes() {
total_capacity += self.get_node_capacity(&uuid)?; total_capacity += self.expect_get_node_capacity(&uuid);
} }
Ok(total_capacity) Ok(total_capacity)
} }
@ -257,7 +253,7 @@ impl LayoutVersion {
if *usage > 0 { if *usage > 0 {
let uuid = self.node_id_vec[n]; let uuid = self.node_id_vec[n];
let partusage = usage * self.partition_size; let partusage = usage * self.partition_size;
let nodecap = self.get_node_capacity(&uuid).unwrap(); let nodecap = self.expect_get_node_capacity(&uuid);
if partusage > nodecap { if partusage > nodecap {
return Err(format!( return Err(format!(
"node usage ({}) is bigger than node capacity ({})", "node usage ({}) is bigger than node capacity ({})",
@ -288,6 +284,21 @@ impl LayoutVersion {
// ================== updates to layout, internals =================== // ================== updates to layout, internals ===================
pub(crate) fn calculate_next_version(
mut self,
staging: &LayoutStaging,
) -> Result<(Self, Message), Error> {
self.version += 1;
self.roles.merge(&staging.roles);
self.roles.retain(|(_, _, v)| v.0.is_some());
self.parameters = *staging.parameters.get();
let msg = self.calculate_partition_assignment()?;
Ok((self, msg))
}
/// This function calculates a new partition-to-node assignment. /// This function calculates a new partition-to-node assignment.
/// The computed assignment respects the node replication factor /// The computed assignment respects the node replication factor
/// and the zone redundancy parameter It maximizes the capacity of a /// and the zone redundancy parameter It maximizes the capacity of a
@ -297,7 +308,7 @@ impl LayoutVersion {
/// data to be moved. /// data to be moved.
/// Staged role changes must be merged with nodes roles before calling this function, /// Staged role changes must be merged with nodes roles before calling this function,
/// hence it must only be called from apply_staged_changes() and hence is not public. /// hence it must only be called from apply_staged_changes() and hence is not public.
pub(crate) fn calculate_partition_assignment(&mut self) -> Result<Message, Error> { fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
// We update the node ids, since the node role list might have changed with the // We update the node ids, since the node role list might have changed with the
// changes in the layout. We retrieve the old_assignment reframed with new ids // changes in the layout. We retrieve the old_assignment reframed with new ids
let old_assignment_opt = self.update_node_id_vec()?; let old_assignment_opt = self.update_node_id_vec()?;
@ -317,12 +328,12 @@ impl LayoutVersion {
// to use them as indices in the flow graphs. // to use them as indices in the flow graphs.
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
let nb_nongateway_nodes = self.nongateway_nodes().count(); if self.nongateway_nodes().len() < self.replication_factor {
if nb_nongateway_nodes < self.replication_factor {
return Err(Error::Message(format!( return Err(Error::Message(format!(
"The number of nodes with positive \ "The number of nodes with positive \
capacity ({}) is smaller than the replication factor ({}).", capacity ({}) is smaller than the replication factor ({}).",
nb_nongateway_nodes, self.replication_factor self.nongateway_nodes().len(),
self.replication_factor
))); )));
} }
if id_to_zone.len() < zone_redundancy { if id_to_zone.len() < zone_redundancy {
@ -420,12 +431,14 @@ impl LayoutVersion {
.map(|(k, _, _)| *k) .map(|(k, _, _)| *k)
.collect(); .collect();
let mut new_node_id_vec = Vec::<Uuid>::new(); let old_node_id_vec = std::mem::take(&mut self.node_id_vec);
new_node_id_vec.extend(new_non_gateway_nodes);
new_node_id_vec.extend(new_gateway_nodes);
let old_node_id_vec = self.node_id_vec.clone(); self.nongateway_node_count = new_non_gateway_nodes.len();
self.node_id_vec = new_node_id_vec.clone(); self.node_id_vec.clear();
self.node_id_vec.extend(new_non_gateway_nodes);
self.node_id_vec.extend(new_gateway_nodes);
let new_node_id_vec = &self.node_id_vec;
// (2) We retrieve the old association // (2) We retrieve the old association
// We rewrite the old association with the new indices. We only consider partition // We rewrite the old association with the new indices. We only consider partition
@ -464,7 +477,7 @@ impl LayoutVersion {
} }
} }
// We write the ring // We clear the ring assignemnt data
self.ring_assignment_data = Vec::<CompactNodeType>::new(); self.ring_assignment_data = Vec::<CompactNodeType>::new();
Ok(Some(old_assignment)) Ok(Some(old_assignment))
@ -478,8 +491,7 @@ impl LayoutVersion {
let mut id_to_zone = Vec::<String>::new(); let mut id_to_zone = Vec::<String>::new();
let mut zone_to_id = HashMap::<String, usize>::new(); let mut zone_to_id = HashMap::<String, usize>::new();
let nongateway_nodes = self.nongateway_nodes().collect::<Vec<_>>(); for uuid in self.nongateway_nodes().iter() {
for uuid in nongateway_nodes.iter() {
let r = self.node_role(uuid).unwrap(); let r = self.node_role(uuid).unwrap();
if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() {
zone_to_id.insert(r.zone.clone(), id_to_zone.len()); zone_to_id.insert(r.zone.clone(), id_to_zone.len());
@ -556,10 +568,8 @@ impl LayoutVersion {
exclude_assoc: &HashSet<(usize, usize)>, exclude_assoc: &HashSet<(usize, usize)>,
zone_redundancy: usize, zone_redundancy: usize,
) -> Result<Graph<FlowEdge>, Error> { ) -> Result<Graph<FlowEdge>, Error> {
let vertices = LayoutVersion::generate_graph_vertices( let vertices =
zone_to_id.len(), LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
self.nongateway_nodes().count(),
);
let mut g = Graph::<FlowEdge>::new(&vertices); let mut g = Graph::<FlowEdge>::new(&vertices);
let nb_zones = zone_to_id.len(); let nb_zones = zone_to_id.len();
for p in 0..NB_PARTITIONS { for p in 0..NB_PARTITIONS {
@ -578,8 +588,8 @@ impl LayoutVersion {
)?; )?;
} }
} }
for n in 0..self.nongateway_nodes().count() { for n in 0..self.nongateway_nodes().len() {
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]);
let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?];
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
for p in 0..NB_PARTITIONS { for p in 0..NB_PARTITIONS {
@ -602,7 +612,7 @@ impl LayoutVersion {
// previous assignment // previous assignment
let mut exclude_edge = HashSet::<(usize, usize)>::new(); let mut exclude_edge = HashSet::<(usize, usize)>::new();
if let Some(prev_assign) = prev_assign_opt { if let Some(prev_assign) = prev_assign_opt {
let nb_nodes = self.nongateway_nodes().count(); let nb_nodes = self.nongateway_nodes().len();
for (p, prev_assign_p) in prev_assign.iter().enumerate() { for (p, prev_assign_p) in prev_assign.iter().enumerate() {
for n in 0..nb_nodes { for n in 0..nb_nodes {
exclude_edge.insert((p, n)); exclude_edge.insert((p, n));
@ -654,7 +664,7 @@ impl LayoutVersion {
// We compute the maximal length of a simple path in gflow. It is used in the // We compute the maximal length of a simple path in gflow. It is used in the
// Bellman-Ford algorithm in optimize_flow_with_cost to set the number // Bellman-Ford algorithm in optimize_flow_with_cost to set the number
// of iterations. // of iterations.
let nb_nodes = self.nongateway_nodes().count(); let nb_nodes = self.nongateway_nodes().len();
let path_length = 4 * nb_nodes; let path_length = 4 * nb_nodes;
gflow.optimize_flow_with_cost(&cost, path_length)?; gflow.optimize_flow_with_cost(&cost, path_length)?;
@ -732,7 +742,7 @@ impl LayoutVersion {
} }
// We define and fill in the following tables // We define and fill in the following tables
let storing_nodes = self.nongateway_nodes().collect::<Vec<_>>(); let storing_nodes = self.nongateway_nodes();
let mut new_partitions = vec![0; storing_nodes.len()]; let mut new_partitions = vec![0; storing_nodes.len()];
let mut stored_partitions = vec![0; storing_nodes.len()]; let mut stored_partitions = vec![0; storing_nodes.len()];
@ -804,13 +814,13 @@ impl LayoutVersion {
let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; let available_cap_z: u64 = self.partition_size * replicated_partitions as u64;
let mut total_cap_z = 0; let mut total_cap_z = 0;
for n in nodes_of_z.iter() { for n in nodes_of_z.iter() {
total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; total_cap_z += self.expect_get_node_capacity(&self.node_id_vec[*n]);
} }
let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32);
for n in nodes_of_z.iter() { for n in nodes_of_z.iter() {
let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; let available_cap_n = stored_partitions[*n] as u64 * self.partition_size;
let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; let total_cap_n = self.expect_get_node_capacity(&self.node_id_vec[*n]);
let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string(); let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string();
table.push(format!( table.push(format!(
" {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)", " {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)",