Reorganize table API

This commit is contained in:
Alex Auvolat 2020-04-09 16:16:27 +02:00
parent a450103ed0
commit 4c1aee42d5
4 changed files with 96 additions and 60 deletions

2
TODO
View file

@ -5,7 +5,7 @@ Object table
Rename version table to object table Rename version table to object table
In value handle the different versions In value handle the different versions
So that the table becomes (bucket, key) -> CRDT(list of versions) So that the table becomes bucket + Sort key = object key -> CRDT(list of versions)
CRDT merge rule: CRDT merge rule:
- keep one complete version (the one with the highest timestamp) - keep one complete version (the one with the highest timestamp)

View file

@ -97,11 +97,9 @@ async fn handle_put(garage: Arc<Garage>,
None => return Err(Error::BadRequest(format!("Empty body"))), None => return Err(Error::BadRequest(format!("Empty body"))),
}; };
let version_key = VersionMetaKey{ let mut version = VersionMeta{
bucket: bucket.to_string(), bucket: bucket.into(),
key: key.to_string(), key: key.into(),
};
let mut version_value = VersionMetaValue {
timestamp: now_msec(), timestamp: now_msec(),
uuid: version_uuid.clone(), uuid: version_uuid.clone(),
mime_type: mime_type.to_string(), mime_type: mime_type.to_string(),
@ -111,15 +109,15 @@ async fn handle_put(garage: Arc<Garage>,
}; };
if first_block.len() < INLINE_THRESHOLD { if first_block.len() < INLINE_THRESHOLD {
version_value.data = VersionData::Inline(first_block); version.data = VersionData::Inline(first_block);
version_value.is_complete = true; version.is_complete = true;
garage.version_table.insert(&version_key, &version_value).await?; garage.version_table.insert(&version).await?;
return Ok(version_uuid) return Ok(version_uuid)
} }
let first_block_hash = hash(&first_block[..]); let first_block_hash = hash(&first_block[..]);
version_value.data = VersionData::FirstBlock(first_block_hash); version.data = VersionData::FirstBlock(first_block_hash);
garage.version_table.insert(&version_key, &version_value).await?; garage.version_table.insert(&version).await?;
let block_meta = BlockMeta{ let block_meta = BlockMeta{
version_uuid: version_uuid.clone(), version_uuid: version_uuid.clone(),
@ -145,8 +143,8 @@ async fn handle_put(garage: Arc<Garage>,
// TODO: if at any step we have an error, we should undo everything we did // TODO: if at any step we have an error, we should undo everything we did
version_value.is_complete = true; version.is_complete = true;
garage.version_table.insert(&version_key, &version_value).await?; garage.version_table.insert(&version).await?;
Ok(version_uuid) Ok(version_uuid)
} }

View file

@ -52,10 +52,10 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
pub enum TableRPC<F: TableFormat> { pub enum TableRPC<F: TableFormat> {
Ok, Ok,
ReadEntry(F::K, Vec<u8>), ReadEntry(F::P, F::S),
ReadEntryResponse(Option<F::V>), ReadEntryResponse(Option<F::E>),
Update(Vec<(F::K, F::V)>), Update(Vec<F::E>),
} }
pub struct Partition { pub struct Partition {
@ -64,21 +64,59 @@ pub struct Partition {
pub other_nodes: Vec<UUID>, pub other_nodes: Vec<UUID>,
} }
pub trait TableKey { pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync {
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
} }
pub trait TableValue { pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync {
fn sort_key(&self) -> Vec<u8>; fn sort_key(&self) -> &[u8];
}
pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync {
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
fn merge(&mut self, other: &Self); fn merge(&mut self, other: &Self);
} }
#[derive(Clone, Serialize, Deserialize)]
pub struct EmptySortKey;
impl SortKey for EmptySortKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StringKey(String);
impl PartitionKey for StringKey {
fn hash(&self) -> Hash {
hash(self.0.as_bytes())
}
}
impl SortKey for StringKey {
fn sort_key(&self) -> &[u8] {
self.0.as_bytes()
}
}
impl AsRef<str> for StringKey {
fn as_ref(&self) -> &str {
&self.0
}
}
impl From<&str> for StringKey {
fn from(s: &str) -> StringKey {
StringKey(s.to_string())
}
}
#[async_trait] #[async_trait]
pub trait TableFormat: Send + Sync { pub trait TableFormat: Send + Sync {
type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + TableKey + Send + Sync; type P: PartitionKey;
type V: Clone + Serialize + for<'de> Deserialize<'de> + TableValue + Send + Sync; type S: SortKey;
type E: Entry<Self::P, Self::S>;
async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); async fn updated(&self, old: Option<&Self::E>, new: &Self::E);
} }
impl<F: TableFormat + 'static> Table<F> { impl<F: TableFormat + 'static> Table<F> {
@ -99,12 +137,12 @@ impl<F: TableFormat + 'static> Table<F> {
Box::new(TableRpcHandlerAdapter::<F>{ table: self }) Box::new(TableRpcHandlerAdapter::<F>{ table: self })
} }
pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = k.hash(); let hash = e.partition_key().hash();
let who = self.system.members.read().await let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor); .walk_ring(&hash, self.param.replication_factor);
let rpc = &TableRPC::<F>::Update(vec![(k.clone(), v.clone())]); let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
self.rpc_try_call_many(&who[..], self.rpc_try_call_many(&who[..],
&rpc, &rpc,
@ -112,12 +150,12 @@ impl<F: TableFormat + 'static> Table<F> {
Ok(()) Ok(())
} }
pub async fn get(&self, k: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
let hash = k.hash(); let hash = partition_key.hash();
let who = self.system.members.read().await let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor); .walk_ring(&hash, self.param.replication_factor);
let rpc = &TableRPC::<F>::ReadEntry(k.clone(), sort_key.to_vec()); let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self.rpc_try_call_many(&who[..], let resps = self.rpc_try_call_many(&who[..],
&rpc, &rpc,
self.param.read_quorum) self.param.read_quorum)
@ -179,36 +217,40 @@ impl<F: TableFormat + 'static> Table<F> {
} }
} }
fn handle_read_entry(&self, key: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<F::E>, Error> {
let mut tree_key = key.hash().to_vec(); let tree_key = self.tree_key(p, s);
tree_key.extend(sort_key);
if let Some(bytes) = self.store.get(&tree_key)? { if let Some(bytes) = self.store.get(&tree_key)? {
let (_, v) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&bytes)?; let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?;
Ok(Some(v)) Ok(Some(e))
} else { } else {
Ok(None) Ok(None)
} }
} }
async fn handle_update(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> {
for mut pair in pairs.drain(..) { for mut entry in entries.drain(..) {
let mut tree_key = pair.0.hash().to_vec(); let tree_key = self.tree_key(entry.partition_key(), entry.sort_key());
tree_key.extend(pair.1.sort_key());
let old_val = match self.store.get(&tree_key)? { let old_val = match self.store.get(&tree_key)? {
Some(prev_bytes) => { Some(prev_bytes) => {
let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?;
pair.1.merge(&old_val); entry.merge(&old_entry);
Some(old_val) Some(old_entry)
} }
None => None None => None
}; };
let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; let new_bytes = rmp_serde::encode::to_vec_named(&entry)?;
self.store.insert(&tree_key, new_bytes)?; self.store.insert(&tree_key, new_bytes)?;
self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; self.instance.updated(old_val.as_ref(), &entry).await;
} }
Ok(()) Ok(())
} }
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());
ret
}
} }

