diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index d18450a..25ff6c9 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -1,5 +1,7 @@ -use actix_web::web::{Bytes, BytesMut}; -use futures_util::{Stream, StreamExt}; +use actix_web::{ + body::MessageBody, + web::{Bytes, BytesMut}, +}; use std::{ collections::{vec_deque::IntoIter, VecDeque}, pin::Pin, @@ -29,10 +31,6 @@ impl BytesStream { self.total_len } - pub(crate) fn into_io_stream(self) -> impl Stream> + Unpin { - self.map(|bytes| Ok(bytes)) - } - pub(crate) fn into_bytes(self) -> Bytes { let mut buf = BytesMut::with_capacity(self.total_len); @@ -44,14 +42,6 @@ impl BytesStream { } } -impl Stream for BytesStream { - type Item = Bytes; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(self.get_mut().inner.pop_front()) - } -} - impl IntoIterator for BytesStream { type Item = Bytes; type IntoIter = IntoIter; @@ -60,3 +50,29 @@ impl IntoIterator for BytesStream { self.inner.into_iter() } } + +impl MessageBody for BytesStream { + type Error = std::io::Error; + + fn size(&self) -> actix_web::body::BodySize { + if let Ok(len) = self.len().try_into() { + actix_web::body::BodySize::Sized(len) + } else { + actix_web::body::BodySize::None + } + } + + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { + Poll::Ready(self.get_mut().inner.pop_front().map(Ok)) + } + + fn try_into_bytes(self) -> Result + where + Self: Sized, + { + Ok(self.into_bytes()) + } +} diff --git a/src/queue.rs b/src/queue.rs index 0610f0e..ddbab9f 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -163,8 +163,8 @@ async fn process_jobs( let res = job_loop(repo, store, worker_id.clone(), queue, callback).await; if let Err(e) = res { - tracing::warn!("Error processing jobs: {}", e); - tracing::warn!("{:?}", e); + tracing::warn!("Error processing jobs: {}", format!("{}", e)); + tracing::warn!("{}", format!("{:?}", e)); continue; } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index fde1b81..455683c 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -236,7 +236,7 @@ impl FileStore { // try writing debug!("Writing to {:?}", path.as_ref()); if let Err(e) = file.write_from_bytes(bytes).await { - error!("Error writing {:?}, {}", path.as_ref(), e); + error!("Error writing {:?}, {}", path.as_ref(), format!("{}", e)); // remove file if writing failed before completion self.safe_remove_file(path).await?; return Err(e.into()); diff --git a/src/store/object_store.rs b/src/store/object_store.rs index c3cb033..f15963d 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -176,6 +176,21 @@ impl Store for ObjectStore { where S: Stream> + Unpin + 'static, { + let first_chunk = read_chunk(&mut stream).await?; + + if first_chunk.len() < CHUNK_SIZE { + let (req, object_id) = self.put_object_request().await?; + let response = req.send_body(first_chunk).await?; + + if !response.status().is_success() { + return Err(status_error(response).await); + } + + return Ok(object_id); + } + + let mut first_chunk = Some(first_chunk); + let (req, object_id) = self.create_multipart_request().await?; let mut response = req.send().await.map_err(ObjectError::from)?; @@ -197,7 +212,12 @@ impl Store for ObjectStore { while !complete { part_number += 1; - let buf = read_chunk(&mut stream).await?; + let buf = if let Some(buf) = first_chunk.take() { + buf + } else { + read_chunk(&mut stream).await? + }; + complete = buf.len() < CHUNK_SIZE; let this = self.clone(); @@ -214,7 +234,7 @@ impl Store for ObjectStore { &upload_id2, ) .await? - .send_stream(buf.into_io_stream()) + .send_body(buf) .await?; if !response.status().is_success() { @@ -234,7 +254,7 @@ impl Store for ObjectStore { Ok(etag) as Result } - .instrument(tracing::info_span!("Upload Part")), + .instrument(tracing::Span::current()), ); futures.push(handle); @@ -444,6 +464,8 @@ impl ObjectStore { upload_id, ); + let length = buf.len(); + let hashing_span = tracing::info_span!("Hashing request body"); let hash_string = actix_web::web::block(move || { let guard = hashing_span.enter(); @@ -463,6 +485,9 @@ impl ObjectStore { .headers_mut() .insert("content-type", "application/octet-stream"); action.headers_mut().insert("content-md5", hash_string); + action + .headers_mut() + .insert("content-length", length.to_string()); Ok(self.build_request(action)) }