Migrate S3 api to use new model

This commit is contained in:
Alex Auvolat 2020-07-08 17:33:24 +02:00
parent 84bbbfaa7b
commit 64a6eda0d2
5 changed files with 100 additions and 70 deletions

View file

@ -39,27 +39,23 @@ pub async fn handle_copy(
Some(v) => v, Some(v) => v,
None => return Err(Error::NotFound), None => return Err(Error::NotFound),
}; };
let source_last_state = match &source_last_v.state {
ObjectVersionState::Complete(x) => x,
_ => unreachable!(),
};
let new_uuid = gen_uuid(); let new_uuid = gen_uuid();
let dest_object_version = ObjectVersion { let dest_object_version = ObjectVersion {
uuid: new_uuid, uuid: new_uuid,
timestamp: now_msec(), timestamp: now_msec(),
mime_type: source_last_v.mime_type.clone(), state: ObjectVersionState::Complete(source_last_state.clone()),
size: source_last_v.size,
state: ObjectVersionState::Complete,
data: source_last_v.data.clone(),
}; };
match &source_last_v.data { match &source_last_state {
ObjectVersionData::Uploading => {
return Err(Error::Message(format!(
"Version is_complete() but data is stil Uploading (internal error)"
)));
}
ObjectVersionData::DeleteMarker => { ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound); return Err(Error::NotFound);
} }
ObjectVersionData::Inline(_bytes) => { ObjectVersionData::Inline(_meta, _bytes) => {
let dest_object = Object::new( let dest_object = Object::new(
dest_bucket.to_string(), dest_bucket.to_string(),
dest_key.to_string(), dest_key.to_string(),
@ -67,7 +63,7 @@ pub async fn handle_copy(
); );
garage.object_table.insert(&dest_object).await?; garage.object_table.insert(&dest_object).await?;
} }
ObjectVersionData::FirstBlock(_first_block_hash) => { ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let source_version = garage let source_version = garage
.version_table .version_table
.get(&source_last_v.uuid, &EmptyKey) .get(&source_last_v.uuid, &EmptyKey)

View file

@ -29,7 +29,11 @@ async fn handle_delete_internal(
}; };
let interesting_versions = object.versions().iter().filter(|v| { let interesting_versions = object.versions().iter().filter(|v| {
v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted match v.state {
ObjectVersionState::Aborted => false,
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
_ => true,
}
}); });
let mut must_delete = None; let mut must_delete = None;
@ -54,10 +58,7 @@ async fn handle_delete_internal(
vec![ObjectVersion { vec![ObjectVersion {
uuid: version_uuid, uuid: version_uuid,
timestamp: now_msec(), timestamp: now_msec(),
mime_type: "application/x-delete-marker".into(), state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
size: 0,
state: ObjectVersionState::Complete,
data: ObjectVersionData::DeleteMarker,
}], }],
); );

View file

