Add hostname to node info

This commit is contained in:
Alex Auvolat 2020-04-19 19:08:48 +02:00
parent a6129d8626
commit e325c7f47a
5 changed files with 52 additions and 11 deletions

11
Cargo.lock generated
View file

@ -304,6 +304,7 @@ dependencies = [
"futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -332,6 +333,15 @@ dependencies = [
"typenum 1.11.2 (registry+https://github.com/rust-lang/crates.io-index)", "typenum 1.11.2 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "gethostname"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.1.14" version = "0.1.14"
@ -1273,6 +1283,7 @@ dependencies = [
"checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" "checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5"
"checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" "checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
"checksum gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028"
"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" "checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
"checksum h2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" "checksum h2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"

View file

@ -30,6 +30,7 @@ async-trait = "0.1.30"
reduce = "0.1.2" reduce = "0.1.2"
serde_json = "1.0" serde_json = "1.0"
arc-swap = "0.4" arc-swap = "0.4"
gethostname = "0.2"
rustls = "0.17" rustls = "0.17"
tokio-rustls = "0.13" tokio-rustls = "0.13"

View file

@ -247,8 +247,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
for adv in status.iter() { for adv in status.iter() {
if let Some(cfg) = config.members.get(&adv.id) { if let Some(cfg) = config.members.get(&adv.id) {
println!( println!(
"{:?}\t{}\t{}\t{}", "{:?}\t{}\t{}\t{}\t{}",
adv.id, cfg.datacenter, cfg.n_tokens, adv.addr adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens
); );
} }
} }
@ -274,7 +274,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
println!("\nUnconfigured nodes:"); println!("\nUnconfigured nodes:");
for adv in status.iter() { for adv in status.iter() {
if !config.members.contains_key(&adv.id) { if !config.members.contains_key(&adv.id) {
println!("{:?}\t{}", adv.id, adv.addr); println!("{:?}\t{}\t{}", adv.id, adv.state_info.hostname, adv.addr);
} }
} }
} }

View file

@ -48,12 +48,15 @@ pub struct PingMessage {
pub status_hash: Hash, pub status_hash: Hash,
pub config_version: u64, pub config_version: u64,
pub state_info: StateInfo,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode { pub struct AdvertisedNode {
pub id: UUID, pub id: UUID,
pub addr: SocketAddr, pub addr: SocketAddr,
pub state_info: StateInfo,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -72,6 +75,8 @@ pub struct System {
pub config: Config, pub config: Config,
pub id: UUID, pub id: UUID,
pub state_info: StateInfo,
pub rpc_http_client: Arc<RpcHttpClient>, pub rpc_http_client: Arc<RpcHttpClient>,
rpc_client: Arc<RpcClient<Message>>, rpc_client: Arc<RpcClient<Message>>,
@ -85,14 +90,20 @@ pub struct System {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Status { pub struct Status {
pub nodes: HashMap<UUID, NodeStatus>, pub nodes: HashMap<UUID, StatusEntry>,
pub hash: Hash, pub hash: Hash,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NodeStatus { pub struct StatusEntry {
pub addr: SocketAddr, pub addr: SocketAddr,
pub remaining_ping_attempts: usize, pub remaining_ping_attempts: usize,
pub state_info: StateInfo,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateInfo {
pub hostname: String,
} }
#[derive(Clone)] #[derive(Clone)]
@ -114,9 +125,10 @@ impl Status {
let addr = SocketAddr::new(ip, info.rpc_port); let addr = SocketAddr::new(ip, info.rpc_port);
let old_status = self.nodes.insert( let old_status = self.nodes.insert(
info.id.clone(), info.id.clone(),
NodeStatus { StatusEntry {
addr: addr.clone(), addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS, remaining_ping_attempts: MAX_FAILED_PINGS,
state_info: info.state_info.clone(),
}, },
); );
match old_status { match old_status {
@ -268,6 +280,12 @@ impl System {
status.recalculate_hash(); status.recalculate_hash();
let (update_status, status) = watch::channel(Arc::new(status)); let (update_status, status) = watch::channel(Arc::new(status));
let state_info = StateInfo {
hostname: gethostname::gethostname()
.into_string()
.unwrap_or("<invalid utf-8>".to_string()),
};
let mut ring = Ring { let mut ring = Ring {
config: net_config, config: net_config,
ring: Vec::new(), ring: Vec::new(),
@ -289,6 +307,7 @@ impl System {
let sys = Arc::new(System { let sys = Arc::new(System {
config, config,
id, id,
state_info,
rpc_http_client, rpc_http_client,
rpc_client, rpc_client,
status, status,
@ -346,6 +365,7 @@ impl System {
rpc_port: self.config.rpc_port, rpc_port: self.config.rpc_port,
status_hash: status.hash.clone(), status_hash: status.hash.clone(),
config_version: ring.config.version, config_version: ring.config.version,
state_info: self.state_info.clone(),
}) })
} }
@ -408,6 +428,7 @@ impl System {
to_advertise.push(AdvertisedNode { to_advertise.push(AdvertisedNode {
id: info.id.clone(), id: info.id.clone(),
addr: addr.clone(), addr: addr.clone(),
state_info: info.state_info.clone(),
}); });
} }
if is_new || status.hash != info.status_hash { if is_new || status.hash != info.status_hash {
@ -486,9 +507,15 @@ impl System {
let status = self.status.borrow().clone(); let status = self.status.borrow().clone();
let mut mem = vec![]; let mut mem = vec![];
for (node, status) in status.nodes.iter() { for (node, status) in status.nodes.iter() {
let state_info = if *node == self.id {
self.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode { mem.push(AdvertisedNode {
id: node.clone(), id: node.clone(),
addr: status.addr.clone(), addr: status.addr.clone(),
state_info,
}); });
} }
Ok(Message::AdvertiseNodesUp(mem)) Ok(Message::AdvertiseNodesUp(mem))
@ -515,9 +542,10 @@ impl System {
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
let old_self = status.nodes.insert( let old_self = status.nodes.insert(
node.id.clone(), node.id.clone(),
NodeStatus { StatusEntry {
addr: self_addr, addr: self_addr,
remaining_ping_attempts: MAX_FAILED_PINGS, remaining_ping_attempts: MAX_FAILED_PINGS,
state_info: self.state_info.clone(),
}, },
); );
has_changed = match old_self { has_changed = match old_self {

View file

@ -231,10 +231,11 @@ impl RpcHttpClient {
let status = resp.status(); let status = resp.status();
let body = hyper::body::to_bytes(resp.into_body()).await?; let body = hyper::body::to_bytes(resp.into_body()).await?;
match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) { match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) {
Err(e) => Err(e) => Err(Error::RPCError(
Err(Error::RPCError(format!("Invalid reply"), status)), format!("Invalid reply (deserialize error: {})", e),
Ok(Err(e)) => status,
Err(Error::RPCError(e, status)), )),
Ok(Err(e)) => Err(Error::RPCError(e, status)),
Ok(Ok(x)) => Ok(x), Ok(Ok(x)) => Ok(x),
} }
} }