Fix sending upload parts (set size), optimize small file uploads

This commit is contained in:
asonix 2022-10-01 13:00:07 -05:00
parent 0307dc5a3b
commit fe2a499110
4 changed files with 61 additions and 20 deletions

View file

@ -1,5 +1,7 @@
use actix_web::web::{Bytes, BytesMut}; use actix_web::{
use futures_util::{Stream, StreamExt}; body::MessageBody,
web::{Bytes, BytesMut},
};
use std::{ use std::{
collections::{vec_deque::IntoIter, VecDeque}, collections::{vec_deque::IntoIter, VecDeque},
pin::Pin, pin::Pin,
@ -29,10 +31,6 @@ impl BytesStream {
self.total_len self.total_len
} }
pub(crate) fn into_io_stream(self) -> impl Stream<Item = std::io::Result<Bytes>> + Unpin {
self.map(|bytes| Ok(bytes))
}
pub(crate) fn into_bytes(self) -> Bytes { pub(crate) fn into_bytes(self) -> Bytes {
let mut buf = BytesMut::with_capacity(self.total_len); let mut buf = BytesMut::with_capacity(self.total_len);
@ -44,14 +42,6 @@ impl BytesStream {
} }
} }
impl Stream for BytesStream {
type Item = Bytes;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.get_mut().inner.pop_front())
}
}
impl IntoIterator for BytesStream { impl IntoIterator for BytesStream {
type Item = Bytes; type Item = Bytes;
type IntoIter = IntoIter<Bytes>; type IntoIter = IntoIter<Bytes>;
@ -60,3 +50,29 @@ impl IntoIterator for BytesStream {
self.inner.into_iter() self.inner.into_iter()
} }
} }
impl MessageBody for BytesStream {
type Error = std::io::Error;
fn size(&self) -> actix_web::body::BodySize {
if let Ok(len) = self.len().try_into() {
actix_web::body::BodySize::Sized(len)
} else {
actix_web::body::BodySize::None
}
}
fn poll_next(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
}
fn try_into_bytes(self) -> Result<Bytes, Self>
where
Self: Sized,
{
Ok(self.into_bytes())
}
}

View file

@ -163,8 +163,8 @@ async fn process_jobs<R, S, F>(
let res = job_loop(repo, store, worker_id.clone(), queue, callback).await; let res = job_loop(repo, store, worker_id.clone(), queue, callback).await;
if let Err(e) = res { if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", e); tracing::warn!("Error processing jobs: {}", format!("{}", e));
tracing::warn!("{:?}", e); tracing::warn!("{}", format!("{:?}", e));
continue; continue;
} }

View file

@ -236,7 +236,7 @@ impl FileStore {
// try writing // try writing
debug!("Writing to {:?}", path.as_ref()); debug!("Writing to {:?}", path.as_ref());
if let Err(e) = file.write_from_bytes(bytes).await { if let Err(e) = file.write_from_bytes(bytes).await {
error!("Error writing {:?}, {}", path.as_ref(), e); error!("Error writing {:?}, {}", path.as_ref(), format!("{}", e));
// remove file if writing failed before completion // remove file if writing failed before completion
self.safe_remove_file(path).await?; self.safe_remove_file(path).await?;
return Err(e.into()); return Err(e.into());

View file

@ -176,6 +176,21 @@ impl Store for ObjectStore {
where where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static, S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{ {
let first_chunk = read_chunk(&mut stream).await?;
if first_chunk.len() < CHUNK_SIZE {
let (req, object_id) = self.put_object_request().await?;
let response = req.send_body(first_chunk).await?;
if !response.status().is_success() {
return Err(status_error(response).await);
}
return Ok(object_id);
}
let mut first_chunk = Some(first_chunk);
let (req, object_id) = self.create_multipart_request().await?; let (req, object_id) = self.create_multipart_request().await?;
let mut response = req.send().await.map_err(ObjectError::from)?; let mut response = req.send().await.map_err(ObjectError::from)?;
@ -197,7 +212,12 @@ impl Store for ObjectStore {
while !complete { while !complete {
part_number += 1; part_number += 1;
let buf = read_chunk(&mut stream).await?; let buf = if let Some(buf) = first_chunk.take() {
buf
} else {
read_chunk(&mut stream).await?
};
complete = buf.len() < CHUNK_SIZE; complete = buf.len() < CHUNK_SIZE;
let this = self.clone(); let this = self.clone();
@ -214,7 +234,7 @@ impl Store for ObjectStore {
&upload_id2, &upload_id2,
) )
.await? .await?
.send_stream(buf.into_io_stream()) .send_body(buf)
.await?; .await?;
if !response.status().is_success() { if !response.status().is_success() {
@ -234,7 +254,7 @@ impl Store for ObjectStore {
Ok(etag) as Result<String, Error> Ok(etag) as Result<String, Error>
} }
.instrument(tracing::info_span!("Upload Part")), .instrument(tracing::Span::current()),
); );
futures.push(handle); futures.push(handle);
@ -444,6 +464,8 @@ impl ObjectStore {
upload_id, upload_id,
); );
let length = buf.len();
let hashing_span = tracing::info_span!("Hashing request body"); let hashing_span = tracing::info_span!("Hashing request body");
let hash_string = actix_web::web::block(move || { let hash_string = actix_web::web::block(move || {
let guard = hashing_span.enter(); let guard = hashing_span.enter();
@ -463,6 +485,9 @@ impl ObjectStore {
.headers_mut() .headers_mut()
.insert("content-type", "application/octet-stream"); .insert("content-type", "application/octet-stream");
action.headers_mut().insert("content-md5", hash_string); action.headers_mut().insert("content-md5", hash_string);
action
.headers_mut()
.insert("content-length", length.to_string());
Ok(self.build_request(action)) Ok(self.build_request(action))
} }