Forgot to recalculate hash at strategic locations

This commit is contained in:
Alex Auvolat 2020-04-06 23:10:28 +02:00
parent a09f019cc5
commit a7b85146fe

View file

@ -70,9 +70,12 @@ impl Members {
nodes.sort_by_key(|(id, _status)| *id); nodes.sort_by_key(|(id, _status)| *id);
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
eprintln!("Current set of pingable nodes: --");
for (id, status) in nodes { for (id, status) in nodes {
eprintln!("{} {}", hex::encode(id), status.addr);
hasher.input(format!("{} {}\n", hex::encode(id), status.addr)); hasher.input(format!("{} {}\n", hex::encode(id), status.addr));
} }
eprintln!("END --");
self.status_hash.copy_from_slice(&hasher.result()[..]); self.status_hash.copy_from_slice(&hasher.result()[..]);
} }
} }
@ -85,18 +88,20 @@ pub struct NodeStatus {
impl System { impl System {
pub fn new(config: Config, id: UUID) -> Self { pub fn new(config: Config, id: UUID) -> Self {
System{ let mut members = Members{
config,
id,
rpc_client: Client::new(),
members: RwLock::new(Members{
status: HashMap::new(), status: HashMap::new(),
status_hash: [0u8; 32], status_hash: [0u8; 32],
config: NetworkConfig{ config: NetworkConfig{
members: HashMap::new(), members: HashMap::new(),
version: 0, version: 0,
}, },
}), };
members.recalculate_status_hash();
System{
config,
id,
rpc_client: Client::new(),
members: RwLock::new(members),
} }
} }
@ -197,9 +202,9 @@ impl System {
propagate.push(node.clone()); propagate.push(node.clone());
} }
} }
drop(members);
if propagate.len() > 0 { if propagate.len() > 0 {
members.recalculate_status_hash();
tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT)); tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
} }
@ -242,9 +247,14 @@ impl System {
})).await; })).await;
let mut members = self.members.write().await; let mut members = self.members.write().await;
let mut has_changes = false;
for (id, addr, ping_resp) in ping_resps { for (id, addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(ping)) = ping_resp { if let Ok(Message::Ping(ping)) = ping_resp {
let is_new = members.handle_ping(addr.ip(), &ping); let is_new = members.handle_ping(addr.ip(), &ping);
if is_new {
has_changes = true;
}
if is_new || members.status_hash != ping.status_hash { if is_new || members.status_hash != ping.status_hash {
tokio::spawn(self.clone().pull_status(ping.id.clone())); tokio::spawn(self.clone().pull_status(ping.id.clone()));
} }
@ -256,6 +266,7 @@ impl System {
if remaining_attempts == 0 { if remaining_attempts == 0 {
eprintln!("Removing node {} after too many failed pings", hex::encode(id)); eprintln!("Removing node {} after too many failed pings", hex::encode(id));
members.status.remove(id); members.status.remove(id);
has_changes = true;
} else { } else {
if let Some(st) = members.status.get_mut(id) { if let Some(st) = members.status.get_mut(id) {
st.remaining_ping_attempts = remaining_attempts - 1; st.remaining_ping_attempts = remaining_attempts - 1;
@ -263,6 +274,9 @@ impl System {
} }
} }
} }
if has_changes {
members.recalculate_status_hash();
}
drop(members); drop(members);
restart_at.await restart_at.await