Some progress

This commit is contained in:
Alex Auvolat 2020-04-09 17:32:28 +02:00
parent 4c1aee42d5
commit 101444abb3
8 changed files with 169 additions and 113 deletions

View file

@ -39,16 +39,10 @@ pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Ou
async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
match handler_inner(garage, req, addr).await {
Ok(x) => Ok(x),
Err(Error::BadRequest(e)) => {
let mut bad_request = Response::new(Body::from(format!("{}\n", e)));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
Ok(bad_request)
}
Err(e) => {
let mut ise = Response::new(Body::from(
format!("Internal server error: {}\n", e)));
*ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(ise)
let mut http_error = Response::new(Body::from(format!("{}\n", e)));
*http_error.status_mut() = e.http_status_code();
Ok(http_error)
}
}
}
@ -65,9 +59,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr
match req.method() {
&Method::GET => {
Ok(Response::new(Body::from(
"TODO: implement GET object",
)))
Ok(handle_get(garage, &bucket, &key).await?)
}
&Method::PUT => {
let mime_type = req.headers()
@ -97,27 +89,30 @@ async fn handle_put(garage: Arc<Garage>,
None => return Err(Error::BadRequest(format!("Empty body"))),
};
let mut version = VersionMeta{
let mut object = Object {
bucket: bucket.into(),
key: key.into(),
timestamp: now_msec(),
versions: Vec::new(),
};
object.versions.push(Box::new(Version{
uuid: version_uuid.clone(),
timestamp: now_msec(),
mime_type: mime_type.to_string(),
size: first_block.len() as u64,
is_complete: false,
data: VersionData::DeleteMarker,
};
}));
if first_block.len() < INLINE_THRESHOLD {
version.data = VersionData::Inline(first_block);
version.is_complete = true;
garage.version_table.insert(&version).await?;
object.versions[0].data = VersionData::Inline(first_block);
object.versions[0].is_complete = true;
garage.object_table.insert(&object).await?;
return Ok(version_uuid)
}
let first_block_hash = hash(&first_block[..]);
version.data = VersionData::FirstBlock(first_block_hash);
garage.version_table.insert(&version).await?;
object.versions[0].data = VersionData::FirstBlock(first_block_hash);
garage.object_table.insert(&object).await?;
let block_meta = BlockMeta{
version_uuid: version_uuid.clone(),
@ -143,8 +138,9 @@ async fn handle_put(garage: Arc<Garage>,
// TODO: if at any step we have an error, we should undo everything we did
version.is_complete = true;
garage.version_table.insert(&version).await?;
object.versions[0].is_complete = true;
object.versions[0].size = next_offset as u64;
garage.object_table.insert(&object).await?;
Ok(version_uuid)
}
@ -198,3 +194,32 @@ impl BodyChunker {
}
}
}
async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Response<Body>, Error> {
let mut object = match garage.object_table.get(&bucket.to_string(), &key.to_string()).await? {
None => return Err(Error::NotFound),
Some(o) => o
};
let last_v = match object.versions.drain(..)
.rev().filter(|v| v.is_complete)
.next() {
Some(v) => v,
None => return Err(Error::NotFound),
};
let resp_builder = Response::builder()
.header("Content-Type", last_v.mime_type)
.status(StatusCode::OK);
match last_v.data {
VersionData::DeleteMarker => Err(Error::NotFound),
VersionData::Inline(bytes) => {
Ok(resp_builder.body(bytes.into())?)
}
VersionData::FirstBlock(hash) => {
// TODO
unimplemented!()
}
}
}

View file

@ -121,7 +121,7 @@ pub struct SplitpointMeta {
pub deleted: bool,
}
pub use crate::version_table::*;
pub use crate::object_table::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockMeta {

View file

@ -1,5 +1,6 @@
use err_derive::Error;
use std::io;
use err_derive::Error;
use hyper::StatusCode;
#[derive(Debug, Error)]
pub enum Error {
@ -32,9 +33,22 @@ pub enum Error {
#[error(display = "RPC error: {}", _0)]
RPCError(String),
#[error(display = "{}", _0)]
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
#[error(display = "Not found")]
NotFound,
#[error(display = "{}", _0)]
Message(String),
}
impl Error {
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::BadRequest(_) => StatusCode::BAD_REQUEST,
Error::NotFound => StatusCode::NOT_FOUND,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}

View file

@ -5,7 +5,7 @@ mod proto;
mod membership;
mod table;
mod version_table;
mod object_table;
mod server;
mod rpc_server;

88
src/object_table.rs Normal file
View file

@ -0,0 +1,88 @@
use std::sync::Arc;
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use tokio::sync::RwLock;
use crate::data::*;
use crate::table::*;
use crate::server::Garage;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Object {
pub bucket: String,
pub key: String,
pub versions: Vec<Box<Version>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Version {
pub uuid: UUID,
pub timestamp: u64,
pub mime_type: String,
pub size: u64,
pub is_complete: bool,
pub data: VersionData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum VersionData {
DeleteMarker,
Inline(#[serde(with="serde_bytes")] Vec<u8>),
FirstBlock(Hash),
}
pub struct ObjectTable {
pub garage: RwLock<Option<Arc<Garage>>>,
}
impl Entry<String, String> for Object {
fn partition_key(&self) -> &String {
&self.bucket
}
fn sort_key(&self) -> &String {
&self.key
}
fn merge(&mut self, other: &Self) {
for other_v in other.versions.iter() {
match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) {
Ok(i) => {
let mut v = &mut self.versions[i];
if other_v.size > v.size {
v.size = other_v.size;
}
if other_v.is_complete {
v.is_complete = true;
}
}
Err(i) => {
self.versions.insert(i, other_v.clone());
}
}
}
let last_complete = self.versions
.iter().enumerate().rev()
.filter(|(_, v)| v.is_complete)
.next()
.map(|(vi, _)| vi);
if let Some(last_vi) = last_complete {
self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
}
}
}
#[async_trait]
impl TableFormat for ObjectTable {
type P = String;
type S = String;
type E = Object;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
unimplemented!()
}
}

View file

@ -21,7 +21,7 @@ pub struct Garage {
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
pub version_table: Arc<Table<VersionTable>>,
pub object_table: Arc<Table<ObjectTable>>,
}
impl Garage {
@ -35,25 +35,25 @@ impl Garage {
timeout: DEFAULT_TIMEOUT,
};
let version_table = Arc::new(Table::new(
VersionTable{garage: RwLock::new(None)},
let object_table = Arc::new(Table::new(
ObjectTable{garage: RwLock::new(None)},
system.clone(),
&db,
"version".to_string(),
"object".to_string(),
meta_rep_param.clone()));
let mut garage = Self{
db,
system: system.clone(),
table_rpc_handlers: HashMap::new(),
version_table,
object_table,
};
garage.table_rpc_handlers.insert(
garage.version_table.name.clone(),
garage.version_table.clone().rpc_handler());
garage.object_table.name.clone(),
garage.object_table.clone().rpc_handler());
let garage = Arc::new(garage);
*garage.version_table.instance.garage.write().await = Some(garage.clone());
*garage.object_table.instance.garage.write().await = Some(garage.clone());
garage
}
}

View file

@ -64,11 +64,11 @@ pub struct Partition {
pub other_nodes: Vec<UUID>,
}
pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync {
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync {
pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
@ -87,33 +87,21 @@ impl SortKey for EmptySortKey {
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StringKey(String);
impl PartitionKey for StringKey {
impl<T: AsRef<str>> PartitionKey for T {
fn hash(&self) -> Hash {
hash(self.0.as_bytes())
hash(self.as_ref().as_bytes())
}
}
impl SortKey for StringKey {
impl<T: AsRef<str>> SortKey for T {
fn sort_key(&self) -> &[u8] {
self.0.as_bytes()
}
}
impl AsRef<str> for StringKey {
fn as_ref(&self) -> &str {
&self.0
}
}
impl From<&str> for StringKey {
fn from(s: &str) -> StringKey {
StringKey(s.to_string())
self.as_ref().as_bytes()
}
}
#[async_trait]
pub trait TableFormat: Send + Sync {
type P: PartitionKey;
type S: SortKey;
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E);

View file

@ -1,59 +0,0 @@
use std::sync::Arc;
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use tokio::sync::RwLock;
use crate::data::*;
use crate::table::*;
use crate::server::Garage;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct VersionMeta {
pub bucket: StringKey,
pub key: StringKey,
pub timestamp: u64,
pub uuid: UUID,
pub mime_type: String,
pub size: u64,
pub is_complete: bool,
pub data: VersionData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum VersionData {
DeleteMarker,
Inline(#[serde(with="serde_bytes")] Vec<u8>),
FirstBlock(Hash),
}
pub struct VersionTable {
pub garage: RwLock<Option<Arc<Garage>>>,
}
impl Entry<StringKey, StringKey> for VersionMeta {
fn partition_key(&self) -> &StringKey {
&self.bucket
}
fn sort_key(&self) -> &StringKey {
&self.key
}
fn merge(&mut self, other: &Self) {
unimplemented!()
}
}
#[async_trait]
impl TableFormat for VersionTable {
type P = StringKey;
type S = StringKey;
type E = VersionMeta;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
unimplemented!()
}
}