Make updated() be a sync function that doesn't fail

This commit is contained in:
Alex Auvolat 2021-02-23 20:25:15 +01:00
parent 28bc967c83
commit bf25c95fe2
8 changed files with 62 additions and 83 deletions

View file

@ -1,10 +1,8 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use garage_util::background::*; use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error;
use garage_table::*; use garage_table::*;
@ -42,24 +40,26 @@ pub struct BlockRefTable {
pub block_manager: Arc<BlockManager>, pub block_manager: Arc<BlockManager>,
} }
#[async_trait]
impl TableSchema for BlockRefTable { impl TableSchema for BlockRefTable {
type P = Hash; type P = Hash;
type S = UUID; type S = UUID;
type E = BlockRef; type E = BlockRef;
type Filter = DeletedFilter; type Filter = DeletedFilter;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block = &old.as_ref().or(new.as_ref()).unwrap().block; let block = &old.as_ref().or(new.as_ref()).unwrap().block;
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
if is_after && !was_before { if is_after && !was_before {
self.block_manager.block_incref(block)?; if let Err(e) = self.block_manager.block_incref(block) {
warn!("block_incref failed for block {:?}: {}", block, e);
}
} }
if was_before && !is_after { if was_before && !is_after {
self.block_manager.block_decref(block)?; if let Err(e) = self.block_manager.block_decref(block) {
warn!("block_decref failed for block {:?}: {}", block, e);
}
} }
Ok(())
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {

View file

@ -1,10 +1,8 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_table::crdt::CRDT; use garage_table::crdt::CRDT;
use garage_table::*; use garage_table::*;
use garage_util::error::Error;
use crate::key_table::PermissionSet; use crate::key_table::PermissionSet;
@ -100,17 +98,12 @@ impl Entry<EmptyKey, String> for Bucket {
pub struct BucketTable; pub struct BucketTable;
#[async_trait]
impl TableSchema for BucketTable { impl TableSchema for BucketTable {
type P = EmptyKey; type P = EmptyKey;
type S = String; type S = String;
type E = Bucket; type E = Bucket;
type Filter = DeletedFilter; type Filter = DeletedFilter;
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.is_deleted()) filter.apply(entry.is_deleted())
} }

View file

@ -1,11 +1,8 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_table::crdt::CRDT; use garage_table::crdt::CRDT;
use garage_table::*; use garage_table::*;
use garage_util::error::Error;
use model010::key_table as prev; use model010::key_table as prev;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@ -92,17 +89,12 @@ impl Entry<EmptyKey, String> for Key {
pub struct KeyTable; pub struct KeyTable;
#[async_trait]
impl TableSchema for KeyTable { impl TableSchema for KeyTable {
type P = EmptyKey; type P = EmptyKey;
type S = String; type S = String;
type E = Key; type E = Key;
type Filter = DeletedFilter; type Filter = DeletedFilter;
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted.get()) filter.apply(entry.deleted.get())
} }

View file

@ -1,11 +1,9 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error;
use garage_table::table_sharded::*; use garage_table::table_sharded::*;
use garage_table::*; use garage_table::*;
@ -191,41 +189,42 @@ pub struct ObjectTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
} }
#[async_trait]
impl TableSchema for ObjectTable { impl TableSchema for ObjectTable {
type P = String; type P = String;
type S = String; type S = String;
type E = Object; type E = Object;
type Filter = DeletedFilter; type Filter = DeletedFilter;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone(); let version_table = self.version_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) { self.background.spawn(async move {
// Propagate deletion of old versions if let (Some(old_v), Some(new_v)) = (old, new) {
for v in old_v.versions.iter() { // Propagate deletion of old versions
let newly_deleted = match new_v for v in old_v.versions.iter() {
.versions let newly_deleted = match new_v
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) .versions
{ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
Err(_) => true, {
Ok(i) => { Err(_) => true,
new_v.versions[i].state == ObjectVersionState::Aborted Ok(i) => {
&& v.state != ObjectVersionState::Aborted new_v.versions[i].state == ObjectVersionState::Aborted
&& v.state != ObjectVersionState::Aborted
}
};
if newly_deleted {
let deleted_version = Version::new(
v.uuid,
old_v.bucket.clone(),
old_v.key.clone(),
true,
vec![],
);
version_table.insert(&deleted_version).await?;
} }
};
if newly_deleted {
let deleted_version = Version::new(
v.uuid,
old_v.bucket.clone(),
old_v.key.clone(),
true,
vec![],
);
version_table.insert(&deleted_version).await?;
} }
} }
} Ok(())
Ok(()) })
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {

View file

@ -1,10 +1,8 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error;
use garage_table::table_sharded::*; use garage_table::table_sharded::*;
use garage_table::*; use garage_table::*;
@ -112,31 +110,32 @@ pub struct VersionTable {
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
} }
#[async_trait]
impl TableSchema for VersionTable { impl TableSchema for VersionTable {
type P = Hash; type P = Hash;
type S = EmptyKey; type S = EmptyKey;
type E = Version; type E = Version;
type Filter = DeletedFilter; type Filter = DeletedFilter;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block_ref_table = self.block_ref_table.clone(); let block_ref_table = self.block_ref_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) { self.background.spawn(async move {
// Propagate deletion of version blocks if let (Some(old_v), Some(new_v)) = (old, new) {
if new_v.deleted && !old_v.deleted { // Propagate deletion of version blocks
let deleted_block_refs = old_v if new_v.deleted && !old_v.deleted {
.blocks let deleted_block_refs = old_v
.iter() .blocks
.map(|vb| BlockRef { .iter()
block: vb.hash, .map(|vb| BlockRef {
version: old_v.uuid, block: vb.hash,
deleted: true, version: old_v.uuid,
}) deleted: true,
.collect::<Vec<_>>(); })
block_ref_table.insert_many(&deleted_block_refs[..]).await?; .collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
}
} }
} Ok(())
Ok(()) })
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {

View file

@ -1,8 +1,6 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error;
pub trait PartitionKey { pub trait PartitionKey {
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
@ -45,7 +43,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self); fn merge(&mut self, other: &Self);
} }
#[async_trait]
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
@ -58,7 +55,12 @@ pub trait TableSchema: Send + Sync {
None None
} }
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>; // Updated triggers some stuff downstream, but it is not supposed to block or fail,
// as the update itself is an unchangeable fact that will never go back
// due to CRDT logic. Typically errors in propagation of info should be logged
// to stderr.
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true true
} }