@ -12,13 +12,14 @@ use garage_table::EmptyKey;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*; use garage_model::object_table::*;
fn object_headers(version: &ObjectVersion) -> http::response::Builder { fn object_headers(version: &ObjectVersion, version_meta: &ObjectVersionMeta) -> http::response::Builder {
let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); let date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
let date_str = httpdate::fmt_http_date(date); let date_str = httpdate::fmt_http_date(date);
Response::builder() Response::builder()
.header("Content-Type", version.mime_type.to_string()) .header("Content-Type", version_meta.headers.content_type.to_string())
.header("Content-Length", format!("{}", version.size)) .header("Content-Length", format!("{}", version_meta.size))
.header("ETag", version_meta.etag.to_string())
.header("Last-Modified", date_str) .header("Last-Modified", date_str)
.header("Accept-Ranges", format!("bytes")) .header("Accept-Ranges", format!("bytes"))
} }
@ -47,9 +48,14 @@ pub async fn handle_head(
Some(v) => v, Some(v) => v,
None => return Err(Error::NotFound), None => return Err(Error::NotFound),
}; };
let version_meta = match &version.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
_ => unreachable!(),
};
let body: Body = Body::from(vec![]); let body: Body = Body::from(vec![]);
let response = object_headers(&version) let response = object_headers(&version, version_meta)
.status(StatusCode::OK) .status(StatusCode::OK)
.body(body) .body(body)
.unwrap(); .unwrap();
@ -81,13 +87,22 @@ pub async fn handle_get(
Some(v) => v, Some(v) => v,
None => return Err(Error::NotFound), None => return Err(Error::NotFound),
}; };
let last_v_data = match &last_v.state {
ObjectVersionState::Complete(x) => x,
_ => unreachable!(),
};
let last_v_meta = match last_v_data {
ObjectVersionData::DeleteMarker => return Err(Error::NotFound),
ObjectVersionData::Inline(meta, _) => meta,
ObjectVersionData::FirstBlock(meta, _) => meta,
};
let range = match req.headers().get("range") { let range = match req.headers().get("range") {
Some(range) => { Some(range) => {
let range_str = range let range_str = range
.to_str() .to_str()
.map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?; .map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?;
let mut ranges = http_range::HttpRange::parse(range_str, last_v.size) let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
.map_err(|_e| Error::BadRequest(format!("Invalid range")))?; .map_err(|_e| Error::BadRequest(format!("Invalid range")))?;
if ranges.len() > 1 { if ranges.len() > 1 {
return Err(Error::BadRequest(format!("Multiple ranges not supported"))); return Err(Error::BadRequest(format!("Multiple ranges not supported")));
@ -98,21 +113,18 @@ pub async fn handle_get(
None => None, None => None,
}; };
if let Some(range) = range { if let Some(range) = range {
return handle_get_range(garage, last_v, range.start, range.start + range.length).await; return handle_get_range(garage, last_v, last_v_data, last_v_meta, range.start, range.start + range.length).await;
} }
let resp_builder = object_headers(&last_v).status(StatusCode::OK); let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK);
match &last_v.data { match &last_v_data {
ObjectVersionData::Uploading => Err(Error::Message(format!( ObjectVersionData::DeleteMarker => unreachable!(),
"Version is_complete() but data is stil Uploading (internal error)" ObjectVersionData::Inline(_, bytes) => {
))),
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
ObjectVersionData::Inline(bytes) => {
let body: Body = Body::from(bytes.to_vec()); let body: Body = Body::from(bytes.to_vec());
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
} }
ObjectVersionData::FirstBlock(first_block_hash) => { ObjectVersionData::FirstBlock(_, first_block_hash) => {
let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
@ -155,26 +167,25 @@ pub async fn handle_get(
pub async fn handle_get_range( pub async fn handle_get_range(
garage: Arc<Garage>, garage: Arc<Garage>,
version: &ObjectVersion, version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
begin: u64, begin: u64,
end: u64, end: u64,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
if end > version.size { if end > version_meta.size {
return Err(Error::BadRequest(format!("Range not included in file"))); return Err(Error::BadRequest(format!("Range not included in file")));
} }
let resp_builder = object_headers(&version) let resp_builder = object_headers(version, version_meta)
.header( .header(
"Content-Range", "Content-Range",
format!("bytes {}-{}/{}", begin, end, version.size), format!("bytes {}-{}/{}", begin, end, version_meta.size),
) )
.status(StatusCode::PARTIAL_CONTENT); .status(StatusCode::PARTIAL_CONTENT);
match &version.data { match &version_data {
ObjectVersionData::Uploading => Err(Error::Message(format!( ObjectVersionData::DeleteMarker => unreachable!(),
"Version is_complete() but data is stil Uploading (internal error)" ObjectVersionData::Inline(_meta, bytes) => {
))),
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
ObjectVersionData::Inline(bytes) => {
if end as usize <= bytes.len() { if end as usize <= bytes.len() {
let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
@ -182,7 +193,7 @@ pub async fn handle_get_range(
Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been"))) Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
} }
} }
ObjectVersionData::FirstBlock(_first_block_hash) => { ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let version = garage.version_table.get(&version.uuid, &EmptyKey).await?; let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
let version = match version { let version = match version {
Some(v) => v, Some(v) => v,

View file

@ -8,6 +8,7 @@ use hyper::{Body, Response};
use garage_util::error::Error; use garage_util::error::Error;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*;
use crate::encoding::*; use crate::encoding::*;
@ -73,10 +74,15 @@ pub async fn handle_list(
if let Some(pfx) = common_prefix { if let Some(pfx) = common_prefix {
result_common_prefixes.insert(pfx.to_string()); result_common_prefixes.insert(pfx.to_string());
} else { } else {
let size = match &version.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta.size,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
_ => unreachable!(),
};
let info = match result_keys.get(&object.key) { let info = match result_keys.get(&object.key) {
None => ListResultInfo { None => ListResultInfo {
last_modified: version.timestamp, last_modified: version.timestamp,
size: version.size, size,
}, },
Some(_lri) => { Some(_lri) => {
return Err(Error::Message(format!("Duplicate key?? {}", object.key))) return Err(Error::Message(format!("Duplicate key?? {}", object.key)))

View file

@ -1,4 +1,4 @@
use std::collections::VecDeque; use std::collections::{VecDeque, BTreeMap};
use std::fmt::Write; use std::fmt::Write;
use std::sync::Arc; use std::sync::Arc;
@ -24,7 +24,11 @@ pub async fn handle_put(
key: &str, key: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
let mime_type = get_mime_type(&req)?; let headers = ObjectVersionHeaders{
content_type: get_mime_type(&req)?,
other: BTreeMap::new(), // TODO
};
let body = req.into_body(); let body = req.into_body();
let mut chunker = BodyChunker::new(body, garage.config.block_size); let mut chunker = BodyChunker::new(body, garage.config.block_size);
@ -36,15 +40,17 @@ pub async fn handle_put(
let mut object_version = ObjectVersion { let mut object_version = ObjectVersion {
uuid: version_uuid, uuid: version_uuid,
timestamp: now_msec(), timestamp: now_msec(),
mime_type, state: ObjectVersionState::Uploading(headers.clone()),
size: first_block.len() as u64,
state: ObjectVersionState::Uploading,
data: ObjectVersionData::Uploading,
}; };
if first_block.len() < INLINE_THRESHOLD { if first_block.len() < INLINE_THRESHOLD {
object_version.data = ObjectVersionData::Inline(first_block); object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline(
object_version.state = ObjectVersionState::Complete; ObjectVersionMeta{
headers,
size: first_block.len() as u64,
etag: "".to_string(), // TODO
},
first_block));
let object = Object::new(bucket.into(), key.into(), vec![object_version]); let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
@ -54,7 +60,6 @@ pub async fn handle_put(
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
let first_block_hash = hash(&first_block[..]); let first_block_hash = hash(&first_block[..]);
object_version.data = ObjectVersionData::FirstBlock(first_block_hash);
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
@ -70,8 +75,13 @@ pub async fn handle_put(
// TODO: if at any step we have an error, we should undo everything we did // TODO: if at any step we have an error, we should undo everything we did
object_version.state = ObjectVersionState::Complete; object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
object_version.size = total_size; ObjectVersionMeta{
headers,
size: total_size,
etag: "".to_string(), // TODO
},
first_block_hash));
let object = Object::new(bucket.into(), key.into(), vec![object_version]); let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
@ -197,6 +207,7 @@ impl BodyChunker {
pub fn put_response(version_uuid: UUID) -> Response<Body> { pub fn put_response(version_uuid: UUID) -> Response<Body> {
Response::builder() Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid)) .header("x-amz-version-id", hex::encode(version_uuid))
// TODO ETag
.body(Body::from(vec![])) .body(Body::from(vec![]))
.unwrap() .unwrap()
} }
@ -208,15 +219,15 @@ pub async fn handle_create_multipart_upload(
key: &str, key: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
let mime_type = get_mime_type(req)?; let headers = ObjectVersionHeaders{
content_type: get_mime_type(&req)?,
other: BTreeMap::new(), // TODO
};
let object_version = ObjectVersion { let object_version = ObjectVersion {
uuid: version_uuid, uuid: version_uuid,
timestamp: now_msec(), timestamp: now_msec(),
mime_type, state: ObjectVersionState::Uploading(headers),
size: 0,
state: ObjectVersionState::Uploading,
data: ObjectVersionData::Uploading,
}; };
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
@ -276,9 +287,7 @@ pub async fn handle_put_part(
Some(x) => x, Some(x) => x,
}; };
if !object.versions().iter().any(|v| { if !object.versions().iter().any(|v| {
v.uuid == version_uuid v.uuid == version_uuid && v.is_uploading()
&& v.state == ObjectVersionState::Uploading
&& v.data == ObjectVersionData::Uploading
}) { }) {
return Err(Error::BadRequest(format!( return Err(Error::BadRequest(format!(
"Multipart upload does not exist or is otherwise invalid" "Multipart upload does not exist or is otherwise invalid"
@ -322,9 +331,7 @@ pub async fn handle_complete_multipart_upload(
Some(x) => x, Some(x) => x,
}; };
let object_version = object.versions().iter().find(|v| { let object_version = object.versions().iter().find(|v| {
v.uuid == version_uuid v.uuid == version_uuid && v.is_uploading()
&& v.state == ObjectVersionState::Uploading
&& v.data == ObjectVersionData::Uploading
}); });
let mut object_version = match object_version { let mut object_version = match object_version {
None => { None => {
@ -341,6 +348,10 @@ pub async fn handle_complete_multipart_upload(
if version.blocks().len() == 0 { if version.blocks().len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded"))); return Err(Error::BadRequest(format!("No data was uploaded")));
} }
let headers = match object_version.state {
ObjectVersionState::Uploading(headers) => headers.clone(),
_ => unreachable!(),
};
// TODO: check that all the parts that they pretend they gave us are indeed there // TODO: check that all the parts that they pretend they gave us are indeed there
// TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
@ -350,9 +361,16 @@ pub async fn handle_complete_multipart_upload(
.iter() .iter()
.map(|x| x.size) .map(|x| x.size)
.fold(0, |x, y| x + y); .fold(0, |x, y| x + y);
object_version.size = total_size; object_version.state = ObjectVersionState::Complete(
object_version.state = ObjectVersionState::Complete; ObjectVersionData::FirstBlock(
object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash); ObjectVersionMeta{
headers,
size: total_size,
etag: "".to_string(),// TODO
},
version.blocks()[0].hash)
);
let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]); let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?; garage.object_table.insert(&final_object).await?;
@ -394,9 +412,7 @@ pub async fn handle_abort_multipart_upload(
Some(x) => x, Some(x) => x,
}; };
let object_version = object.versions().iter().find(|v| { let object_version = object.versions().iter().find(|v| {
v.uuid == version_uuid v.uuid == version_uuid && v.is_uploading()
&& v.state == ObjectVersionState::Uploading
&& v.data == ObjectVersionData::Uploading
}); });
let mut object_version = match object_version { let mut object_version = match object_version {
None => { None => {