Change the way new layout assignations are computed.

The function now computes an optimal assignation (with respect to partition size) that minimizes the distance to the former assignation, using flow algorithms.

This commit was written by Mendes Oulamara <mendes.oulamara@pm.me>
This commit is contained in:
Alex Auvolat 2022-05-01 09:54:19 +02:00
parent c9ef3e461b
commit c1d1646c4d
No known key found for this signature in database
GPG key ID: 09EC5284AA804D3C
4 changed files with 806 additions and 413 deletions

View file

@ -23,6 +23,7 @@ gethostname = "0.2"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
itertools="0.10"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
async-trait = "0.1.7"

View file

@ -1,10 +1,14 @@
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::cmp::{min};
use std::collections::{HashMap};
use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
use garage_util::bipartite::*;
use rand::prelude::SliceRandom;
use crate::ring::*;
@ -164,445 +168,454 @@ impl ClusterLayout {
true
}
/// Calculate an assignation of partitions to nodes
pub fn calculate_partition_assignation(&mut self) -> bool {
let (configured_nodes, zones) = self.configured_nodes_and_zones();
let n_zones = zones.len();
println!("Calculating updated partition assignation, this may take some time...");
println!();
/// This function calculates a new partition-to-node assignation.
/// The computed assignation maximizes the capacity of a
/// partition (assuming all partitions have the same size).
/// Among such optimal assignation, it minimizes the distance to
/// the former assignation (if any) to minimize the amount of
/// data to be moved. A heuristic ensures node triplets
/// dispersion (in garage_util::bipartite::optimize_matching()).
pub fn calculate_partition_assignation(&mut self) -> bool {
//The nodes might have been updated, some might have been deleted.
//So we need to first update the list of nodes and retrieve the
//assignation.
let old_node_assignation = self.update_nodes_and_ring();
// Get old partition assignation
let old_partitions = self.parse_assignation_data();
let (node_zone, _) = self.get_node_zone_capacity();
//We compute the optimal number of partition to assign to
//every node and zone.
if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions(){
//We collect part_per_zone in a vec to not rely on the
//arbitrary order in which elements are iterated in
//Hashmap::iter()
let part_per_zone_vec = part_per_zone.iter()
.map(|(x,y)| (x.clone(),*y))
.collect::<Vec<(String,usize)>>();
//We create an indexing of the zones
let mut zone_id = HashMap::<String,usize>::new();
for i in 0..part_per_zone_vec.len(){
zone_id.insert(part_per_zone_vec[i].0.clone(), i);
}
//We compute a candidate for the new partition to zone
//assignation.
let nb_zones = part_per_zone.len();
let nb_nodes = part_per_nod.len();
let nb_partitions = 1<<PARTITION_BITS;
let left_cap_vec = vec![self.replication_factor as u32 ; nb_partitions];
let right_cap_vec = part_per_zone_vec.iter().map(|(_,y)| *y as u32)
.collect();
let mut zone_assignation =
dinic_compute_matching(left_cap_vec, right_cap_vec);
// Start new partition assignation with nodes from old assignation where it is relevant
let mut partitions = old_partitions
.iter()
.map(|old_part| {
let mut new_part = PartitionAss::new();
for node in old_part.nodes.iter() {
if let Some(role) = node.1 {
if role.capacity.is_some() {
new_part.add(None, n_zones, node.0, role);
}
}
}
new_part
})
.collect::<Vec<_>>();
//We create the structure for the partition-to-node assignation.
let mut node_assignation =
vec![vec![None; self.replication_factor ];nb_partitions];
//We will decrement part_per_nod to keep track of the number
//of partitions that we still have to associate.
let mut part_per_nod = part_per_nod.clone();
//We minimize the distance to the former assignation(if any)
//We get the id of the zones of the former assignation
//(and the id no_zone if there is no node assignated)
let no_zone = part_per_zone_vec.len();
let old_zone_assignation : Vec<Vec<usize>> =
old_node_assignation.iter().map(|x| x.iter().map(
|id| match *id { Some(i) => zone_id[&node_zone[i]] ,
None => no_zone }
).collect()).collect();
// In various cases, not enough nodes will have been added for all partitions
// in the step above (e.g. due to node removals, or new zones being added).
// Here we add more nodes to make a complete (but sub-optimal) assignation,
// using an initial partition assignation that is calculated using the multi-dc maglev trick
match self.initial_partition_assignation() {
Some(initial_partitions) => {
for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
for (id, info) in ipart.nodes.iter() {
if part.nodes.len() < self.replication_factor {
part.add(None, n_zones, id, info.unwrap());
}
}
assert!(part.nodes.len() == self.replication_factor);
}
}
None => {
// Not enough nodes in cluster to build a correct assignation.
// Signal it by returning an error.
return false;
}
}
//We minimize the distance to the former zone assignation
zone_assignation = optimize_matching(
&old_zone_assignation, &zone_assignation, nb_zones+1); //+1 for no_zone
// Calculate how many partitions each node should ideally store,
// and how many partitions they are storing with the current assignation
// This defines our target for which we will optimize in the following loop.
let total_capacity = configured_nodes
.iter()
.map(|(_, info)| info.capacity.unwrap_or(0))
.sum::<u32>() as usize;
let total_partitions = self.replication_factor * (1 << PARTITION_BITS);
let target_partitions_per_node = configured_nodes
.iter()
.map(|(id, info)| {
(
*id,
info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity,
)
})
.collect::<HashMap<&Uuid, usize>>();
//We need to assign partitions to nodes in their zone
//We first put the nodes assignation that can stay the same
for i in 0..nb_partitions{
for j in 0..self.replication_factor {
if let Some(Some(former_node)) = old_node_assignation[i].iter().find(
|x| if let Some(id) = x {
zone_id[&node_zone[*id]] == zone_assignation[i][j]
}
else {false}
)
{
if part_per_nod[*former_node] > 0 {
node_assignation[i][j] = Some(*former_node);
part_per_nod[*former_node] -= 1;
}
}
}
}
let mut partitions_per_node = self.partitions_per_node(&partitions[..]);
//We complete the assignation of partitions to nodes
let mut rng = rand::thread_rng();
for i in 0..nb_partitions {
for j in 0..self.replication_factor {
if node_assignation[i][j] == None {
let possible_nodes : Vec<usize> = (0..nb_nodes)
.filter(
|id| zone_id[&node_zone[*id]] == zone_assignation[i][j]
&& part_per_nod[*id] > 0).collect();
assert!(possible_nodes.len()>0);
//We randomly pick a node
if let Some(nod) = possible_nodes.choose(&mut rng){
node_assignation[i][j] = Some(*nod);
part_per_nod[*nod] -= 1;
}
}
}
}
println!("Target number of partitions per node:");
for (node, npart) in target_partitions_per_node.iter() {
println!("{:?}\t{}", node, npart);
}
println!();
//We write the assignation in the 1D table
self.ring_assignation_data = Vec::<CompactNodeType>::new();
for i in 0..nb_partitions{
for j in 0..self.replication_factor {
if let Some(id) = node_assignation[i][j] {
self.ring_assignation_data.push(id as CompactNodeType);
}
else {assert!(false)}
}
}
// Shuffle partitions between nodes so that nodes will reach (or better approach)
// their target number of stored partitions
loop {
let mut option = None;
for (i, part) in partitions.iter_mut().enumerate() {
for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
let errratio = |node, parts| {
let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
(parts - tgt) / tgt
};
let square = |x| x * x;
true
}
else { false }
}
let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
/// The LwwMap of node roles might have changed. This function updates the node_id_vec
/// and returns the assignation given by ring, with the new indices of the nodes, and
/// None of the node is not present anymore.
/// We work with the assumption that only this function and calculate_new_assignation
/// do modify assignation_ring and node_id_vec.
fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> {
let nb_partitions = 1usize<<PARTITION_BITS;
let mut node_assignation =
vec![vec![None; self.replication_factor ];nb_partitions];
let rf = self.replication_factor;
let ring = &self.ring_assignation_data;
let new_node_id_vec : Vec::<Uuid> = self.roles.items().iter()
.map(|(k, _, _)| *k)
.collect();
if ring.len() == rf*nb_partitions {
for i in 0..nb_partitions {
for j in 0..self.replication_factor {
node_assignation[i][j] = new_node_id_vec.iter()
.position(|id| *id == self.node_id_vec[ring[i*rf + j] as usize]);
}
}
}
for (idadd, infoadd) in configured_nodes.iter() {
// skip replacing a node by itself
// and skip replacing by gateway nodes
if idadd == idrm || infoadd.capacity.is_none() {
continue;
}
self.node_id_vec = new_node_id_vec;
self.ring_assignation_data = vec![];
return node_assignation;
}
///This function compute the number of partition to assign to
///every node and zone, so that every partition is replicated
///self.replication_factor times and the capacity of a partition
///is maximized.
fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> {
let mut zone_capacity :HashMap<String, u32>= HashMap::new();
let (node_zone, node_capacity) = self.get_node_zone_capacity();
let nb_nodes = self.node_id_vec.len();
// We want to try replacing node idrm by node idadd
// if that brings us close to our goal.
let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
let newcost =
square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
if newcost >= oldcost {
// not closer to our goal
continue;
}
let gain = oldcost - newcost;
for i in 0..nb_nodes
{
if zone_capacity.contains_key(&node_zone[i]) {
zone_capacity.insert(node_zone[i].clone(), zone_capacity[&node_zone[i]] + node_capacity[i]);
}
else{
zone_capacity.insert(node_zone[i].clone(), node_capacity[i]);
}
}
let mut newpart = part.clone();
//Compute the optimal number of partitions per zone
let sum_capacities: u32 =zone_capacity.values().sum();
if sum_capacities <= 0 {
println!("No storage capacity in the network.");
return None;
}
newpart.nodes.remove(irm);
if !newpart.add(None, n_zones, idadd, infoadd) {
continue;
}
assert!(newpart.nodes.len() == self.replication_factor);
let nb_partitions = 1<<PARTITION_BITS;
//Initially we would like to use zones porportionally to
//their capacity.
//However, a large zone can be associated to at most
//nb_partitions to ensure replication of the date.
//So we take the min with nb_partitions:
let mut part_per_zone : HashMap<String, usize> =
zone_capacity.iter()
.map(|(k, v)| (k.clone(), min(nb_partitions,
(self.replication_factor*nb_partitions
**v as usize)/sum_capacities as usize) ) ).collect();
if !old_partitions[i]
.is_valid_transition_to(&newpart, self.replication_factor)
{
continue;
}
//The replication_factor-1 upper bounds the number of
//part_per_zones that are greater than nb_partitions
for _ in 1..self.replication_factor {
//The number of partitions that are not assignated to
//a zone that takes nb_partitions.
let sum_capleft : u32 = zone_capacity.keys()
.filter(| k | {part_per_zone[*k] < nb_partitions} )
.map(|k| zone_capacity[k]).sum();
//The number of replication of the data that we need
//to ensure.
let repl_left = self.replication_factor
- part_per_zone.values()
.filter(|x| {**x == nb_partitions})
.count();
if repl_left == 0 {
break;
}
if option
.as_ref()
.map(|(old_gain, _, _, _, _)| gain > *old_gain)
.unwrap_or(true)
{
option = Some((gain, i, idadd, idrm, newpart));
}
}
}
}
if let Some((_gain, i, idadd, idrm, newpart)) = option {
*partitions_per_node.entry(idadd).or_insert(0) += 1;
*partitions_per_node.get_mut(idrm).unwrap() -= 1;
partitions[i] = newpart;
} else {
break;
}
}
for k in zone_capacity.keys() {
if part_per_zone[k] != nb_partitions
{
part_per_zone.insert(k.to_string() , min(nb_partitions,
(nb_partitions*zone_capacity[k] as usize
*repl_left)/sum_capleft as usize));
}
}
}
// Check we completed the assignation correctly
// (this is a set of checks for the algorithm's consistency)
assert!(partitions.len() == (1 << PARTITION_BITS));
assert!(partitions
.iter()
.all(|p| p.nodes.len() == self.replication_factor));
//Now we divide the zone's partition share proportionally
//between their nodes.
let mut part_per_nod : Vec<usize> = (0..nb_nodes).map(
|i| (part_per_zone[&node_zone[i]]*node_capacity[i] as usize)/zone_capacity[&node_zone[i]] as usize
)
.collect();
let new_partitions_per_node = self.partitions_per_node(&partitions[..]);
assert!(new_partitions_per_node == partitions_per_node);
//We must update the part_per_zone to make it correspond to
//part_per_nod (because of integer rounding)
part_per_zone = part_per_zone.iter().map(|(k,_)|
(k.clone(), 0))
.collect();
for i in 0..nb_nodes {
part_per_zone.insert(
node_zone[i].clone() ,
part_per_zone[&node_zone[i]] + part_per_nod[i]);
}
// Show statistics
println!("New number of partitions per node:");
for (node, npart) in partitions_per_node.iter() {
let tgt = *target_partitions_per_node.get(node).unwrap();
let pct = 100f32 * (*npart as f32) / (tgt as f32);
println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
}
println!();
//Because of integer rounding, the total sum of part_per_nod
//might not be replication_factor*nb_partitions.
// We need at most to add 1 to every non maximal value of
// part_per_nod. The capacity of a partition will be bounded
// by the minimal value of
// node_capacity_vec[i]/part_per_nod[i]
// so we try to maximize this minimal value, keeping the
// part_per_zone capped
let mut diffcount = HashMap::new();
for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) {
let nminus = oldpart.txtplus(newpart);
let nplus = newpart.txtplus(oldpart);
if nminus != "[...]" || nplus != "[...]" {
let tup = (nminus, nplus);
*diffcount.entry(tup).or_insert(0) += 1;
}
}
if diffcount.is_empty() {
println!("No data will be moved between nodes.");
} else {
let mut diffcount = diffcount.into_iter().collect::<Vec<_>>();
diffcount.sort();
println!("Number of partitions that move:");
for ((nminus, nplus), npart) in diffcount {
println!("\t{}\t{} -> {}", npart, nminus, nplus);
}
}
println!();
let discrepancy : usize =
nb_partitions*self.replication_factor
- part_per_nod.iter().sum::<usize>();
//We use a stupid O(N^2) algorithm. If the number of nodes
//is actually expected to be high, one should optimize this.
// Calculate and save new assignation data
let (nodes, assignation_data) =
self.compute_assignation_data(&configured_nodes[..], &partitions[..]);
for _ in 0..discrepancy {
if let Some(idmax) = (0..nb_nodes)
.filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions)
.max_by( |i,j|
(node_capacity[*i]*(part_per_nod[*j]+1) as u32)
.cmp(&(node_capacity[*j]*(part_per_nod[*i]+1) as u32))
)
{
part_per_nod[idmax] += 1;
part_per_zone.insert(node_zone[idmax].clone(),part_per_zone[&node_zone[idmax]]+1);
}
}
self.node_id_vec = nodes;
self.ring_assignation_data = assignation_data;
//We check the algorithm consistency
let discrepancy : usize =
nb_partitions*self.replication_factor
- part_per_nod.iter().sum::<usize>();
assert!(discrepancy == 0);
assert!(if let Some(v) = part_per_zone.values().max()
{*v <= nb_partitions} else {false} );
Some((part_per_nod, part_per_zone))
}
//Returns vectors of zone and capacity; indexed by the same (temporary)
//indices as node_id_vec.
fn get_node_zone_capacity(& self) -> (Vec<String> , Vec<u32>) {
let node_zone = self.node_id_vec.iter().map(
|id_nod| match self.node_role(id_nod) {
Some(NodeRole{zone,capacity:_,tags:_}) => zone.clone() ,
_ => "".to_string()
}
).collect();
let node_capacity = self.node_id_vec.iter().map(
|id_nod| match self.node_role(id_nod) {
Some(NodeRole{zone:_,capacity,tags:_}) =>
if let Some(c)=capacity
{*c}
else {0},
_ => 0
}
).collect();
true
}
(node_zone,node_capacity)
}
fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> {
let (configured_nodes, zones) = self.configured_nodes_and_zones();
let n_zones = zones.len();
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
// Prepare ring
let mut partitions: Vec<PartitionAss> = partitions_idx
.iter()
.map(|_i| PartitionAss::new())
.collect::<Vec<_>>();
// Create MagLev priority queues for each node
let mut queues = configured_nodes
.iter()
.filter(|(_id, info)| info.capacity.is_some())
.map(|(node_id, node_info)| {
let mut parts = partitions_idx
.iter()
.map(|i| {
let part_data =
[&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
(*i, fasthash(&part_data[..]))
})
.collect::<Vec<_>>();
parts.sort_by_key(|(_i, h)| *h);
let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
(node_id, node_info, parts_i, 0)
})
.collect::<Vec<_>>();
let max_capacity = configured_nodes
.iter()
.filter_map(|(_, node_info)| node_info.capacity)
.fold(0, std::cmp::max);
// Fill up ring
for rep in 0..self.replication_factor {
queues.sort_by_key(|(ni, _np, _q, _p)| {
let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
fasthash(&queue_data[..])
});
for (_, _, _, pos) in queues.iter_mut() {
*pos = 0;
}
let mut remaining = partitions_idx.len();
while remaining > 0 {
let remaining0 = remaining;
for i_round in 0..max_capacity {
for (node_id, node_info, q, pos) in queues.iter_mut() {
if i_round >= node_info.capacity.unwrap() {
continue;
}
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
remaining -= 1;
*pos = pos2 + 1;
break;
}
}
}
}
if remaining == remaining0 {
// No progress made, exit
return None;
}
}
}
Some(partitions)
}
fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) {
let configured_nodes = self
.roles
.items()
.iter()
.filter(|(_id, _, info)| info.0.is_some())
.map(|(id, _, info)| (id, info.0.as_ref().unwrap()))
.collect::<Vec<(&Uuid, &NodeRole)>>();
let zones = configured_nodes
.iter()
.filter(|(_id, info)| info.capacity.is_some())
.map(|(_id, info)| info.zone.as_str())
.collect::<HashSet<&str>>();
(configured_nodes, zones)
}
fn compute_assignation_data<'a>(
&self,
configured_nodes: &[(&'a Uuid, &'a NodeRole)],
partitions: &[PartitionAss<'a>],
) -> (Vec<Uuid>, Vec<CompactNodeType>) {
assert!(partitions.len() == (1 << PARTITION_BITS));
// Make a canonical order for nodes
let mut nodes = configured_nodes
.iter()
.filter(|(_id, info)| info.capacity.is_some())
.map(|(id, _)| **id)
.collect::<Vec<_>>();
let nodes_rev = nodes
.iter()
.enumerate()
.map(|(i, id)| (*id, i as CompactNodeType))
.collect::<HashMap<Uuid, CompactNodeType>>();
let mut assignation_data = vec![];
for partition in partitions.iter() {
assert!(partition.nodes.len() == self.replication_factor);
for (id, _) in partition.nodes.iter() {
assignation_data.push(*nodes_rev.get(id).unwrap());
}
}
nodes.extend(
configured_nodes
.iter()
.filter(|(_id, info)| info.capacity.is_none())
.map(|(id, _)| **id),
);
(nodes, assignation_data)
}
fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> {
if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) {
// If the previous assignation data is correct, use that
let mut partitions = vec![];
for i in 0..(1 << PARTITION_BITS) {
let mut part = PartitionAss::new();
for node_i in self.ring_assignation_data
[i * self.replication_factor..(i + 1) * self.replication_factor]
.iter()
{
let node_id = &self.node_id_vec[*node_i as usize];
if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) {
part.nodes.push((node_id, Some(info)));
} else {
part.nodes.push((node_id, None));
}
}
partitions.push(part);
}
partitions
} else {
// Otherwise start fresh
(0..(1 << PARTITION_BITS))
.map(|_| PartitionAss::new())
.collect()
}
}
fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> {
let mut partitions_per_node = HashMap::<&Uuid, usize>::new();
for p in partitions.iter() {
for (id, _) in p.nodes.iter() {
*partitions_per_node.entry(*id).or_insert(0) += 1;
}
}
partitions_per_node
}
}
// ---- Internal structs for partition assignation in layout ----
#[derive(Clone)]
struct PartitionAss<'a> {
nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>,
#[cfg(test)]
mod tests {
use super::*;
use itertools::Itertools;
fn check_assignation(cl : &ClusterLayout) {
//Check that input data has the right format
let nb_partitions = 1usize<<PARTITION_BITS;
assert!([1,2,3].contains(&cl.replication_factor));
assert!(cl.ring_assignation_data.len() == nb_partitions*cl.replication_factor);
let (node_zone, node_capacity) = cl.get_node_zone_capacity();
//Check that is is a correct assignation with zone redundancy
let rf = cl.replication_factor;
for i in 0..nb_partitions{
assert!( rf ==
cl.ring_assignation_data[rf*i..rf*(i+1)].iter()
.map(|nod| node_zone[*nod as usize].clone())
.unique()
.count() );
}
let nb_nodes = cl.node_id_vec.len();
//Check optimality
let node_nb_part =(0..nb_nodes).map(|i| cl.ring_assignation_data
.iter()
.filter(|x| **x==i as u8)
.count())
.collect::<Vec::<_>>();
let zone_vec = node_zone.iter().unique().collect::<Vec::<_>>();
let zone_nb_part = zone_vec.iter().map( |z| cl.ring_assignation_data.iter()
.filter(|x| node_zone[**x as usize] == **z)
.count()
).collect::<Vec::<_>>();
//Check optimality of the zone assignation : would it be better for the
//node_capacity/node_partitions ratio to change the assignation of a partition
if let Some(idmin) = (0..nb_nodes).min_by(
|i,j| (node_capacity[*i]*node_nb_part[*j] as u32)
.cmp(&(node_capacity[*j]*node_nb_part[*i] as u32))
){
if let Some(idnew) = (0..nb_nodes)
.filter( |i| if let Some(p) = zone_vec.iter().position(|z| **z==node_zone[*i])
{zone_nb_part[p] < nb_partitions }
else { false })
.max_by(
|i,j| (node_capacity[*i]*(node_nb_part[*j]as u32+1))
.cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1)))
){
assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >=
node_capacity[idnew]*node_nb_part[idmin] as u32);
}
}
//In every zone, check optimality of the nod assignation
for z in zone_vec {
let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z );
if let Some(idmin) = node_of_z_iter.clone().min_by(
|i,j| (node_capacity[*i]*node_nb_part[*j] as u32)
.cmp(&(node_capacity[*j]*node_nb_part[*i] as u32))
){
if let Some(idnew) = node_of_z_iter.min_by(
|i,j| (node_capacity[*i]*(node_nb_part[*j] as u32+1))
.cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1)))
){
assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >=
node_capacity[idnew]*node_nb_part[idmin] as u32);
}
}
}
}
fn update_layout(cl : &mut ClusterLayout, node_id_vec : &Vec<u8>,
node_capacity_vec : &Vec<u32> , node_zone_vec : &Vec<String>) {
for i in 0..node_id_vec.len(){
if let Some(x) = FixedBytes32::try_from(&[i as u8;32]) {
cl.node_id_vec.push(x);
}
let update = cl.roles.update_mutator(cl.node_id_vec[i] ,
NodeRoleV(Some(NodeRole{
zone : (node_zone_vec[i].to_string()),
capacity : (Some(node_capacity_vec[i])),
tags : (vec![])})));
cl.roles.merge(&update);
}
}
#[test]
fn test_assignation() {
let mut node_id_vec = vec![1,2,3];
let mut node_capacity_vec = vec![4000,1000,2000];
let mut node_zone_vec= vec!["A", "B", "C"].into_iter().map(|x| x.to_string()).collect();
let mut cl = ClusterLayout {
node_id_vec: vec![],
roles : LwwMap::new(),
replication_factor: 3,
ring_assignation_data : vec![],
version:0,
staging: LwwMap::new(),
staging_hash: sha256sum(&[1;32]),
};
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
node_id_vec = vec![1,2,3, 4, 5, 6, 7, 8, 9];
node_capacity_vec = vec![4000,1000,1000, 3000, 1000, 1000, 2000, 10000, 2000];
node_zone_vec= vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"].into_iter().map(|x| x.to_string()).collect();
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
node_capacity_vec = vec![4000,1000,2000, 7000, 1000, 1000, 2000, 10000, 2000];
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
node_capacity_vec = vec![4000,4000,2000, 7000, 1000, 9000, 2000, 10, 2000];
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
}
}
impl<'a> PartitionAss<'a> {
fn new() -> Self {
Self { nodes: Vec::new() }
}
fn nplus(&self, other: &PartitionAss<'a>) -> usize {
self.nodes
.iter()
.filter(|x| !other.nodes.contains(x))
.count()
}
fn txtplus(&self, other: &PartitionAss<'a>) -> String {
let mut nodes = self
.nodes
.iter()
.filter(|x| !other.nodes.contains(x))
.map(|x| format!("{:?}", x.0))
.collect::<Vec<_>>();
nodes.sort();
if self.nodes.iter().any(|x| other.nodes.contains(x)) {
nodes.push("...".into());
}
format!("[{}]", nodes.join(" "))
}
fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool {
let min_keep_nodes_per_part = (replication_factor + 1) / 2;
let n_removed = self.nplus(other);
if self.nodes.len() <= min_keep_nodes_per_part {
n_removed == 0
} else {
n_removed <= self.nodes.len() - min_keep_nodes_per_part
}
}
// add is a key function in creating a PartitionAss, i.e. the list of nodes
// to which a partition is assigned. It tries to add a certain node id to the
// assignation, but checks that doing so is compatible with the NECESSARY
// condition that the partition assignation must be dispersed over different
// zones (datacenters) if enough zones exist. This is why it takes a n_zones
// parameter, which is the total number of zones that have existing nodes:
// if nodes in the assignation already cover all n_zones zones, then any node
// that is not yet in the assignation can be added. Otherwise, only nodes
// that are in a new zone can be added.
fn add(
&mut self,
target_len: Option<usize>,
n_zones: usize,
node: &'a Uuid,
role: &'a NodeRole,
) -> bool {
if let Some(tl) = target_len {
if self.nodes.len() != tl - 1 {
return false;
}
}
let p_zns = self
.nodes
.iter()
.map(|(_id, info)| info.unwrap().zone.as_str())
.collect::<HashSet<&str>>();
if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str()))
|| (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node))
{
self.nodes.push((node, Some(role)));
true
} else {
false
}
}
}