View file

@ -414,7 +414,7 @@ where
epidemic_propagate.push(new_entry.clone()); epidemic_propagate.push(new_entry.clone());
} }
self.instance.updated(old_entry, Some(new_entry)).await?; self.instance.updated(old_entry, Some(new_entry));
syncer.invalidate(&tree_key[..]); syncer.invalidate(&tree_key[..]);
} }
} }
@ -429,7 +429,7 @@ where
Ok(()) Ok(())
} }
pub(crate) async fn delete_if_equal( pub(crate) fn delete_if_equal(
self: &Arc<Self>, self: &Arc<Self>,
k: &[u8], k: &[u8],
v: &[u8], v: &[u8],
@ -445,7 +445,7 @@ where
})?; })?;
if removed { if removed {
let old_entry = self.decode_entry(v)?; let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None).await?; self.instance.updated(Some(old_entry), None);
self.syncer.load_full().unwrap().invalidate(k); self.syncer.load_full().unwrap().invalidate(k);
} }
Ok(removed) Ok(removed)

View file

@ -348,14 +348,8 @@ where
} }
// All remote nodes have written those items, now we can delete them locally // All remote nodes have written those items, now we can delete them locally
for was_removed in join_all( for (k, v) in items.iter() {
items self.table.delete_if_equal(&k[..], &v[..])?;
.iter()
.map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])),
)
.await
{
was_removed?;
} }
Ok(()) Ok(())