mirror of
https://git.deuxfleurs.fr/Deuxfleurs/garage.git
synced 2024-11-24 17:11:01 +00:00
Merge pull request 'add request context helper' (#751) from yuka/garage:req-ctx into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/751
This commit is contained in:
commit
3168bb34a0
19 changed files with 458 additions and 431 deletions
|
@ -1,4 +1,5 @@
|
||||||
use std::convert::Infallible;
|
use std::convert::Infallible;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::{Stream, StreamExt, TryStreamExt};
|
use futures::{Stream, StreamExt, TryStreamExt};
|
||||||
|
|
||||||
|
@ -10,6 +11,10 @@ use hyper::{
|
||||||
use idna::domain_to_unicode;
|
use idna::domain_to_unicode;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_model::bucket_table::BucketParams;
|
||||||
|
use garage_model::garage::Garage;
|
||||||
|
use garage_model::key_table::Key;
|
||||||
|
use garage_util::data::Uuid;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
use crate::common_error::{CommonError as Error, *};
|
use crate::common_error::{CommonError as Error, *};
|
||||||
|
@ -27,6 +32,15 @@ pub enum Authorization {
|
||||||
Owner,
|
Owner,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The values which are known for each request related to a bucket
|
||||||
|
pub struct ReqCtx {
|
||||||
|
pub garage: Arc<Garage>,
|
||||||
|
pub bucket_id: Uuid,
|
||||||
|
pub bucket_name: String,
|
||||||
|
pub bucket_params: BucketParams,
|
||||||
|
pub api_key: Key,
|
||||||
|
}
|
||||||
|
|
||||||
/// Host to bucket
|
/// Host to bucket
|
||||||
///
|
///
|
||||||
/// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket",
|
/// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket",
|
||||||
|
|
|
@ -95,6 +95,7 @@ impl ApiHandler for K2VApiServer {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.get_existing_bucket(bucket_id)
|
.get_existing_bucket(bucket_id)
|
||||||
.await?;
|
.await?;
|
||||||
|
let bucket_params = bucket.state.into_option().unwrap();
|
||||||
|
|
||||||
let allowed = match endpoint.authorization_type() {
|
let allowed = match endpoint.authorization_type() {
|
||||||
Authorization::Read => api_key.allow_read(&bucket_id),
|
Authorization::Read => api_key.allow_read(&bucket_id),
|
||||||
|
@ -112,40 +113,42 @@ impl ApiHandler for K2VApiServer {
|
||||||
// are always preflighted, i.e. the browser should make
|
// are always preflighted, i.e. the browser should make
|
||||||
// an OPTIONS call before to check it is allowed
|
// an OPTIONS call before to check it is allowed
|
||||||
let matching_cors_rule = match *req.method() {
|
let matching_cors_rule = match *req.method() {
|
||||||
Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
|
Method::GET | Method::HEAD | Method::POST => {
|
||||||
.ok_or_internal_error("Error looking up CORS rule")?,
|
find_matching_cors_rule(&bucket_params, &req)
|
||||||
|
.ok_or_internal_error("Error looking up CORS rule")?
|
||||||
|
.cloned()
|
||||||
|
}
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let ctx = ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
bucket_name,
|
||||||
|
bucket_params,
|
||||||
|
api_key,
|
||||||
|
};
|
||||||
|
|
||||||
let resp = match endpoint {
|
let resp = match endpoint {
|
||||||
Endpoint::DeleteItem {
|
Endpoint::DeleteItem {
|
||||||
partition_key,
|
partition_key,
|
||||||
sort_key,
|
sort_key,
|
||||||
} => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
|
} => handle_delete_item(ctx, req, &partition_key, &sort_key).await,
|
||||||
Endpoint::InsertItem {
|
Endpoint::InsertItem {
|
||||||
partition_key,
|
partition_key,
|
||||||
sort_key,
|
sort_key,
|
||||||
} => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
|
} => handle_insert_item(ctx, req, &partition_key, &sort_key).await,
|
||||||
Endpoint::ReadItem {
|
Endpoint::ReadItem {
|
||||||
partition_key,
|
partition_key,
|
||||||
sort_key,
|
sort_key,
|
||||||
} => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
|
} => handle_read_item(ctx, &req, &partition_key, &sort_key).await,
|
||||||
Endpoint::PollItem {
|
Endpoint::PollItem {
|
||||||
partition_key,
|
partition_key,
|
||||||
sort_key,
|
sort_key,
|
||||||
causality_token,
|
causality_token,
|
||||||
timeout,
|
timeout,
|
||||||
} => {
|
} => {
|
||||||
handle_poll_item(
|
handle_poll_item(ctx, &req, partition_key, sort_key, causality_token, timeout).await
|
||||||
garage,
|
|
||||||
&req,
|
|
||||||
bucket_id,
|
|
||||||
partition_key,
|
|
||||||
sort_key,
|
|
||||||
causality_token,
|
|
||||||
timeout,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
Endpoint::ReadIndex {
|
Endpoint::ReadIndex {
|
||||||
prefix,
|
prefix,
|
||||||
|
@ -153,12 +156,12 @@ impl ApiHandler for K2VApiServer {
|
||||||
end,
|
end,
|
||||||
limit,
|
limit,
|
||||||
reverse,
|
reverse,
|
||||||
} => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
|
} => handle_read_index(ctx, prefix, start, end, limit, reverse).await,
|
||||||
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
|
Endpoint::InsertBatch {} => handle_insert_batch(ctx, req).await,
|
||||||
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
|
Endpoint::ReadBatch {} => handle_read_batch(ctx, req).await,
|
||||||
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
|
Endpoint::DeleteBatch {} => handle_delete_batch(ctx, req).await,
|
||||||
Endpoint::PollRange { partition_key } => {
|
Endpoint::PollRange { partition_key } => {
|
||||||
handle_poll_range(garage, bucket_id, &partition_key, req).await
|
handle_poll_range(ctx, &partition_key, req).await
|
||||||
}
|
}
|
||||||
Endpoint::Options => unreachable!(),
|
Endpoint::Options => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -167,7 +170,7 @@ impl ApiHandler for K2VApiServer {
|
||||||
// add the corresponding CORS headers to the response
|
// add the corresponding CORS headers to the response
|
||||||
let mut resp_ok = resp?;
|
let mut resp_ok = resp?;
|
||||||
if let Some(rule) = matching_cors_rule {
|
if let Some(rule) = matching_cors_rule {
|
||||||
add_cors_headers(&mut resp_ok, rule)
|
add_cors_headers(&mut resp_ok, &rule)
|
||||||
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,14 +1,9 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use base64::prelude::*;
|
use base64::prelude::*;
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::data::*;
|
|
||||||
|
|
||||||
use garage_table::{EnumerationOrder, TableSchema};
|
use garage_table::{EnumerationOrder, TableSchema};
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_model::k2v::causality::*;
|
use garage_model::k2v::causality::*;
|
||||||
use garage_model::k2v::item_table::*;
|
use garage_model::k2v::item_table::*;
|
||||||
|
|
||||||
|
@ -18,10 +13,12 @@ use crate::k2v::error::*;
|
||||||
use crate::k2v::range::read_range;
|
use crate::k2v::range::read_range;
|
||||||
|
|
||||||
pub async fn handle_insert_batch(
|
pub async fn handle_insert_batch(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
|
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
|
||||||
|
|
||||||
let mut items2 = vec![];
|
let mut items2 = vec![];
|
||||||
|
@ -38,7 +35,7 @@ pub async fn handle_insert_batch(
|
||||||
items2.push((it.pk, it.sk, ct, v));
|
items2.push((it.pk, it.sk, ct, v));
|
||||||
}
|
}
|
||||||
|
|
||||||
garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
|
garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::NO_CONTENT)
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
@ -46,8 +43,7 @@ pub async fn handle_insert_batch(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_read_batch(
|
pub async fn handle_read_batch(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
|
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
|
||||||
|
@ -55,7 +51,7 @@ pub async fn handle_read_batch(
|
||||||
let resp_results = futures::future::join_all(
|
let resp_results = futures::future::join_all(
|
||||||
queries
|
queries
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|q| handle_read_batch_query(&garage, bucket_id, q)),
|
.map(|q| handle_read_batch_query(&ctx, q)),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
@ -68,12 +64,15 @@ pub async fn handle_read_batch(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_read_batch_query(
|
async fn handle_read_batch_query(
|
||||||
garage: &Arc<Garage>,
|
ctx: &ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
query: ReadBatchQuery,
|
query: ReadBatchQuery,
|
||||||
) -> Result<ReadBatchResponse, Error> {
|
) -> Result<ReadBatchResponse, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
let partition = K2VItemPartition {
|
let partition = K2VItemPartition {
|
||||||
bucket_id,
|
bucket_id: *bucket_id,
|
||||||
partition_key: query.partition_key.clone(),
|
partition_key: query.partition_key.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -138,8 +137,7 @@ async fn handle_read_batch_query(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_batch(
|
pub async fn handle_delete_batch(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
|
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
|
||||||
|
@ -147,7 +145,7 @@ pub async fn handle_delete_batch(
|
||||||
let resp_results = futures::future::join_all(
|
let resp_results = futures::future::join_all(
|
||||||
queries
|
queries
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
|
.map(|q| handle_delete_batch_query(&ctx, q)),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
@ -160,12 +158,15 @@ pub async fn handle_delete_batch(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_delete_batch_query(
|
async fn handle_delete_batch_query(
|
||||||
garage: &Arc<Garage>,
|
ctx: &ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
query: DeleteBatchQuery,
|
query: DeleteBatchQuery,
|
||||||
) -> Result<DeleteBatchResponse, Error> {
|
) -> Result<DeleteBatchResponse, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
|
|
||||||
let partition = K2VItemPartition {
|
let partition = K2VItemPartition {
|
||||||
bucket_id,
|
bucket_id: *bucket_id,
|
||||||
partition_key: query.partition_key.clone(),
|
partition_key: query.partition_key.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -195,7 +196,7 @@ async fn handle_delete_batch_query(
|
||||||
.k2v
|
.k2v
|
||||||
.rpc
|
.rpc
|
||||||
.insert(
|
.insert(
|
||||||
bucket_id,
|
*bucket_id,
|
||||||
i.partition.partition_key,
|
i.partition.partition_key,
|
||||||
i.sort_key,
|
i.sort_key,
|
||||||
Some(cc),
|
Some(cc),
|
||||||
|
@ -235,7 +236,7 @@ async fn handle_delete_batch_query(
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let n = items.len();
|
let n = items.len();
|
||||||
|
|
||||||
garage.k2v.rpc.insert_batch(bucket_id, items).await?;
|
garage.k2v.rpc.insert_batch(*bucket_id, items).await?;
|
||||||
|
|
||||||
n
|
n
|
||||||
};
|
};
|
||||||
|
@ -251,11 +252,13 @@ async fn handle_delete_batch_query(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn handle_poll_range(
|
pub(crate) async fn handle_poll_range(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
partition_key: &str,
|
partition_key: &str,
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = ctx;
|
||||||
use garage_model::k2v::sub::PollRange;
|
use garage_model::k2v::sub::PollRange;
|
||||||
|
|
||||||
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
|
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
|
||||||
|
|
|
@ -3,12 +3,9 @@ use std::sync::Arc;
|
||||||
use hyper::Response;
|
use hyper::Response;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
use garage_util::data::*;
|
|
||||||
|
|
||||||
use garage_rpc::ring::Ring;
|
use garage_rpc::ring::Ring;
|
||||||
use garage_table::util::*;
|
use garage_table::util::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
|
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
|
||||||
|
|
||||||
use crate::helpers::*;
|
use crate::helpers::*;
|
||||||
|
@ -17,14 +14,17 @@ use crate::k2v::error::*;
|
||||||
use crate::k2v::range::read_range;
|
use crate::k2v::range::read_range;
|
||||||
|
|
||||||
pub async fn handle_read_index(
|
pub async fn handle_read_index(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
prefix: Option<String>,
|
prefix: Option<String>,
|
||||||
start: Option<String>,
|
start: Option<String>,
|
||||||
end: Option<String>,
|
end: Option<String>,
|
||||||
limit: Option<u64>,
|
limit: Option<u64>,
|
||||||
reverse: Option<bool>,
|
reverse: Option<bool>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
|
|
||||||
let reverse = reverse.unwrap_or(false);
|
let reverse = reverse.unwrap_or(false);
|
||||||
|
|
||||||
let ring: Arc<Ring> = garage.system.ring.borrow().clone();
|
let ring: Arc<Ring> = garage.system.ring.borrow().clone();
|
||||||
|
|
|
@ -1,13 +1,8 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use base64::prelude::*;
|
use base64::prelude::*;
|
||||||
use http::header;
|
use http::header;
|
||||||
|
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
|
|
||||||
use garage_util::data::*;
|
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_model::k2v::causality::*;
|
use garage_model::k2v::causality::*;
|
||||||
use garage_model::k2v::item_table::*;
|
use garage_model::k2v::item_table::*;
|
||||||
|
|
||||||
|
@ -100,12 +95,15 @@ impl ReturnFormat {
|
||||||
/// Handle ReadItem request
|
/// Handle ReadItem request
|
||||||
#[allow(clippy::ptr_arg)]
|
#[allow(clippy::ptr_arg)]
|
||||||
pub async fn handle_read_item(
|
pub async fn handle_read_item(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: &Request<ReqBody>,
|
req: &Request<ReqBody>,
|
||||||
bucket_id: Uuid,
|
|
||||||
partition_key: &str,
|
partition_key: &str,
|
||||||
sort_key: &String,
|
sort_key: &String,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
|
|
||||||
let format = ReturnFormat::from(req)?;
|
let format = ReturnFormat::from(req)?;
|
||||||
|
|
||||||
let item = garage
|
let item = garage
|
||||||
|
@ -113,7 +111,7 @@ pub async fn handle_read_item(
|
||||||
.item_table
|
.item_table
|
||||||
.get(
|
.get(
|
||||||
&K2VItemPartition {
|
&K2VItemPartition {
|
||||||
bucket_id,
|
bucket_id: *bucket_id,
|
||||||
partition_key: partition_key.to_string(),
|
partition_key: partition_key.to_string(),
|
||||||
},
|
},
|
||||||
sort_key,
|
sort_key,
|
||||||
|
@ -125,12 +123,14 @@ pub async fn handle_read_item(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_insert_item(
|
pub async fn handle_insert_item(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
bucket_id: Uuid,
|
|
||||||
partition_key: &str,
|
partition_key: &str,
|
||||||
sort_key: &str,
|
sort_key: &str,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
let causal_context = req
|
let causal_context = req
|
||||||
.headers()
|
.headers()
|
||||||
.get(X_GARAGE_CAUSALITY_TOKEN)
|
.get(X_GARAGE_CAUSALITY_TOKEN)
|
||||||
|
@ -149,7 +149,7 @@ pub async fn handle_insert_item(
|
||||||
.k2v
|
.k2v
|
||||||
.rpc
|
.rpc
|
||||||
.insert(
|
.insert(
|
||||||
bucket_id,
|
*bucket_id,
|
||||||
partition_key.to_string(),
|
partition_key.to_string(),
|
||||||
sort_key.to_string(),
|
sort_key.to_string(),
|
||||||
causal_context,
|
causal_context,
|
||||||
|
@ -163,12 +163,14 @@ pub async fn handle_insert_item(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_item(
|
pub async fn handle_delete_item(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
bucket_id: Uuid,
|
|
||||||
partition_key: &str,
|
partition_key: &str,
|
||||||
sort_key: &str,
|
sort_key: &str,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
let causal_context = req
|
let causal_context = req
|
||||||
.headers()
|
.headers()
|
||||||
.get(X_GARAGE_CAUSALITY_TOKEN)
|
.get(X_GARAGE_CAUSALITY_TOKEN)
|
||||||
|
@ -183,7 +185,7 @@ pub async fn handle_delete_item(
|
||||||
.k2v
|
.k2v
|
||||||
.rpc
|
.rpc
|
||||||
.insert(
|
.insert(
|
||||||
bucket_id,
|
*bucket_id,
|
||||||
partition_key.to_string(),
|
partition_key.to_string(),
|
||||||
sort_key.to_string(),
|
sort_key.to_string(),
|
||||||
causal_context,
|
causal_context,
|
||||||
|
@ -199,14 +201,16 @@ pub async fn handle_delete_item(
|
||||||
/// Handle ReadItem request
|
/// Handle ReadItem request
|
||||||
#[allow(clippy::ptr_arg)]
|
#[allow(clippy::ptr_arg)]
|
||||||
pub async fn handle_poll_item(
|
pub async fn handle_poll_item(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: &Request<ReqBody>,
|
req: &Request<ReqBody>,
|
||||||
bucket_id: Uuid,
|
|
||||||
partition_key: String,
|
partition_key: String,
|
||||||
sort_key: String,
|
sort_key: String,
|
||||||
causality_token: String,
|
causality_token: String,
|
||||||
timeout_secs: Option<u64>,
|
timeout_secs: Option<u64>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
let format = ReturnFormat::from(req)?;
|
let format = ReturnFormat::from(req)?;
|
||||||
|
|
||||||
let causal_context =
|
let causal_context =
|
||||||
|
@ -218,7 +222,7 @@ pub async fn handle_poll_item(
|
||||||
.k2v
|
.k2v
|
||||||
.rpc
|
.rpc
|
||||||
.poll_item(
|
.poll_item(
|
||||||
bucket_id,
|
*bucket_id,
|
||||||
partition_key,
|
partition_key,
|
||||||
sort_key,
|
sort_key,
|
||||||
causal_context,
|
causal_context,
|
||||||
|
|
|
@ -155,6 +155,7 @@ impl ApiHandler for S3ApiServer {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.get_existing_bucket(bucket_id)
|
.get_existing_bucket(bucket_id)
|
||||||
.await?;
|
.await?;
|
||||||
|
let bucket_params = bucket.state.into_option().unwrap();
|
||||||
|
|
||||||
let allowed = match endpoint.authorization_type() {
|
let allowed = match endpoint.authorization_type() {
|
||||||
Authorization::Read => api_key.allow_read(&bucket_id),
|
Authorization::Read => api_key.allow_read(&bucket_id),
|
||||||
|
@ -167,12 +168,20 @@ impl ApiHandler for S3ApiServer {
|
||||||
return Err(Error::forbidden("Operation is not allowed for this key."));
|
return Err(Error::forbidden("Operation is not allowed for this key."));
|
||||||
}
|
}
|
||||||
|
|
||||||
let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?;
|
let matching_cors_rule = find_matching_cors_rule(&bucket_params, &req)?.cloned();
|
||||||
|
|
||||||
|
let ctx = ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
bucket_name,
|
||||||
|
bucket_params,
|
||||||
|
api_key,
|
||||||
|
};
|
||||||
|
|
||||||
let resp = match endpoint {
|
let resp = match endpoint {
|
||||||
Endpoint::HeadObject {
|
Endpoint::HeadObject {
|
||||||
key, part_number, ..
|
key, part_number, ..
|
||||||
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
|
} => handle_head(ctx, &req, &key, part_number).await,
|
||||||
Endpoint::GetObject {
|
Endpoint::GetObject {
|
||||||
key,
|
key,
|
||||||
part_number,
|
part_number,
|
||||||
|
@ -192,74 +201,37 @@ impl ApiHandler for S3ApiServer {
|
||||||
response_content_type,
|
response_content_type,
|
||||||
response_expires,
|
response_expires,
|
||||||
};
|
};
|
||||||
handle_get(garage, &req, bucket_id, &key, part_number, overrides).await
|
handle_get(ctx, &req, &key, part_number, overrides).await
|
||||||
}
|
}
|
||||||
Endpoint::UploadPart {
|
Endpoint::UploadPart {
|
||||||
key,
|
key,
|
||||||
part_number,
|
part_number,
|
||||||
upload_id,
|
upload_id,
|
||||||
} => {
|
} => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await,
|
||||||
handle_put_part(
|
Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
|
||||||
garage,
|
|
||||||
req,
|
|
||||||
bucket_id,
|
|
||||||
&key,
|
|
||||||
part_number,
|
|
||||||
&upload_id,
|
|
||||||
content_sha256,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
Endpoint::CopyObject { key } => {
|
|
||||||
handle_copy(garage, &api_key, &req, bucket_id, &key).await
|
|
||||||
}
|
|
||||||
Endpoint::UploadPartCopy {
|
Endpoint::UploadPartCopy {
|
||||||
key,
|
key,
|
||||||
part_number,
|
part_number,
|
||||||
upload_id,
|
upload_id,
|
||||||
} => {
|
} => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
|
||||||
handle_upload_part_copy(
|
Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await,
|
||||||
garage,
|
|
||||||
&api_key,
|
|
||||||
&req,
|
|
||||||
bucket_id,
|
|
||||||
&key,
|
|
||||||
part_number,
|
|
||||||
&upload_id,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
Endpoint::PutObject { key } => {
|
|
||||||
handle_put(garage, req, &bucket, &key, content_sha256).await
|
|
||||||
}
|
|
||||||
Endpoint::AbortMultipartUpload { key, upload_id } => {
|
Endpoint::AbortMultipartUpload { key, upload_id } => {
|
||||||
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
handle_abort_multipart_upload(ctx, &key, &upload_id).await
|
||||||
}
|
}
|
||||||
Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
|
Endpoint::DeleteObject { key, .. } => handle_delete(ctx, &key).await,
|
||||||
Endpoint::CreateMultipartUpload { key } => {
|
Endpoint::CreateMultipartUpload { key } => {
|
||||||
handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
|
handle_create_multipart_upload(ctx, &req, &key).await
|
||||||
}
|
}
|
||||||
Endpoint::CompleteMultipartUpload { key, upload_id } => {
|
Endpoint::CompleteMultipartUpload { key, upload_id } => {
|
||||||
handle_complete_multipart_upload(
|
handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
|
||||||
garage,
|
|
||||||
req,
|
|
||||||
&bucket_name,
|
|
||||||
&bucket,
|
|
||||||
&key,
|
|
||||||
&upload_id,
|
|
||||||
content_sha256,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
Endpoint::CreateBucket {} => unreachable!(),
|
Endpoint::CreateBucket {} => unreachable!(),
|
||||||
Endpoint::HeadBucket {} => {
|
Endpoint::HeadBucket {} => {
|
||||||
let response = Response::builder().body(empty_body()).unwrap();
|
let response = Response::builder().body(empty_body()).unwrap();
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
Endpoint::DeleteBucket {} => {
|
Endpoint::DeleteBucket {} => handle_delete_bucket(ctx).await,
|
||||||
handle_delete_bucket(&garage, bucket_id, bucket_name, &api_key.key_id).await
|
Endpoint::GetBucketLocation {} => handle_get_bucket_location(ctx),
|
||||||
}
|
|
||||||
Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
|
|
||||||
Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
|
Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
|
||||||
Endpoint::ListObjects {
|
Endpoint::ListObjects {
|
||||||
delimiter,
|
delimiter,
|
||||||
|
@ -268,24 +240,21 @@ impl ApiHandler for S3ApiServer {
|
||||||
max_keys,
|
max_keys,
|
||||||
prefix,
|
prefix,
|
||||||
} => {
|
} => {
|
||||||
handle_list(
|
let query = ListObjectsQuery {
|
||||||
garage,
|
common: ListQueryCommon {
|
||||||
&ListObjectsQuery {
|
bucket_name: ctx.bucket_name.clone(),
|
||||||
common: ListQueryCommon {
|
bucket_id,
|
||||||
bucket_name,
|
delimiter,
|
||||||
bucket_id,
|
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
|
||||||
delimiter,
|
prefix: prefix.unwrap_or_default(),
|
||||||
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
|
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
||||||
prefix: prefix.unwrap_or_default(),
|
|
||||||
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
|
||||||
},
|
|
||||||
is_v2: false,
|
|
||||||
marker,
|
|
||||||
continuation_token: None,
|
|
||||||
start_after: None,
|
|
||||||
},
|
},
|
||||||
)
|
is_v2: false,
|
||||||
.await
|
marker,
|
||||||
|
continuation_token: None,
|
||||||
|
start_after: None,
|
||||||
|
};
|
||||||
|
handle_list(ctx, &query).await
|
||||||
}
|
}
|
||||||
Endpoint::ListObjectsV2 {
|
Endpoint::ListObjectsV2 {
|
||||||
delimiter,
|
delimiter,
|
||||||
|
@ -298,24 +267,21 @@ impl ApiHandler for S3ApiServer {
|
||||||
..
|
..
|
||||||
} => {
|
} => {
|
||||||
if list_type == "2" {
|
if list_type == "2" {
|
||||||
handle_list(
|
let query = ListObjectsQuery {
|
||||||
garage,
|
common: ListQueryCommon {
|
||||||
&ListObjectsQuery {
|
bucket_name: ctx.bucket_name.clone(),
|
||||||
common: ListQueryCommon {
|
bucket_id,
|
||||||
bucket_name,
|
delimiter,
|
||||||
bucket_id,
|
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
|
||||||
delimiter,
|
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
||||||
page_size: max_keys.unwrap_or(1000).clamp(1, 1000),
|
prefix: prefix.unwrap_or_default(),
|
||||||
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
|
||||||
prefix: prefix.unwrap_or_default(),
|
|
||||||
},
|
|
||||||
is_v2: true,
|
|
||||||
marker: None,
|
|
||||||
continuation_token,
|
|
||||||
start_after,
|
|
||||||
},
|
},
|
||||||
)
|
is_v2: true,
|
||||||
.await
|
marker: None,
|
||||||
|
continuation_token,
|
||||||
|
start_after,
|
||||||
|
};
|
||||||
|
handle_list(ctx, &query).await
|
||||||
} else {
|
} else {
|
||||||
Err(Error::bad_request(format!(
|
Err(Error::bad_request(format!(
|
||||||
"Invalid endpoint: list-type={}",
|
"Invalid endpoint: list-type={}",
|
||||||
|
@ -331,22 +297,19 @@ impl ApiHandler for S3ApiServer {
|
||||||
prefix,
|
prefix,
|
||||||
upload_id_marker,
|
upload_id_marker,
|
||||||
} => {
|
} => {
|
||||||
handle_list_multipart_upload(
|
let query = ListMultipartUploadsQuery {
|
||||||
garage,
|
common: ListQueryCommon {
|
||||||
&ListMultipartUploadsQuery {
|
bucket_name: ctx.bucket_name.clone(),
|
||||||
common: ListQueryCommon {
|
bucket_id,
|
||||||
bucket_name,
|
delimiter,
|
||||||
bucket_id,
|
page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
|
||||||
delimiter,
|
prefix: prefix.unwrap_or_default(),
|
||||||
page_size: max_uploads.unwrap_or(1000).clamp(1, 1000),
|
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
||||||
prefix: prefix.unwrap_or_default(),
|
|
||||||
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
|
||||||
},
|
|
||||||
key_marker,
|
|
||||||
upload_id_marker,
|
|
||||||
},
|
},
|
||||||
)
|
key_marker,
|
||||||
.await
|
upload_id_marker,
|
||||||
|
};
|
||||||
|
handle_list_multipart_upload(ctx, &query).await
|
||||||
}
|
}
|
||||||
Endpoint::ListParts {
|
Endpoint::ListParts {
|
||||||
key,
|
key,
|
||||||
|
@ -354,39 +317,28 @@ impl ApiHandler for S3ApiServer {
|
||||||
part_number_marker,
|
part_number_marker,
|
||||||
upload_id,
|
upload_id,
|
||||||
} => {
|
} => {
|
||||||
handle_list_parts(
|
let query = ListPartsQuery {
|
||||||
garage,
|
bucket_name: ctx.bucket_name.clone(),
|
||||||
&ListPartsQuery {
|
bucket_id,
|
||||||
bucket_name,
|
key,
|
||||||
bucket_id,
|
upload_id,
|
||||||
key,
|
part_number_marker: part_number_marker.map(|p| p.min(10000)),
|
||||||
upload_id,
|
max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
|
||||||
part_number_marker: part_number_marker.map(|p| p.min(10000)),
|
};
|
||||||
max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
|
handle_list_parts(ctx, &query).await
|
||||||
},
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
Endpoint::DeleteObjects {} => {
|
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
|
||||||
handle_delete_objects(garage, bucket_id, req, content_sha256).await
|
Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
|
||||||
}
|
Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
|
||||||
Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
|
Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
|
||||||
Endpoint::PutBucketWebsite {} => {
|
Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
|
||||||
handle_put_website(garage, bucket.clone(), req, content_sha256).await
|
Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
|
||||||
}
|
Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
|
||||||
Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await,
|
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
|
||||||
Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
|
|
||||||
Endpoint::PutBucketCors {} => {
|
|
||||||
handle_put_cors(garage, bucket.clone(), req, content_sha256).await
|
|
||||||
}
|
|
||||||
Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await,
|
|
||||||
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await,
|
|
||||||
Endpoint::PutBucketLifecycleConfiguration {} => {
|
Endpoint::PutBucketLifecycleConfiguration {} => {
|
||||||
handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await
|
handle_put_lifecycle(ctx, req, content_sha256).await
|
||||||
}
|
|
||||||
Endpoint::DeleteBucketLifecycle {} => {
|
|
||||||
handle_delete_lifecycle(garage, bucket.clone()).await
|
|
||||||
}
|
}
|
||||||
|
Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
|
||||||
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -394,7 +346,7 @@ impl ApiHandler for S3ApiServer {
|
||||||
// add the corresponding CORS headers to the response
|
// add the corresponding CORS headers to the response
|
||||||
let mut resp_ok = resp?;
|
let mut resp_ok = resp?;
|
||||||
if let Some(rule) = matching_cors_rule {
|
if let Some(rule) = matching_cors_rule {
|
||||||
add_cors_headers(&mut resp_ok, rule)
|
add_cors_headers(&mut resp_ok, &rule)
|
||||||
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
|
@ -21,7 +20,8 @@ use crate::s3::error::*;
|
||||||
use crate::s3::xml as s3_xml;
|
use crate::s3::xml as s3_xml;
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::verify_signed_content;
|
||||||
|
|
||||||
pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> {
|
pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx { garage, .. } = ctx;
|
||||||
let loc = s3_xml::LocationConstraint {
|
let loc = s3_xml::LocationConstraint {
|
||||||
xmlns: (),
|
xmlns: (),
|
||||||
region: garage.config.s3_api.s3_region.to_string(),
|
region: garage.config.s3_api.s3_region.to_string(),
|
||||||
|
@ -204,21 +204,20 @@ pub async fn handle_create_bucket(
|
||||||
.unwrap())
|
.unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_bucket(
|
pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
garage: &Garage,
|
let ReqCtx {
|
||||||
bucket_id: Uuid,
|
garage,
|
||||||
bucket_name: String,
|
bucket_id,
|
||||||
api_key_id: &String,
|
bucket_name,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
bucket_params: bucket_state,
|
||||||
|
api_key,
|
||||||
|
..
|
||||||
|
} = &ctx;
|
||||||
let helper = garage.locked_helper().await;
|
let helper = garage.locked_helper().await;
|
||||||
|
|
||||||
let api_key = helper.key().get_existing_key(api_key_id).await?;
|
|
||||||
let key_params = api_key.params().unwrap();
|
let key_params = api_key.params().unwrap();
|
||||||
|
|
||||||
let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
|
let is_local_alias = matches!(key_params.local_aliases.get(bucket_name), Some(Some(_)));
|
||||||
|
|
||||||
let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
|
|
||||||
let bucket_state = bucket.state.as_option().unwrap();
|
|
||||||
|
|
||||||
// If the bucket has no other aliases, this is a true deletion.
|
// If the bucket has no other aliases, this is a true deletion.
|
||||||
// Otherwise, it is just an alias removal.
|
// Otherwise, it is just an alias removal.
|
||||||
|
@ -228,20 +227,20 @@ pub async fn handle_delete_bucket(
|
||||||
.items()
|
.items()
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, _, active)| *active)
|
.filter(|(_, _, active)| *active)
|
||||||
.any(|(n, _, _)| is_local_alias || (*n != bucket_name));
|
.any(|(n, _, _)| is_local_alias || (*n != *bucket_name));
|
||||||
|
|
||||||
let has_other_local_aliases = bucket_state
|
let has_other_local_aliases = bucket_state
|
||||||
.local_aliases
|
.local_aliases
|
||||||
.items()
|
.items()
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, _, active)| *active)
|
.filter(|(_, _, active)| *active)
|
||||||
.any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
|
.any(|((k, n), _, _)| !is_local_alias || *n != *bucket_name || *k != api_key.key_id);
|
||||||
|
|
||||||
if !has_other_global_aliases && !has_other_local_aliases {
|
if !has_other_global_aliases && !has_other_local_aliases {
|
||||||
// Delete bucket
|
// Delete bucket
|
||||||
|
|
||||||
// Check bucket is empty
|
// Check bucket is empty
|
||||||
if !helper.bucket().is_bucket_empty(bucket_id).await? {
|
if !helper.bucket().is_bucket_empty(*bucket_id).await? {
|
||||||
return Err(CommonError::BucketNotEmpty.into());
|
return Err(CommonError::BucketNotEmpty.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -249,33 +248,36 @@ pub async fn handle_delete_bucket(
|
||||||
// 1. delete bucket alias
|
// 1. delete bucket alias
|
||||||
if is_local_alias {
|
if is_local_alias {
|
||||||
helper
|
helper
|
||||||
.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
|
.unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
|
||||||
.await?;
|
.await?;
|
||||||
} else {
|
} else {
|
||||||
helper
|
helper
|
||||||
.unset_global_bucket_alias(bucket_id, &bucket_name)
|
.unset_global_bucket_alias(*bucket_id, bucket_name)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. delete authorization from keys that had access
|
// 2. delete authorization from keys that had access
|
||||||
for (key_id, _) in bucket.authorized_keys() {
|
for (key_id, _) in bucket_state.authorized_keys.items() {
|
||||||
helper
|
helper
|
||||||
.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
|
.set_bucket_key_permissions(*bucket_id, key_id, BucketKeyPerm::NO_PERMISSIONS)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let bucket = Bucket {
|
||||||
|
id: *bucket_id,
|
||||||
|
state: Deletable::delete(),
|
||||||
|
};
|
||||||
// 3. delete bucket
|
// 3. delete bucket
|
||||||
bucket.state = Deletable::delete();
|
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
garage.bucket_table.insert(&bucket).await?;
|
||||||
} else if is_local_alias {
|
} else if is_local_alias {
|
||||||
// Just unalias
|
// Just unalias
|
||||||
helper
|
helper
|
||||||
.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
|
.unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name)
|
||||||
.await?;
|
.await?;
|
||||||
} else {
|
} else {
|
||||||
// Just unalias (but from global namespace)
|
// Just unalias (but from global namespace)
|
||||||
helper
|
helper
|
||||||
.unset_global_bucket_alias(bucket_id, &bucket_name)
|
.unset_global_bucket_alias(*bucket_id, bucket_name)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
use futures::{stream, stream::Stream, StreamExt};
|
use futures::{stream, stream::Stream, StreamExt};
|
||||||
|
@ -15,8 +14,6 @@ use garage_table::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_model::key_table::Key;
|
|
||||||
use garage_model::s3::block_ref_table::*;
|
use garage_model::s3::block_ref_table::*;
|
||||||
use garage_model::s3::mpu_table::*;
|
use garage_model::s3::mpu_table::*;
|
||||||
use garage_model::s3::object_table::*;
|
use garage_model::s3::object_table::*;
|
||||||
|
@ -30,15 +27,19 @@ use crate::s3::put::get_headers;
|
||||||
use crate::s3::xml::{self as s3_xml, xmlns_tag};
|
use crate::s3::xml::{self as s3_xml, xmlns_tag};
|
||||||
|
|
||||||
pub async fn handle_copy(
|
pub async fn handle_copy(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
api_key: &Key,
|
|
||||||
req: &Request<ReqBody>,
|
req: &Request<ReqBody>,
|
||||||
dest_bucket_id: Uuid,
|
|
||||||
dest_key: &str,
|
dest_key: &str,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
|
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
|
||||||
|
|
||||||
let source_object = get_copy_source(&garage, api_key, req).await?;
|
let source_object = get_copy_source(&ctx, req).await?;
|
||||||
|
|
||||||
|
let ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id: dest_bucket_id,
|
||||||
|
..
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
let (source_version, source_version_data, source_version_meta) =
|
let (source_version, source_version_data, source_version_meta) =
|
||||||
extract_source_info(&source_object)?;
|
extract_source_info(&source_object)?;
|
||||||
|
@ -181,10 +182,8 @@ pub async fn handle_copy(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_upload_part_copy(
|
pub async fn handle_upload_part_copy(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
api_key: &Key,
|
|
||||||
req: &Request<ReqBody>,
|
req: &Request<ReqBody>,
|
||||||
dest_bucket_id: Uuid,
|
|
||||||
dest_key: &str,
|
dest_key: &str,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
upload_id: &str,
|
upload_id: &str,
|
||||||
|
@ -195,10 +194,12 @@ pub async fn handle_upload_part_copy(
|
||||||
|
|
||||||
let dest_key = dest_key.to_string();
|
let dest_key = dest_key.to_string();
|
||||||
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
|
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
|
||||||
get_copy_source(&garage, api_key, req),
|
get_copy_source(&ctx, req),
|
||||||
multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id)
|
multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
let ReqCtx { garage, .. } = ctx;
|
||||||
|
|
||||||
let (source_object_version, source_version_data, source_version_meta) =
|
let (source_object_version, source_version_data, source_version_meta) =
|
||||||
extract_source_info(&source_object)?;
|
extract_source_info(&source_object)?;
|
||||||
|
|
||||||
|
@ -439,11 +440,11 @@ pub async fn handle_upload_part_copy(
|
||||||
.body(string_body(resp_xml))?)
|
.body(string_body(resp_xml))?)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_copy_source(
|
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
|
||||||
garage: &Garage,
|
let ReqCtx {
|
||||||
api_key: &Key,
|
garage, api_key, ..
|
||||||
req: &Request<ReqBody>,
|
} = ctx;
|
||||||
) -> Result<Object, Error> {
|
|
||||||
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
||||||
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
|
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
|
||||||
|
|
||||||
|
|
|
@ -21,16 +21,13 @@ use crate::s3::error::*;
|
||||||
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
|
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::verify_signed_content;
|
||||||
|
|
||||||
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
|
use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule};
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
|
pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
let param = bucket
|
let ReqCtx { bucket_params, .. } = ctx;
|
||||||
.params()
|
if let Some(cors) = bucket_params.cors_config.get() {
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
|
||||||
|
|
||||||
if let Some(cors) = param.cors_config.get() {
|
|
||||||
let wc = CorsConfiguration {
|
let wc = CorsConfiguration {
|
||||||
xmlns: (),
|
xmlns: (),
|
||||||
cors_rules: cors
|
cors_rules: cors
|
||||||
|
@ -50,16 +47,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_cors(
|
pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
garage: Arc<Garage>,
|
let ReqCtx {
|
||||||
mut bucket: Bucket,
|
garage,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
bucket_id,
|
||||||
let param = bucket
|
mut bucket_params,
|
||||||
.params_mut()
|
..
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
} = ctx;
|
||||||
|
bucket_params.cors_config.update(None);
|
||||||
param.cors_config.update(None);
|
garage
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
.bucket_table
|
||||||
|
.insert(&Bucket::present(bucket_id, bucket_params))
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::NO_CONTENT)
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
@ -67,28 +66,33 @@ pub async fn handle_delete_cors(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_put_cors(
|
pub async fn handle_put_cors(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
mut bucket: Bucket,
|
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
mut bucket_params,
|
||||||
|
..
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
|
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
|
||||||
|
|
||||||
if let Some(content_sha256) = content_sha256 {
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let param = bucket
|
|
||||||
.params_mut()
|
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
|
||||||
|
|
||||||
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
|
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
|
||||||
conf.validate()?;
|
conf.validate()?;
|
||||||
|
|
||||||
param
|
bucket_params
|
||||||
.cors_config
|
.cors_config
|
||||||
.update(Some(conf.into_garage_cors_config()?));
|
.update(Some(conf.into_garage_cors_config()?));
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
garage
|
||||||
|
.bucket_table
|
||||||
|
.insert(&Bucket::present(bucket_id, bucket_params))
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
|
@ -115,7 +119,8 @@ pub async fn handle_options_api(
|
||||||
let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
|
let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
|
||||||
if let Some(id) = bucket_id {
|
if let Some(id) = bucket_id {
|
||||||
let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
|
let bucket = garage.bucket_helper().get_existing_bucket(id).await?;
|
||||||
handle_options_for_bucket(req, &bucket)
|
let bucket_params = bucket.state.into_option().unwrap();
|
||||||
|
handle_options_for_bucket(req, &bucket_params)
|
||||||
} else {
|
} else {
|
||||||
// If there is a bucket name in the request, but that name
|
// If there is a bucket name in the request, but that name
|
||||||
// does not correspond to a global alias for a bucket,
|
// does not correspond to a global alias for a bucket,
|
||||||
|
@ -145,7 +150,7 @@ pub async fn handle_options_api(
|
||||||
|
|
||||||
pub fn handle_options_for_bucket(
|
pub fn handle_options_for_bucket(
|
||||||
req: &Request<IncomingBody>,
|
req: &Request<IncomingBody>,
|
||||||
bucket: &Bucket,
|
bucket_params: &BucketParams,
|
||||||
) -> Result<Response<EmptyBody>, CommonError> {
|
) -> Result<Response<EmptyBody>, CommonError> {
|
||||||
let origin = req
|
let origin = req
|
||||||
.headers()
|
.headers()
|
||||||
|
@ -162,7 +167,7 @@ pub fn handle_options_for_bucket(
|
||||||
None => vec![],
|
None => vec![],
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
|
if let Some(cors_config) = bucket_params.cors_config.get() {
|
||||||
let matching_rule = cors_config
|
let matching_rule = cors_config
|
||||||
.iter()
|
.iter()
|
||||||
.find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
|
.find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
|
||||||
|
@ -181,10 +186,10 @@ pub fn handle_options_for_bucket(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_matching_cors_rule<'a>(
|
pub fn find_matching_cors_rule<'a>(
|
||||||
bucket: &'a Bucket,
|
bucket_params: &'a BucketParams,
|
||||||
req: &Request<impl Body>,
|
req: &Request<impl Body>,
|
||||||
) -> Result<Option<&'a GarageCorsRule>, Error> {
|
) -> Result<Option<&'a GarageCorsRule>, Error> {
|
||||||
if let Some(cors_config) = bucket.params().unwrap().cors_config.get() {
|
if let Some(cors_config) = bucket_params.cors_config.get() {
|
||||||
if let Some(origin) = req.headers().get("Origin") {
|
if let Some(origin) = req.headers().get("Origin") {
|
||||||
let origin = origin.to_str()?;
|
let origin = origin.to_str()?;
|
||||||
let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
|
let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
|
||||||
|
|
|
@ -1,11 +1,8 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_model::s3::object_table::*;
|
use garage_model::s3::object_table::*;
|
||||||
|
|
||||||
use crate::helpers::*;
|
use crate::helpers::*;
|
||||||
|
@ -15,14 +12,13 @@ use crate::s3::put::next_timestamp;
|
||||||
use crate::s3::xml as s3_xml;
|
use crate::s3::xml as s3_xml;
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::verify_signed_content;
|
||||||
|
|
||||||
async fn handle_delete_internal(
|
async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
|
||||||
garage: &Garage,
|
let ReqCtx {
|
||||||
bucket_id: Uuid,
|
garage, bucket_id, ..
|
||||||
key: &str,
|
} = ctx;
|
||||||
) -> Result<(Uuid, Uuid), Error> {
|
|
||||||
let object = garage
|
let object = garage
|
||||||
.object_table
|
.object_table
|
||||||
.get(&bucket_id, &key.to_string())
|
.get(bucket_id, &key.to_string())
|
||||||
.await?
|
.await?
|
||||||
.ok_or(Error::NoSuchKey)?; // No need to delete
|
.ok_or(Error::NoSuchKey)?; // No need to delete
|
||||||
|
|
||||||
|
@ -44,7 +40,7 @@ async fn handle_delete_internal(
|
||||||
};
|
};
|
||||||
|
|
||||||
let object = Object::new(
|
let object = Object::new(
|
||||||
bucket_id,
|
*bucket_id,
|
||||||
key.into(),
|
key.into(),
|
||||||
vec![ObjectVersion {
|
vec![ObjectVersion {
|
||||||
uuid: del_uuid,
|
uuid: del_uuid,
|
||||||
|
@ -58,12 +54,8 @@ async fn handle_delete_internal(
|
||||||
Ok((deleted_version, del_uuid))
|
Ok((deleted_version, del_uuid))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete(
|
pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>, Error> {
|
||||||
garage: Arc<Garage>,
|
match handle_delete_internal(&ctx, key).await {
|
||||||
bucket_id: Uuid,
|
|
||||||
key: &str,
|
|
||||||
) -> Result<Response<ResBody>, Error> {
|
|
||||||
match handle_delete_internal(&garage, bucket_id, key).await {
|
|
||||||
Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
|
Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder()
|
||||||
.status(StatusCode::NO_CONTENT)
|
.status(StatusCode::NO_CONTENT)
|
||||||
.body(empty_body())
|
.body(empty_body())
|
||||||
|
@ -73,8 +65,7 @@ pub async fn handle_delete(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_objects(
|
pub async fn handle_delete_objects(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
@ -91,7 +82,7 @@ pub async fn handle_delete_objects(
|
||||||
let mut ret_errors = Vec::new();
|
let mut ret_errors = Vec::new();
|
||||||
|
|
||||||
for obj in cmd.objects.iter() {
|
for obj in cmd.objects.iter() {
|
||||||
match handle_delete_internal(&garage, bucket_id, &obj.key).await {
|
match handle_delete_internal(&ctx, &obj.key).await {
|
||||||
Ok((deleted_version, delete_marker_version)) => {
|
Ok((deleted_version, delete_marker_version)) => {
|
||||||
if cmd.quiet {
|
if cmd.quiet {
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -131,6 +131,16 @@ fn try_answer_cached(
|
||||||
|
|
||||||
/// Handle HEAD request
|
/// Handle HEAD request
|
||||||
pub async fn handle_head(
|
pub async fn handle_head(
|
||||||
|
ctx: ReqCtx,
|
||||||
|
req: &Request<impl Body>,
|
||||||
|
key: &str,
|
||||||
|
part_number: Option<u64>,
|
||||||
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle HEAD request for website
|
||||||
|
pub async fn handle_head_without_ctx(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: &Request<impl Body>,
|
req: &Request<impl Body>,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
|
@ -218,6 +228,17 @@ pub async fn handle_head(
|
||||||
|
|
||||||
/// Handle GET request
|
/// Handle GET request
|
||||||
pub async fn handle_get(
|
pub async fn handle_get(
|
||||||
|
ctx: ReqCtx,
|
||||||
|
req: &Request<impl Body>,
|
||||||
|
key: &str,
|
||||||
|
part_number: Option<u64>,
|
||||||
|
overrides: GetObjectOverrides,
|
||||||
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle GET request
|
||||||
|
pub async fn handle_get_without_ctx(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: &Request<impl Body>,
|
req: &Request<impl Body>,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use quick_xml::de::from_reader;
|
use quick_xml::de::from_reader;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
|
@ -16,15 +15,12 @@ use garage_model::bucket_table::{
|
||||||
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
|
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
|
||||||
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
|
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
|
||||||
};
|
};
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
|
pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
let param = bucket
|
let ReqCtx { bucket_params, .. } = ctx;
|
||||||
.params()
|
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
|
||||||
|
|
||||||
if let Some(lifecycle) = param.lifecycle_config.get() {
|
if let Some(lifecycle) = bucket_params.lifecycle_config.get() {
|
||||||
let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
|
let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
|
||||||
let xml = to_xml_with_header(&wc)?;
|
let xml = to_xml_with_header(&wc)?;
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
|
@ -38,16 +34,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_lifecycle(
|
pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
garage: Arc<Garage>,
|
let ReqCtx {
|
||||||
mut bucket: Bucket,
|
garage,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
bucket_id,
|
||||||
let param = bucket
|
mut bucket_params,
|
||||||
.params_mut()
|
..
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
} = ctx;
|
||||||
|
bucket_params.lifecycle_config.update(None);
|
||||||
param.lifecycle_config.update(None);
|
garage
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
.bucket_table
|
||||||
|
.insert(&Bucket::present(bucket_id, bucket_params))
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::NO_CONTENT)
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
@ -55,28 +53,33 @@ pub async fn handle_delete_lifecycle(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_put_lifecycle(
|
pub async fn handle_put_lifecycle(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
mut bucket: Bucket,
|
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
mut bucket_params,
|
||||||
|
..
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
|
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
|
||||||
|
|
||||||
if let Some(content_sha256) = content_sha256 {
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let param = bucket
|
|
||||||
.params_mut()
|
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
|
||||||
|
|
||||||
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
|
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
|
||||||
let config = conf
|
let config = conf
|
||||||
.validate_into_garage_lifecycle_config()
|
.validate_into_garage_lifecycle_config()
|
||||||
.ok_or_bad_request("Invalid lifecycle configuration")?;
|
.ok_or_bad_request("Invalid lifecycle configuration")?;
|
||||||
|
|
||||||
param.lifecycle_config.update(Some(config));
|
bucket_params.lifecycle_config.update(Some(config));
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
garage
|
||||||
|
.bucket_table
|
||||||
|
.insert(&Bucket::present(bucket_id, bucket_params))
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
use std::collections::{BTreeMap, BTreeSet};
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
use std::iter::{Iterator, Peekable};
|
use std::iter::{Iterator, Peekable};
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use base64::prelude::*;
|
use base64::prelude::*;
|
||||||
use hyper::Response;
|
use hyper::Response;
|
||||||
|
@ -9,7 +8,6 @@ use garage_util::data::*;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_model::s3::mpu_table::*;
|
use garage_model::s3::mpu_table::*;
|
||||||
use garage_model::s3::object_table::*;
|
use garage_model::s3::object_table::*;
|
||||||
|
|
||||||
|
@ -62,9 +60,10 @@ pub struct ListPartsQuery {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_list(
|
pub async fn handle_list(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
query: &ListObjectsQuery,
|
query: &ListObjectsQuery,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx { garage, .. } = &ctx;
|
||||||
let io = |bucket, key, count| {
|
let io = |bucket, key, count| {
|
||||||
let t = &garage.object_table;
|
let t = &garage.object_table;
|
||||||
async move {
|
async move {
|
||||||
|
@ -167,9 +166,11 @@ pub async fn handle_list(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_list_multipart_upload(
|
pub async fn handle_list_multipart_upload(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
query: &ListMultipartUploadsQuery,
|
query: &ListMultipartUploadsQuery,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx { garage, .. } = &ctx;
|
||||||
|
|
||||||
let io = |bucket, key, count| {
|
let io = |bucket, key, count| {
|
||||||
let t = &garage.object_table;
|
let t = &garage.object_table;
|
||||||
async move {
|
async move {
|
||||||
|
@ -269,15 +270,14 @@ pub async fn handle_list_multipart_upload(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_list_parts(
|
pub async fn handle_list_parts(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
query: &ListPartsQuery,
|
query: &ListPartsQuery,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
debug!("ListParts {:?}", query);
|
debug!("ListParts {:?}", query);
|
||||||
|
|
||||||
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
|
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
|
||||||
|
|
||||||
let (_, _, mpu) =
|
let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
|
||||||
s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?;
|
|
||||||
|
|
||||||
let (info, next) = fetch_part_info(query, &mpu)?;
|
let (info, next) = fetch_part_info(query, &mpu)?;
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5};
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
use garage_model::bucket_table::Bucket;
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::s3::block_ref_table::*;
|
use garage_model::s3::block_ref_table::*;
|
||||||
use garage_model::s3::mpu_table::*;
|
use garage_model::s3::mpu_table::*;
|
||||||
|
@ -25,12 +24,16 @@ use crate::signature::verify_signed_content;
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
pub async fn handle_create_multipart_upload(
|
pub async fn handle_create_multipart_upload(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: &Request<ReqBody>,
|
req: &Request<ReqBody>,
|
||||||
bucket_name: &str,
|
|
||||||
bucket_id: Uuid,
|
|
||||||
key: &String,
|
key: &String,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
bucket_name,
|
||||||
|
..
|
||||||
|
} = &ctx;
|
||||||
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
|
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
|
||||||
|
|
||||||
let upload_id = gen_uuid();
|
let upload_id = gen_uuid();
|
||||||
|
@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload(
|
||||||
headers,
|
headers,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
|
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
// Create multipart upload in mpu table
|
// Create multipart upload in mpu table
|
||||||
// This multipart upload will hold references to uploaded parts
|
// This multipart upload will hold references to uploaded parts
|
||||||
// (which are entries in the Version table)
|
// (which are entries in the Version table)
|
||||||
let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false);
|
let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
|
||||||
garage.mpu_table.insert(&mpu).await?;
|
garage.mpu_table.insert(&mpu).await?;
|
||||||
|
|
||||||
// Send success response
|
// Send success response
|
||||||
|
@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_put_part(
|
pub async fn handle_put_part(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
bucket_id: Uuid,
|
|
||||||
key: &str,
|
key: &str,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
upload_id: &str,
|
upload_id: &str,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx { garage, .. } = &ctx;
|
||||||
|
|
||||||
let upload_id = decode_upload_id(upload_id)?;
|
let upload_id = decode_upload_id(upload_id)?;
|
||||||
|
|
||||||
let content_md5 = match req.headers().get("content-md5") {
|
let content_md5 = match req.headers().get("content-md5") {
|
||||||
|
@ -90,10 +94,8 @@ pub async fn handle_put_part(
|
||||||
let stream = body_stream(req.into_body());
|
let stream = body_stream(req.into_body());
|
||||||
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
|
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
|
||||||
|
|
||||||
let ((_, _, mut mpu), first_block) = futures::try_join!(
|
let ((_, _, mut mpu), first_block) =
|
||||||
get_upload(&garage, &bucket_id, &key, &upload_id),
|
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
|
||||||
chunker.next(),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// Check object is valid and part can be accepted
|
// Check object is valid and part can be accepted
|
||||||
let first_block = first_block.ok_or_bad_request("Empty body")?;
|
let first_block = first_block.ok_or_bad_request("Empty body")?;
|
||||||
|
@ -135,7 +137,7 @@ pub async fn handle_put_part(
|
||||||
|
|
||||||
// Copy data to version
|
// Copy data to version
|
||||||
let (total_size, data_md5sum, data_sha256sum, _) =
|
let (total_size, data_md5sum, data_sha256sum, _) =
|
||||||
read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?;
|
read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
|
||||||
|
|
||||||
// Verify that checksums map
|
// Verify that checksums map
|
||||||
ensure_checksum_matches(
|
ensure_checksum_matches(
|
||||||
|
@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_complete_multipart_upload(
|
pub async fn handle_complete_multipart_upload(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
bucket_name: &str,
|
|
||||||
bucket: &Bucket,
|
|
||||||
key: &str,
|
key: &str,
|
||||||
upload_id: &str,
|
upload_id: &str,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
bucket_name,
|
||||||
|
..
|
||||||
|
} = &ctx;
|
||||||
|
|
||||||
let body = http_body_util::BodyExt::collect(req.into_body())
|
let body = http_body_util::BodyExt::collect(req.into_body())
|
||||||
.await?
|
.await?
|
||||||
.to_bytes();
|
.to_bytes();
|
||||||
|
@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
|
|
||||||
// Get object and multipart upload
|
// Get object and multipart upload
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let (object, mut object_version, mpu) =
|
let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?;
|
||||||
get_upload(&garage, &bucket.id, &key, &upload_id).await?;
|
|
||||||
|
|
||||||
if mpu.parts.is_empty() {
|
if mpu.parts.is_empty() {
|
||||||
return Err(Error::bad_request("No data was uploaded"));
|
return Err(Error::bad_request("No data was uploaded"));
|
||||||
|
@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
let mut final_version = Version::new(
|
let mut final_version = Version::new(
|
||||||
upload_id,
|
upload_id,
|
||||||
VersionBacklink::Object {
|
VersionBacklink::Object {
|
||||||
bucket_id: bucket.id,
|
bucket_id: *bucket_id,
|
||||||
key: key.to_string(),
|
key: key.to_string(),
|
||||||
},
|
},
|
||||||
false,
|
false,
|
||||||
|
@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload(
|
||||||
// Calculate total size of final object
|
// Calculate total size of final object
|
||||||
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
||||||
|
|
||||||
if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
|
if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
|
||||||
object_version.state = ObjectVersionState::Aborted;
|
object_version.state = ObjectVersionState::Aborted;
|
||||||
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
|
||||||
garage.object_table.insert(&final_object).await?;
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
||||||
return Err(e);
|
return Err(e);
|
||||||
|
@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
final_version.blocks.items()[0].1.hash,
|
final_version.blocks.items()[0].1.hash,
|
||||||
));
|
));
|
||||||
|
|
||||||
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
|
||||||
garage.object_table.insert(&final_object).await?;
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
||||||
// Send response saying ok we're done
|
// Send response saying ok we're done
|
||||||
|
@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_abort_multipart_upload(
|
pub async fn handle_abort_multipart_upload(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
bucket_id: Uuid,
|
|
||||||
key: &str,
|
key: &str,
|
||||||
upload_id: &str,
|
upload_id: &str,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = &ctx;
|
||||||
|
|
||||||
let upload_id = decode_upload_id(upload_id)?;
|
let upload_id = decode_upload_id(upload_id)?;
|
||||||
|
|
||||||
let (_, mut object_version, _) =
|
let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?;
|
||||||
get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?;
|
|
||||||
|
|
||||||
object_version.state = ObjectVersionState::Aborted;
|
object_version.state = ObjectVersionState::Aborted;
|
||||||
let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
|
let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
|
||||||
garage.object_table.insert(&final_object).await?;
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
||||||
Ok(Response::new(empty_body()))
|
Ok(Response::new(empty_body()))
|
||||||
|
@ -383,11 +391,13 @@ pub async fn handle_abort_multipart_upload(
|
||||||
|
|
||||||
#[allow(clippy::ptr_arg)]
|
#[allow(clippy::ptr_arg)]
|
||||||
pub(crate) async fn get_upload(
|
pub(crate) async fn get_upload(
|
||||||
garage: &Garage,
|
ctx: &ReqCtx,
|
||||||
bucket_id: &Uuid,
|
|
||||||
key: &String,
|
key: &String,
|
||||||
upload_id: &Uuid,
|
upload_id: &Uuid,
|
||||||
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
|
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = ctx;
|
||||||
let (object, mpu) = futures::try_join!(
|
let (object, mpu) = futures::try_join!(
|
||||||
garage.object_table.get(bucket_id, key).map_err(Error::from),
|
garage.object_table.get(bucket_id, key).map_err(Error::from),
|
||||||
garage
|
garage
|
||||||
|
|
|
@ -120,6 +120,12 @@ pub async fn handle_post_object(
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.get_existing_bucket(bucket_id)
|
.get_existing_bucket(bucket_id)
|
||||||
.await?;
|
.await?;
|
||||||
|
let bucket_params = bucket.state.into_option().unwrap();
|
||||||
|
let matching_cors_rule = find_matching_cors_rule(
|
||||||
|
&bucket_params,
|
||||||
|
&Request::from_parts(head.clone(), empty_body::<Infallible>()),
|
||||||
|
)?
|
||||||
|
.cloned();
|
||||||
|
|
||||||
let decoded_policy = BASE64_STANDARD
|
let decoded_policy = BASE64_STANDARD
|
||||||
.decode(policy)
|
.decode(policy)
|
||||||
|
@ -213,11 +219,19 @@ pub async fn handle_post_object(
|
||||||
let headers = get_headers(¶ms)?;
|
let headers = get_headers(¶ms)?;
|
||||||
|
|
||||||
let stream = field.map(|r| r.map_err(Into::into));
|
let stream = field.map(|r| r.map_err(Into::into));
|
||||||
let (_, md5) = save_stream(
|
|
||||||
|
let ctx = ReqCtx {
|
||||||
garage,
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
bucket_name,
|
||||||
|
bucket_params,
|
||||||
|
api_key,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (_, md5) = save_stream(
|
||||||
|
&ctx,
|
||||||
headers,
|
headers,
|
||||||
StreamLimiter::new(stream, conditions.content_length),
|
StreamLimiter::new(stream, conditions.content_length),
|
||||||
&bucket,
|
|
||||||
&key,
|
&key,
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
|
@ -234,7 +248,7 @@ pub async fn handle_post_object(
|
||||||
{
|
{
|
||||||
target
|
target
|
||||||
.query_pairs_mut()
|
.query_pairs_mut()
|
||||||
.append_pair("bucket", &bucket_name)
|
.append_pair("bucket", &ctx.bucket_name)
|
||||||
.append_pair("key", &key)
|
.append_pair("key", &key)
|
||||||
.append_pair("etag", &etag);
|
.append_pair("etag", &etag);
|
||||||
let target = target.to_string();
|
let target = target.to_string();
|
||||||
|
@ -278,7 +292,7 @@ pub async fn handle_post_object(
|
||||||
let xml = s3_xml::PostObject {
|
let xml = s3_xml::PostObject {
|
||||||
xmlns: (),
|
xmlns: (),
|
||||||
location: s3_xml::Value(location),
|
location: s3_xml::Value(location),
|
||||||
bucket: s3_xml::Value(bucket_name),
|
bucket: s3_xml::Value(ctx.bucket_name),
|
||||||
key: s3_xml::Value(key),
|
key: s3_xml::Value(key),
|
||||||
etag: s3_xml::Value(etag),
|
etag: s3_xml::Value(etag),
|
||||||
};
|
};
|
||||||
|
@ -291,12 +305,8 @@ pub async fn handle_post_object(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let matching_cors_rule = find_matching_cors_rule(
|
|
||||||
&bucket,
|
|
||||||
&Request::from_parts(head, empty_body::<Infallible>()),
|
|
||||||
)?;
|
|
||||||
if let Some(rule) = matching_cors_rule {
|
if let Some(rule) = matching_cors_rule {
|
||||||
add_cors_headers(&mut resp, rule)
|
add_cors_headers(&mut resp, &rule)
|
||||||
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,6 @@ use garage_util::error::Error as GarageError;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_block::manager::INLINE_THRESHOLD;
|
use garage_block::manager::INLINE_THRESHOLD;
|
||||||
use garage_model::bucket_table::Bucket;
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::index_counter::CountedItem;
|
use garage_model::index_counter::CountedItem;
|
||||||
use garage_model::s3::block_ref_table::*;
|
use garage_model::s3::block_ref_table::*;
|
||||||
|
@ -42,9 +41,8 @@ use crate::s3::error::*;
|
||||||
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
|
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
|
||||||
|
|
||||||
pub async fn handle_put(
|
pub async fn handle_put(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
bucket: &Bucket,
|
|
||||||
key: &String,
|
key: &String,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
@ -59,35 +57,27 @@ pub async fn handle_put(
|
||||||
|
|
||||||
let stream = body_stream(req.into_body());
|
let stream = body_stream(req.into_body());
|
||||||
|
|
||||||
save_stream(
|
save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
|
||||||
garage,
|
.await
|
||||||
headers,
|
.map(|(uuid, md5)| put_response(uuid, md5))
|
||||||
stream,
|
|
||||||
bucket,
|
|
||||||
key,
|
|
||||||
content_md5,
|
|
||||||
content_sha256,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map(|(uuid, md5)| put_response(uuid, md5))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage: Arc<Garage>,
|
ctx: &ReqCtx,
|
||||||
headers: ObjectVersionHeaders,
|
headers: ObjectVersionHeaders,
|
||||||
body: S,
|
body: S,
|
||||||
bucket: &Bucket,
|
|
||||||
key: &String,
|
key: &String,
|
||||||
content_md5: Option<String>,
|
content_md5: Option<String>,
|
||||||
content_sha256: Option<FixedBytes32>,
|
content_sha256: Option<FixedBytes32>,
|
||||||
) -> Result<(Uuid, String), Error> {
|
) -> Result<(Uuid, String), Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage, bucket_id, ..
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
let (first_block_opt, existing_object) = try_join!(
|
let (first_block_opt, existing_object) = try_join!(
|
||||||
chunker.next(),
|
chunker.next(),
|
||||||
garage
|
garage.object_table.get(bucket_id, key).map_err(Error::from),
|
||||||
.object_table
|
|
||||||
.get(&bucket.id, key)
|
|
||||||
.map_err(Error::from),
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let first_block = first_block_opt.unwrap_or_default();
|
let first_block = first_block_opt.unwrap_or_default();
|
||||||
|
@ -114,7 +104,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
|
check_quotas(ctx, size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
let object_version = ObjectVersion {
|
let object_version = ObjectVersion {
|
||||||
uuid: version_uuid,
|
uuid: version_uuid,
|
||||||
|
@ -129,7 +119,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
)),
|
)),
|
||||||
};
|
};
|
||||||
|
|
||||||
let object = Object::new(bucket.id, key.into(), vec![object_version]);
|
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
return Ok((version_uuid, data_md5sum_hex));
|
return Ok((version_uuid, data_md5sum_hex));
|
||||||
|
@ -140,7 +130,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
// before everything is finished (cleanup is done using the Drop trait).
|
// before everything is finished (cleanup is done using the Drop trait).
|
||||||
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
|
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
|
||||||
garage: garage.clone(),
|
garage: garage.clone(),
|
||||||
bucket_id: bucket.id,
|
bucket_id: *bucket_id,
|
||||||
key: key.into(),
|
key: key.into(),
|
||||||
version_uuid,
|
version_uuid,
|
||||||
version_timestamp,
|
version_timestamp,
|
||||||
|
@ -156,7 +146,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
multipart: false,
|
multipart: false,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
|
let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
// Initialize corresponding entry in version table
|
// Initialize corresponding entry in version table
|
||||||
|
@ -166,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
let version = Version::new(
|
let version = Version::new(
|
||||||
version_uuid,
|
version_uuid,
|
||||||
VersionBacklink::Object {
|
VersionBacklink::Object {
|
||||||
bucket_id: bucket.id,
|
bucket_id: *bucket_id,
|
||||||
key: key.into(),
|
key: key.into(),
|
||||||
},
|
},
|
||||||
false,
|
false,
|
||||||
|
@ -175,7 +165,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
|
|
||||||
// Transfer data and verify checksum
|
// Transfer data and verify checksum
|
||||||
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
|
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
|
||||||
read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?;
|
read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;
|
||||||
|
|
||||||
ensure_checksum_matches(
|
ensure_checksum_matches(
|
||||||
data_md5sum.as_slice(),
|
data_md5sum.as_slice(),
|
||||||
|
@ -184,7 +174,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
|
check_quotas(ctx, total_size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
// Save final object state, marked as Complete
|
// Save final object state, marked as Complete
|
||||||
let md5sum_hex = hex::encode(data_md5sum);
|
let md5sum_hex = hex::encode(data_md5sum);
|
||||||
|
@ -196,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
},
|
},
|
||||||
first_block_hash,
|
first_block_hash,
|
||||||
));
|
));
|
||||||
let object = Object::new(bucket.id, key.into(), vec![object_version]);
|
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
// We were not interrupted, everything went fine.
|
// We were not interrupted, everything went fine.
|
||||||
|
@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches(
|
||||||
|
|
||||||
/// Check that inserting this object with this size doesn't exceed bucket quotas
|
/// Check that inserting this object with this size doesn't exceed bucket quotas
|
||||||
pub(crate) async fn check_quotas(
|
pub(crate) async fn check_quotas(
|
||||||
garage: &Arc<Garage>,
|
ctx: &ReqCtx,
|
||||||
bucket: &Bucket,
|
|
||||||
size: u64,
|
size: u64,
|
||||||
prev_object: Option<&Object>,
|
prev_object: Option<&Object>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let quotas = bucket.state.as_option().unwrap().quotas.get();
|
let ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
bucket_params,
|
||||||
|
..
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
|
let quotas = bucket_params.quotas.get();
|
||||||
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
|
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
@ -248,7 +244,7 @@ pub(crate) async fn check_quotas(
|
||||||
let counters = garage
|
let counters = garage
|
||||||
.object_counter_table
|
.object_counter_table
|
||||||
.table
|
.table
|
||||||
.get(&bucket.id, &EmptyKey)
|
.get(bucket_id, &EmptyKey)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let counters = counters
|
let counters = counters
|
||||||
|
@ -292,7 +288,7 @@ pub(crate) async fn check_quotas(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage: &Garage,
|
ctx: &ReqCtx,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
first_block: Bytes,
|
first_block: Bytes,
|
||||||
|
@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
|
||||||
let offset = written_bytes;
|
let offset = written_bytes;
|
||||||
written_bytes += block.len() as u64;
|
written_bytes += block.len() as u64;
|
||||||
write_futs.push_back(put_block_and_meta(
|
write_futs.push_back(put_block_and_meta(
|
||||||
garage,
|
ctx,
|
||||||
version,
|
version,
|
||||||
part_number,
|
part_number,
|
||||||
offset,
|
offset,
|
||||||
|
@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn put_block_and_meta(
|
async fn put_block_and_meta(
|
||||||
garage: &Garage,
|
ctx: &ReqCtx,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
offset: u64,
|
offset: u64,
|
||||||
|
@ -455,6 +451,8 @@ async fn put_block_and_meta(
|
||||||
block: Bytes,
|
block: Bytes,
|
||||||
order_tag: OrderTag,
|
order_tag: OrderTag,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
|
let ReqCtx { garage, .. } = ctx;
|
||||||
|
|
||||||
let mut version = version.clone();
|
let mut version = version.clone();
|
||||||
version.blocks.put(
|
version.blocks.put(
|
||||||
VersionBlockKey {
|
VersionBlockKey {
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use quick_xml::de::from_reader;
|
use quick_xml::de::from_reader;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use http_body_util::BodyExt;
|
use http_body_util::BodyExt;
|
||||||
use hyper::{Request, Response, StatusCode};
|
use hyper::{Request, Response, StatusCode};
|
||||||
|
@ -12,15 +11,11 @@ use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::verify_signed_content;
|
||||||
|
|
||||||
use garage_model::bucket_table::*;
|
use garage_model::bucket_table::*;
|
||||||
use garage_model::garage::Garage;
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> {
|
pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
let param = bucket
|
let ReqCtx { bucket_params, .. } = ctx;
|
||||||
.params()
|
if let Some(website) = bucket_params.website_config.get() {
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
|
||||||
|
|
||||||
if let Some(website) = param.website_config.get() {
|
|
||||||
let wc = WebsiteConfiguration {
|
let wc = WebsiteConfiguration {
|
||||||
xmlns: (),
|
xmlns: (),
|
||||||
error_document: website.error_document.as_ref().map(|v| Key {
|
error_document: website.error_document.as_ref().map(|v| Key {
|
||||||
|
@ -44,16 +39,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Er
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_website(
|
pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
|
||||||
garage: Arc<Garage>,
|
let ReqCtx {
|
||||||
mut bucket: Bucket,
|
garage,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
bucket_id,
|
||||||
let param = bucket
|
mut bucket_params,
|
||||||
.params_mut()
|
..
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
} = ctx;
|
||||||
|
bucket_params.website_config.update(None);
|
||||||
param.website_config.update(None);
|
garage
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
.bucket_table
|
||||||
|
.insert(&Bucket::present(bucket_id, bucket_params))
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::NO_CONTENT)
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
@ -61,28 +58,33 @@ pub async fn handle_delete_website(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_put_website(
|
pub async fn handle_put_website(
|
||||||
garage: Arc<Garage>,
|
ctx: ReqCtx,
|
||||||
mut bucket: Bucket,
|
|
||||||
req: Request<ReqBody>,
|
req: Request<ReqBody>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
let ReqCtx {
|
||||||
|
garage,
|
||||||
|
bucket_id,
|
||||||
|
mut bucket_params,
|
||||||
|
..
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
|
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
|
||||||
|
|
||||||
if let Some(content_sha256) = content_sha256 {
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let param = bucket
|
|
||||||
.params_mut()
|
|
||||||
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
|
||||||
|
|
||||||
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
|
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
|
||||||
conf.validate()?;
|
conf.validate()?;
|
||||||
|
|
||||||
param
|
bucket_params
|
||||||
.website_config
|
.website_config
|
||||||
.update(Some(conf.into_garage_website_config()?));
|
.update(Some(conf.into_garage_website_config()?));
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
garage
|
||||||
|
.bucket_table
|
||||||
|
.insert(&Bucket::present(bucket_id, bucket_params))
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
|
|
|
@ -191,6 +191,13 @@ impl Bucket {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn present(id: Uuid, params: BucketParams) -> Self {
|
||||||
|
Bucket {
|
||||||
|
id,
|
||||||
|
state: crdt::Deletable::present(params),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns true if this represents a deleted bucket
|
/// Returns true if this represents a deleted bucket
|
||||||
pub fn is_deleted(&self) -> bool {
|
pub fn is_deleted(&self) -> bool {
|
||||||
self.state.is_deleted()
|
self.state.is_deleted()
|
||||||
|
|
|
@ -26,7 +26,7 @@ use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_opt
|
||||||
use garage_api::s3::error::{
|
use garage_api::s3::error::{
|
||||||
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
|
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
|
||||||
};
|
};
|
||||||
use garage_api::s3::get::{handle_get, handle_head};
|
use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
@ -219,14 +219,13 @@ impl WebServer {
|
||||||
// Check bucket isn't deleted and has website access enabled
|
// Check bucket isn't deleted and has website access enabled
|
||||||
let bucket = self
|
let bucket = self
|
||||||
.garage
|
.garage
|
||||||
.bucket_table
|
.bucket_helper()
|
||||||
.get(&EmptyKey, &bucket_id)
|
.get_existing_bucket(bucket_id)
|
||||||
.await?
|
.await
|
||||||
.ok_or(Error::NotFound)?;
|
.map_err(|_| Error::NotFound)?;
|
||||||
|
let bucket_params = bucket.state.into_option().unwrap();
|
||||||
|
|
||||||
let website_config = bucket
|
let website_config = bucket_params
|
||||||
.params()
|
|
||||||
.ok_or(Error::NotFound)?
|
|
||||||
.website_config
|
.website_config
|
||||||
.get()
|
.get()
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
@ -243,14 +242,16 @@ impl WebServer {
|
||||||
);
|
);
|
||||||
|
|
||||||
let ret_doc = match *req.method() {
|
let ret_doc = match *req.method() {
|
||||||
Method::OPTIONS => handle_options_for_bucket(req, &bucket)
|
Method::OPTIONS => handle_options_for_bucket(req, &bucket_params)
|
||||||
.map_err(ApiError::from)
|
.map_err(ApiError::from)
|
||||||
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
|
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
|
||||||
Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await,
|
Method::HEAD => {
|
||||||
|
handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await
|
||||||
|
}
|
||||||
Method::GET => {
|
Method::GET => {
|
||||||
handle_get(
|
handle_get_without_ctx(
|
||||||
self.garage.clone(),
|
self.garage.clone(),
|
||||||
&req,
|
req,
|
||||||
bucket_id,
|
bucket_id,
|
||||||
&key,
|
&key,
|
||||||
None,
|
None,
|
||||||
|
@ -301,7 +302,7 @@ impl WebServer {
|
||||||
.body(empty_body::<Infallible>())
|
.body(empty_body::<Infallible>())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
match handle_get(
|
match handle_get_without_ctx(
|
||||||
self.garage.clone(),
|
self.garage.clone(),
|
||||||
&req2,
|
&req2,
|
||||||
bucket_id,
|
bucket_id,
|
||||||
|
@ -344,7 +345,7 @@ impl WebServer {
|
||||||
}
|
}
|
||||||
Ok(mut resp) => {
|
Ok(mut resp) => {
|
||||||
// Maybe add CORS headers
|
// Maybe add CORS headers
|
||||||
if let Some(rule) = find_matching_cors_rule(&bucket, req)? {
|
if let Some(rule) = find_matching_cors_rule(&bucket_params, req)? {
|
||||||
add_cors_headers(&mut resp, rule)
|
add_cors_headers(&mut resp, rule)
|
||||||
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
.ok_or_internal_error("Invalid bucket CORS configuration")?;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue