Slight refactoring to make things clearer with DeletedFilter

This commit is contained in:
Alex Auvolat 2020-11-20 20:11:04 +01:00
parent b9e6b007a3
commit e9fd265ce6
11 changed files with 90 additions and 55 deletions

View file

@ -10,6 +10,8 @@ use garage_util::error::Error;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*; use garage_model::object_table::*;
use garage_table::DeletedFilter;
use crate::encoding::*; use crate::encoding::*;
#[derive(Debug)] #[derive(Debug)]
@ -41,7 +43,7 @@ pub async fn handle_list(
.get_range( .get_range(
&bucket.to_string(), &bucket.to_string(),
Some(next_chunk_start.clone()), Some(next_chunk_start.clone()),
Some(()), Some(DeletedFilter::NotDeleted),
max_keys + 1, max_keys + 1,
) )
.await?; .await?;

View file

@ -67,7 +67,7 @@ impl AdminRpcHandler {
let bucket_names = self let bucket_names = self
.garage .garage
.bucket_table .bucket_table
.get_range(&EmptyKey, None, Some(()), 10000) .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
.await? .await?
.iter() .iter()
.map(|b| b.name.to_string()) .map(|b| b.name.to_string())
@ -101,7 +101,7 @@ impl AdminRpcHandler {
let objects = self let objects = self
.garage .garage
.object_table .object_table
.get_range(&query.name, None, Some(()), 10) .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10)
.await?; .await?;
if !objects.is_empty() { if !objects.is_empty() {
return Err(Error::BadRPC(format!("Bucket {} is not empty", query.name))); return Err(Error::BadRPC(format!("Bucket {} is not empty", query.name)));
@ -170,7 +170,7 @@ impl AdminRpcHandler {
let key_ids = self let key_ids = self
.garage .garage
.key_table .key_table
.get_range(&EmptyKey, None, Some(()), 10000) .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
.await? .await?
.iter() .iter()
.map(|k| (k.key_id.to_string(), k.name.to_string())) .map(|k| (k.key_id.to_string(), k.name.to_string()))

View file

@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use garage_table::table_sharded::TableShardedReplication; use garage_table::table_sharded::TableShardedReplication;
use garage_table::TableReplication; use garage_table::{TableReplication, DeletedFilter};
use crate::block_ref_table::*; use crate::block_ref_table::*;
@ -306,7 +306,7 @@ impl BlockManager {
let garage = self.garage.load_full().unwrap(); let garage = self.garage.load_full().unwrap();
let active_refs = garage let active_refs = garage
.block_ref_table .block_ref_table
.get_range(&hash, None, Some(()), 1) .get_range(&hash, None, Some(DeletedFilter::NotDeleted), 1)
.await?; .await?;
let needed_by_others = !active_refs.is_empty(); let needed_by_others = !active_refs.is_empty();
if needed_by_others { if needed_by_others {

View file

@ -47,7 +47,7 @@ impl TableSchema for BlockRefTable {
type P = Hash; type P = Hash;
type S = UUID; type S = UUID;
type E = BlockRef; type E = BlockRef;
type Filter = (); type Filter = DeletedFilter;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block = &old.as_ref().or(new.as_ref()).unwrap().block; let block = &old.as_ref().or(new.as_ref()).unwrap().block;
@ -62,7 +62,7 @@ impl TableSchema for BlockRefTable {
Ok(()) Ok(())
} }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
!entry.deleted filter.apply(entry.deleted)
} }
} }

View file

@ -104,18 +104,19 @@ impl Entry<EmptyKey, String> for Bucket {
pub struct BucketTable; pub struct BucketTable;
#[async_trait] #[async_trait]
impl TableSchema for BucketTable { impl TableSchema for BucketTable {
type P = EmptyKey; type P = EmptyKey;
type S = String; type S = String;
type E = Bucket; type E = Bucket;
type Filter = (); type Filter = DeletedFilter;
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> { async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
Ok(()) Ok(())
} }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
!entry.deleted filter.apply(entry.deleted)
} }
} }

View file

@ -142,13 +142,13 @@ impl TableSchema for KeyTable {
type P = EmptyKey; type P = EmptyKey;
type S = String; type S = String;
type E = Key; type E = Key;
type Filter = (); type Filter = DeletedFilter;
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> { async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
Ok(()) Ok(())
} }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
!entry.deleted filter.apply(entry.deleted)
} }
} }

