Better prints again, and a great question

This commit is contained in:
Alex Auvolat 2020-04-16 19:57:13 +02:00
parent 2a05fd135a
commit 768d22ccdb
2 changed files with 11 additions and 4 deletions

2
TODO
View file

@ -5,6 +5,8 @@ Replication
- every node watches the current ring and state of the network - every node watches the current ring and state of the network
- and thus determines the interval of tokens for which they are responsible - and thus determines the interval of tokens for which they are responsible
How are we going to test that our replication method works correctly?
We will have to introduce lots of dummy data and then add/remove nodes many times.
To do list To do list

View file

@ -149,9 +149,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
} }
async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
eprintln!("({}) Calculating root checksum for {:?}...", self.table.name, partition); eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
eprintln!("({}) Root checksum for {:?}: {:?}", self.table.name, partition, root_cks);
let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor); let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor);
let mut sync_futures = nodes.iter() let mut sync_futures = nodes.iter()
@ -196,9 +195,14 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
drop(cache); drop(cache);
let v = self.range_checksum_inner(&range, must_exit).await?; let v = self.range_checksum_inner(&range, must_exit).await?;
eprintln!("({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin[..]),
hex::encode(&range.end[..]),
range.level,
v.children.len());
let mut cache = self.cache[range.level].lock().await; let mut cache = self.cache[range.level].lock().await;
eprintln!("({}) Checksum for {:?}: {:?}", self.table.name, range, v);
cache.insert(range.clone(), v.clone()); cache.insert(range.clone(), v.clone());
Ok(v) Ok(v)
}.boxed() }.boxed()
@ -281,7 +285,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
todo.push_back(root_ck); todo.push_back(root_ck);
while !todo.is_empty() && !*must_exit.borrow() { while !todo.is_empty() && !*must_exit.borrow() {
eprintln!("({}) Sync with {:?}: {} remaining", self.table.name, who, todo.len()); let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
eprintln!("({}) Sync with {:?}: {} ({}) remaining", self.table.name, who, todo.len(), total_children);
let end = std::cmp::min(16, todo.len()); let end = std::cmp::min(16, todo.len());
let step = todo.drain(..end).collect::<Vec<_>>(); let step = todo.drain(..end).collect::<Vec<_>>();