View file

@ -8,14 +8,11 @@ use crate::table::*;
use crate::server::Garage; use crate::server::Garage;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct VersionMetaKey {
pub bucket: String,
pub key: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VersionMetaValue { pub struct VersionMeta {
pub bucket: StringKey,
pub key: StringKey,
pub timestamp: u64, pub timestamp: u64,
pub uuid: UUID, pub uuid: UUID,
@ -37,16 +34,14 @@ pub struct VersionTable {
pub garage: RwLock<Option<Arc<Garage>>>, pub garage: RwLock<Option<Arc<Garage>>>,
} }
impl TableKey for VersionMetaKey { impl Entry<StringKey, StringKey> for VersionMeta {
fn hash(&self) -> Hash { fn partition_key(&self) -> &StringKey {
hash(self.bucket.as_bytes()) &self.bucket
}
fn sort_key(&self) -> &StringKey {
&self.key
} }
}
impl TableValue for VersionMetaValue {
fn sort_key(&self) -> Vec<u8> {
vec![]
}
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
unimplemented!() unimplemented!()
} }
@ -54,10 +49,11 @@ impl TableValue for VersionMetaValue {
#[async_trait] #[async_trait]
impl TableFormat for VersionTable { impl TableFormat for VersionTable {
type K = VersionMetaKey; type P = StringKey;
type V = VersionMetaValue; type S = StringKey;
type E = VersionMeta;
async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V) { async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
unimplemented!() unimplemented!()
} }
} }