View file

@ -196,7 +196,7 @@ impl TableSchema for ObjectTable {
type P = String; type P = String;
type S = String; type S = String;
type E = Object; type E = Object;
type Filter = (); type Filter = DeletedFilter;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let version_table = self.version_table.clone(); let version_table = self.version_table.clone();
@ -228,8 +228,9 @@ impl TableSchema for ObjectTable {
Ok(()) Ok(())
} }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
entry.versions.iter().any(|v| v.is_data()) let deleted = !entry.versions.iter().any(|v| v.is_data());
filter.apply(deleted)
} }
fn try_migrate(bytes: &[u8]) -> Option<Self::E> { fn try_migrate(bytes: &[u8]) -> Option<Self::E> {

View file

@ -117,7 +117,7 @@ impl TableSchema for VersionTable {
type P = Hash; type P = Hash;
type S = EmptyKey; type S = EmptyKey;
type E = Version; type E = Version;
type Filter = (); type Filter = DeletedFilter;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block_ref_table = self.block_ref_table.clone(); let block_ref_table = self.block_ref_table.clone();
@ -139,7 +139,7 @@ impl TableSchema for VersionTable {
Ok(()) Ok(())
} }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
!entry.deleted filter.apply(entry.deleted)
} }
} }

View file

@ -4,10 +4,13 @@
extern crate log; extern crate log;
pub mod schema; pub mod schema;
pub mod util;
pub mod table; pub mod table;
pub mod table_fullcopy; pub mod table_fullcopy;
pub mod table_sharded; pub mod table_sharded;
pub mod table_sync; pub mod table_sync;
pub use schema::*; pub use schema::*;
pub use util::*;
pub use table::*; pub use table::*;

View file

@ -8,10 +8,36 @@ pub trait PartitionKey {
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
} }
impl PartitionKey for String {
fn hash(&self) -> Hash {
hash(self.as_bytes())
}
}
impl PartitionKey for Hash {
fn hash(&self) -> Hash {
self.clone()
}
}
pub trait SortKey { pub trait SortKey {
fn sort_key(&self) -> &[u8]; fn sort_key(&self) -> &[u8];
} }
impl SortKey for String {
fn sort_key(&self) -> &[u8] {
self.as_bytes()
}
}
impl SortKey for Hash {
fn sort_key(&self) -> &[u8] {
self.as_slice()
}
}
pub trait Entry<P: PartitionKey, S: SortKey>: pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{ {
@ -21,40 +47,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self); fn merge(&mut self, other: &Self);
} }
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmptyKey;
impl SortKey for EmptyKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
impl PartitionKey for EmptyKey {
fn hash(&self) -> Hash {
[0u8; 32].into()
}
}
impl PartitionKey for String {
fn hash(&self) -> Hash {
hash(self.as_bytes())
}
}
impl SortKey for String {
fn sort_key(&self) -> &[u8] {
self.as_bytes()
}
}
impl PartitionKey for Hash {
fn hash(&self) -> Hash {
self.clone()
}
}
impl SortKey for Hash {
fn sort_key(&self) -> &[u8] {
self.as_slice()
}
}
#[async_trait] #[async_trait]
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {
@ -74,3 +66,4 @@ pub trait TableSchema: Send + Sync {
true true
} }
} }

35
src/table/util.rs Normal file
View file

@ -0,0 +1,35 @@
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use crate::schema::*;
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmptyKey;
impl SortKey for EmptyKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
impl PartitionKey for EmptyKey {
fn hash(&self) -> Hash {
[0u8; 32].into()
}
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum DeletedFilter {
All,
Deleted,
NotDeleted,
}
impl DeletedFilter {
pub fn apply(&self, deleted: bool) -> bool {
match self {
DeletedFilter::All => true,
DeletedFilter::Deleted => deleted,
DeletedFilter::NotDeleted => !deleted,
}
}
}