378
src/util/bipartite.rs Normal file
View file

@ -0,0 +1,378 @@
/*
* This module deals with graph algorithm in complete bipartite
* graphs. It is used in layout.rs to build the partition to node
* assignation.
* */
use std::cmp::{min,max};
use std::collections::VecDeque;
use rand::prelude::SliceRandom;
//Graph data structure for the flow algorithm.
#[derive(Clone,Copy,Debug)]
struct EdgeFlow{
c : i32,
flow : i32,
v : usize,
rev : usize,
}
//Graph data structure for the detection of positive cycles.
#[derive(Clone,Copy,Debug)]
struct WeightedEdge{
w : i32,
u : usize,
v : usize,
}
/* This function takes two matchings (old_match and new_match) in a
* complete bipartite graph. It returns a matching that has the
* same degree as new_match at every vertex, and that is as close
* as possible to old_match.
* */
pub fn optimize_matching( old_match : &Vec<Vec<usize>> ,
new_match : &Vec<Vec<usize>> ,
nb_right : usize )
-> Vec<Vec<usize>> {
let nb_left = old_match.len();
let ed = WeightedEdge{w:-1,u:0,v:0};
let mut edge_vec = vec![ed ; nb_left*nb_right];
//We build the complete bipartite graph structure, represented
//by the list of all edges.
for i in 0..nb_left {
for j in 0..nb_right{
edge_vec[i*nb_right + j].u = i;
edge_vec[i*nb_right + j].v = nb_left+j;
}
}
for i in 0..edge_vec.len() {
//We add the old matchings
if old_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) {
edge_vec[i].w *= -1;
}
//We add the new matchings
if new_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) {
(edge_vec[i].u,edge_vec[i].v) =
(edge_vec[i].v,edge_vec[i].u);
edge_vec[i].w *= -1;
}
}
//Now edge_vec is a graph where edges are oriented LR if we
//can add them to new_match, and RL otherwise. If
//adding/removing them makes the matching closer to old_match
//they have weight 1; and -1 otherwise.
//We shuffle the edge list so that there is no bias depending in
//partitions/zone label in the triplet dispersion
let mut rng = rand::thread_rng();
edge_vec.shuffle(&mut rng);
//Discovering and flipping a cycle with positive weight in this
//graph will make the matching closer to old_match.
//We use Bellman Ford algorithm to discover positive cycles
loop{
if let Some(cycle) = positive_cycle(&edge_vec, nb_left, nb_right) {
for i in cycle {
//We flip the edges of the cycle.
(edge_vec[i].u,edge_vec[i].v) =
(edge_vec[i].v,edge_vec[i].u);
edge_vec[i].w *= -1;
}
}
else {
//If there is no cycle, we return the optimal matching.
break;
}
}
//The optimal matching is build from the graph structure.
let mut matching = vec![Vec::<usize>::new() ; nb_left];
for e in edge_vec {
if e.u > e.v {
matching[e.v].push(e.u-nb_left);
}
}
matching
}
//This function finds a positive cycle in a bipartite wieghted graph.
fn positive_cycle( edge_vec : &Vec<WeightedEdge>, nb_left : usize,
nb_right : usize) -> Option<Vec<usize>> {
let nb_side_min = min(nb_left, nb_right);
let nb_vertices = nb_left+nb_right;
let weight_lowerbound = -((nb_left +nb_right) as i32) -1;
let mut accessed = vec![false ; nb_left];
//We try to find a positive cycle accessible from the left
//vertex i.
for i in 0..nb_left{
if accessed[i] {
continue;
}
let mut weight =vec![weight_lowerbound ; nb_vertices];
let mut prev =vec![ edge_vec.len() ; nb_vertices];
weight[i] = 0;
//We compute largest weighted paths from i.
//Since the graph is bipartite, any simple cycle has length
//at most 2*nb_side_min. In the general Bellman-Ford
//algorithm, the bound here is the number of vertices. Since
//the number of partitions can be much larger than the
//number of nodes, we optimize that.
for _ in 0..(2*nb_side_min) {
for j in 0..edge_vec.len() {
let e = edge_vec[j];
if weight[e.v] < weight[e.u]+e.w {
weight[e.v] = weight[e.u]+e.w;
prev[e.v] = j;
}
}
}
//We update the accessed table
for i in 0..nb_left {
if weight[i] > weight_lowerbound {
accessed[i] = true;
}
}
//We detect positive cycle
for e in edge_vec {
if weight[e.v] < weight[e.u]+e.w {
//it means e is on a path branching from a positive cycle
let mut was_seen = vec![false ; nb_vertices];
let mut curr = e.u;
//We track back with prev until we reach the cycle.
while !was_seen[curr]{
was_seen[curr] = true;
curr = edge_vec[prev[curr]].u;
}
//Now curr is on the cycle. We collect the edges ids.
let mut cycle = Vec::<usize>::new();
cycle.push(prev[curr]);
let mut cycle_vert = edge_vec[prev[curr]].u;
while cycle_vert != curr {
cycle.push(prev[cycle_vert]);
cycle_vert = edge_vec[prev[cycle_vert]].u;
}
return Some(cycle);
}
}
}
None
}
// This function takes two arrays of capacity and computes the
// maximal matching in the complete bipartite graph such that the
// left vertex i is matched to left_cap_vec[i] right vertices, and
// the right vertex j is matched to right_cap_vec[j] left vertices.
// To do so, we use Dinic's maximum flow algorithm.
pub fn dinic_compute_matching( left_cap_vec : Vec<u32>,
right_cap_vec : Vec<u32>) -> Vec< Vec<usize> >
{
let mut graph = Vec::<Vec::<EdgeFlow> >::new();
let ed = EdgeFlow{c:0,flow:0,v:0, rev:0};
// 0 will be the source
graph.push(vec![ed ; left_cap_vec.len()]);
for i in 0..left_cap_vec.len()
{
graph[0][i].c = left_cap_vec[i] as i32;
graph[0][i].v = i+2;
graph[0][i].rev = 0;
}
//1 will be the sink
graph.push(vec![ed ; right_cap_vec.len()]);
for i in 0..right_cap_vec.len()
{
graph[1][i].c = right_cap_vec[i] as i32;
graph[1][i].v = i+2+left_cap_vec.len();
graph[1][i].rev = 0;
}
//we add left vertices
for i in 0..left_cap_vec.len() {
graph.push(vec![ed ; 1+right_cap_vec.len()]);
graph[i+2][0].c = 0; //directed
graph[i+2][0].v = 0;
graph[i+2][0].rev = i;
for j in 0..right_cap_vec.len() {
graph[i+2][j+1].c = 1;
graph[i+2][j+1].v = 2+left_cap_vec.len()+j;
graph[i+2][j+1].rev = i+1;
}
}
//we add right vertices
for i in 0..right_cap_vec.len() {
let lft_ln = left_cap_vec.len();
graph.push(vec![ed ; 1+lft_ln]);
graph[i+lft_ln+2][0].c = graph[1][i].c;
graph[i+lft_ln+2][0].v = 1;
graph[i+lft_ln+2][0].rev = i;
for j in 0..left_cap_vec.len() {
graph[i+2+lft_ln][j+1].c = 0; //directed
graph[i+2+lft_ln][j+1].v = j+2;
graph[i+2+lft_ln][j+1].rev = i+1;
}
}
//To ensure the dispersion of the triplets generated by the
//assignation, we shuffle the neighbours of the nodes. Hence,
//left vertices do not consider the right ones in the same order.
let mut rng = rand::thread_rng();
for i in 0..graph.len() {
graph[i].shuffle(&mut rng);
//We need to update the ids of the reverse edges.
for j in 0..graph[i].len() {
let target_v = graph[i][j].v;
let target_rev = graph[i][j].rev;
graph[target_v][target_rev].rev = j;
}
}
let nb_vertices = graph.len();
//We run Dinic's max flow algorithm
loop{
//We build the level array from Dinic's algorithm.
let mut level = vec![-1; nb_vertices];
let mut fifo = VecDeque::new();
fifo.push_back((0,0));
while !fifo.is_empty() {
if let Some((id,lvl)) = fifo.pop_front(){
if level[id] == -1 {
level[id] = lvl;
for e in graph[id].iter(){
if e.c-e.flow > 0{
fifo.push_back((e.v,lvl+1));
}
}
}
}
}
if level[1] == -1 {
//There is no residual flow
break;
}
//Now we run DFS respecting the level array
let mut next_nbd = vec![0; nb_vertices];
let mut lifo = VecDeque::new();
let flow_upper_bound;
if let Some(x) = left_cap_vec.iter().max() {
flow_upper_bound=*x as i32;
}
else {
flow_upper_bound = 0;
assert!(false);
}
lifo.push_back((0,flow_upper_bound));
loop
{
if let Some((id_tmp, f_tmp)) = lifo.back() {
let id = *id_tmp;
let f = *f_tmp;
if id == 1 {
//The DFS reached the sink, we can add a
//residual flow.
lifo.pop_back();
while !lifo.is_empty() {
if let Some((id,_)) = lifo.pop_back(){
let nbd=next_nbd[id];
graph[id][nbd].flow += f;
let id_v = graph[id][nbd].v;
let nbd_v = graph[id][nbd].rev;
graph[id_v][nbd_v].flow -= f;
}
}
lifo.push_back((0,flow_upper_bound));
continue;
}
//else we did not reach the sink
let nbd = next_nbd[id];
if nbd >= graph[id].len() {
//There is nothing to explore from id anymore
lifo.pop_back();
if let Some((parent, _)) = lifo.back(){
next_nbd[*parent] +=1;
}
continue;
}
//else we can try to send flow from id to its nbd
let new_flow = min(f,graph[id][nbd].c
- graph[id][nbd].flow);
if level[graph[id][nbd].v] <= level[id] ||
new_flow == 0 {
//We cannot send flow to nbd.
next_nbd[id] += 1;
continue;
}
//otherwise, we send flow to nbd.
lifo.push_back((graph[id][nbd].v, new_flow));
}
else {
break;
}
}
}
//We return the association
let assoc_table = (0..left_cap_vec.len()).map(
|id| graph[id+2].iter()
.filter(|e| e.flow > 0)
.map( |e| e.v-2-left_cap_vec.len())
.collect()).collect();
//consistency check
//it is a flow
for i in 3..graph.len(){
assert!( graph[i].iter().map(|e| e.flow).sum::<i32>() == 0);
for e in graph[i].iter(){
assert!(e.flow + graph[e.v][e.rev].flow == 0);
}
}
//it solves the matching problem
for i in 0..left_cap_vec.len(){
assert!(left_cap_vec[i] as i32 ==
graph[i+2].iter().map(|e| max(0,e.flow)).sum::<i32>());
}
for i in 0..right_cap_vec.len(){
assert!(right_cap_vec[i] as i32 ==
graph[i+2+left_cap_vec.len()].iter()
.map(|e| max(0,e.flow)).sum::<i32>());
}
assoc_table
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_flow() {
let left_vec = vec![3;8];
let right_vec = vec![0,4,8,4,8];
//There are asserts in the function that computes the flow
let _ = dinic_compute_matching(left_vec, right_vec);
}
//maybe add tests relative to the matching optilization ?
}

View file

@ -4,6 +4,7 @@
extern crate tracing;
pub mod background;
pub mod bipartite;
pub mod config;
pub mod crdt;
pub mod data;