Locally, transactions

This commit is contained in:
Alex Auvolat 2020-04-09 20:58:39 +02:00
parent 1d786c2c66
commit a3eb88e601
3 changed files with 56 additions and 17 deletions

View file

@ -54,3 +54,12 @@ impl Error {
} }
} }
} }
impl From<sled::TransactionError<Error>> for Error {
fn from(e: sled::TransactionError<Error>) -> Error {
match e {
sled::TransactionError::Abort(x) => x,
sled::TransactionError::Storage(x) => Error::Sled(x),
}
}
}

View file

@ -47,20 +47,25 @@ impl Entry<String, String> for Object {
&self.key &self.key
} }
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) -> bool {
let mut has_change = false;
for other_v in other.versions.iter() { for other_v in other.versions.iter() {
match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) { match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) {
Ok(i) => { Ok(i) => {
let mut v = &mut self.versions[i]; let mut v = &mut self.versions[i];
if other_v.size > v.size { if other_v.size > v.size {
v.size = other_v.size; v.size = other_v.size;
has_change = true;
} }
if other_v.is_complete { if other_v.is_complete && !v.is_complete {
v.is_complete = true; v.is_complete = true;
has_change = true;
} }
} }
Err(i) => { Err(i) => {
self.versions.insert(i, other_v.clone()); self.versions.insert(i, other_v.clone());
has_change = true;
} }
} }
} }
@ -73,6 +78,8 @@ impl Entry<String, String> for Object {
if let Some(last_vi) = last_complete { if let Some(last_vi) = last_complete {
self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
} }
has_change
} }
} }

View file

@ -76,7 +76,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deser
fn partition_key(&self) -> &P; fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S; fn sort_key(&self) -> &S;
fn merge(&mut self, other: &Self); fn merge(&mut self, other: &Self) -> bool;
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
@ -152,13 +152,17 @@ impl<F: TableFormat + 'static> Table<F> {
.await?; .await?;
let mut ret = None; let mut ret = None;
let mut not_all_same = false;
for resp in resps { for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp { if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v) = value { if let Some(v) = value {
ret = match ret { ret = match ret {
None => Some(v), None => Some(v),
Some(mut x) => { Some(mut x) => {
x.merge(&v); let updated = x.merge(&v);
if updated {
not_all_same = true;
}
Some(x) Some(x)
} }
} }
@ -167,6 +171,16 @@ impl<F: TableFormat + 'static> Table<F> {
return Err(Error::Message(format!("Invalid return value to read"))); return Err(Error::Message(format!("Invalid return value to read")));
} }
} }
if let Some(ret_entry) = &ret {
if not_all_same {
// Repair on read
let _: Result<_, _> = self.rpc_try_call_many(
&who[..],
&TableRPC::<F>::Update(vec![ret_entry.clone()]),
who.len())
.await;
}
}
Ok(ret) Ok(ret)
} }
@ -221,22 +235,31 @@ impl<F: TableFormat + 'static> Table<F> {
} }
async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> { async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> {
for mut entry in entries.drain(..) { for update in entries.drain(..) {
let tree_key = self.tree_key(entry.partition_key(), entry.sort_key()); let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let old_val = match self.store.get(&tree_key)? { let (old_entry, new_entry) = self.store.transaction(|db| {
let mut new_entry = update.clone();
let old_entry = match db.get(&tree_key)? {
Some(prev_bytes) => { Some(prev_bytes) => {
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?; let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)
entry.merge(&old_entry); .map_err(Error::RMPDecode)
.map_err(sled::ConflictableTransactionError::Abort)?;
new_entry.merge(&old_entry);
Some(old_entry) Some(old_entry)
} }
None => None None => None
}; };
let new_bytes = rmp_to_vec_all_named(&entry)?; let new_bytes = rmp_to_vec_all_named(&new_entry)
self.store.insert(&tree_key, new_bytes)?; .map_err(Error::RMPEncode)
.map_err(sled::ConflictableTransactionError::Abort)?;
db.insert(tree_key.clone(), new_bytes)?;
Ok((old_entry, new_entry))
})?;
self.instance.updated(old_val.as_ref(), &entry).await; self.instance.updated(old_entry.as_ref(), &new_entry).await;
} }
Ok(()) Ok(())
} }