diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs index a0d6e054c..ed8d45fa1 100644 --- a/net/rusoto/src/s3sink/imp.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -6,6 +6,7 @@ // // SPDX-License-Identifier: MPL-2.0 +use futures::TryFutureExt; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; @@ -28,7 +29,7 @@ use std::sync::Mutex; use std::time::Duration; use crate::s3url::*; -use crate::s3utils::{self, WaitError}; +use crate::s3utils::{self, RetriableError, WaitError}; use super::OnError; @@ -216,8 +217,11 @@ impl S3Sink { let upload_id = &state.upload_id; let client = &state.client; - let upload_part_req_future = - || client.upload_part(self.create_upload_part_request(&body, part_number, upload_id)); + let upload_part_req_future = || { + client + .upload_part(self.create_upload_part_request(&body, part_number, upload_id)) + .map_err(RetriableError::Rusoto) + }; let output = s3utils::wait_retry( &self.canceller, @@ -278,7 +282,7 @@ impl S3Sink { } Some(gst::error_msg!( gst::ResourceError::OpenWrite, - ["Failed to upload part: {}", err] + ["Failed to upload part: {:?}", err] )) } WaitError::Cancelled => None, diff --git a/net/rusoto/src/s3src/imp.rs b/net/rusoto/src/s3src/imp.rs index d75bd0107..fc7f99acb 100644 --- a/net/rusoto/src/s3src/imp.rs +++ b/net/rusoto/src/s3src/imp.rs @@ -9,11 +9,13 @@ use std::sync::Mutex; use std::time::Duration; -use bytes::Bytes; +use bytes::{buf::BufMut, Bytes, BytesMut}; use futures::future; +use futures::{TryFutureExt, TryStreamExt}; use once_cell::sync::Lazy; use rusoto_core::request::HttpClient; use rusoto_credential::StaticProvider; +use rusoto_s3::GetObjectError; use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3}; use gst::glib; @@ -26,7 +28,7 @@ use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; use crate::s3url::*; -use crate::s3utils::{self, WaitError}; +use crate::s3utils::{self, RetriableError, WaitError}; const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; @@ -139,12 +141,14 @@ impl S3Src { let settings = self.settings.lock().unwrap(); let head_object_future = || { - client.head_object(HeadObjectRequest { - bucket: url.bucket.clone(), - key: url.object.clone(), - version_id: url.version.clone(), - ..Default::default() - }) + client + .head_object(HeadObjectRequest { + bucket: url.bucket.clone(), + key: url.object.clone(), + version_id: url.version.clone(), + ..Default::default() + }) + .map_err(RetriableError::Rusoto) }; let output = s3utils::wait_retry( @@ -156,7 +160,7 @@ impl S3Src { .map_err(|err| match err { WaitError::FutureError(err) => gst::error_msg!( gst::ResourceError::NotFound, - ["Failed to HEAD object: {}", err] + ["Failed to HEAD object: {:?}", err] ), WaitError::Cancelled => { gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) @@ -199,25 +203,46 @@ impl S3Src { let settings = self.settings.lock().unwrap(); - gst_debug!( - CAT, - obj: src, - "Requesting range: {}-{}", - offset, - offset + length - 1 - ); + let get_object_future = || async { + gst_debug!( + CAT, + obj: src, + "Requesting range: {}-{}", + offset, + offset + length - 1 + ); - let get_object_future = || { - client.get_object(GetObjectRequest { - bucket: url.bucket.clone(), - key: url.object.clone(), - range: Some(format!("bytes={}-{}", offset, offset + length - 1)), - version_id: url.version.clone(), - ..Default::default() - }) + let output = client + .get_object(GetObjectRequest { + bucket: url.bucket.clone(), + key: url.object.clone(), + range: Some(format!("bytes={}-{}", offset, offset + length - 1)), + version_id: url.version.clone(), + ..Default::default() + }) + .map_err(RetriableError::Rusoto) + .await?; + + gst_debug!( + CAT, + obj: src, + "Read {} bytes", + output.content_length.unwrap() + ); + + let mut collect = BytesMut::new(); + let mut stream = output.body.unwrap(); + + // Loop over the stream and collect till we're done + // FIXME: Can we use TryStreamExt::collect() here? + while let Some(item) = stream.try_next().map_err(RetriableError::Std).await? { + collect.put(item) + } + + Ok::>(collect.freeze()) }; - let output = s3utils::wait_retry( + s3utils::wait_retry( &self.canceller, settings.request_timeout, settings.retry_duration, @@ -226,22 +251,7 @@ impl S3Src { .map_err(|err| match err { WaitError::FutureError(err) => Some(gst::error_msg!( gst::ResourceError::Read, - ["Could not read: {}", err] - )), - WaitError::Cancelled => None, - })?; - - gst_debug!( - CAT, - obj: src, - "Read {} bytes", - output.content_length.unwrap() - ); - - s3utils::wait_stream(&self.canceller, &mut output.body.unwrap()).map_err(|err| match err { - WaitError::FutureError(err) => Some(gst::error_msg!( - gst::ResourceError::Read, - ["Could not read: {}", err] + ["Could not read: {:?}", err] )), WaitError::Cancelled => None, }) diff --git a/net/rusoto/src/s3utils.rs b/net/rusoto/src/s3utils.rs index b973dbeab..94b7ba8a1 100644 --- a/net/rusoto/src/s3utils.rs +++ b/net/rusoto/src/s3utils.rs @@ -6,11 +6,10 @@ // // SPDX-License-Identifier: MPL-2.0 -use bytes::{buf::BufMut, Bytes, BytesMut}; -use futures::{future, Future, FutureExt, TryFutureExt, TryStreamExt}; +use futures::{future, Future, FutureExt, TryFutureExt}; use once_cell::sync::Lazy; use rusoto_core::RusotoError::{HttpDispatch, Unknown}; -use rusoto_core::{ByteStream, HttpDispatchError, RusotoError}; +use rusoto_core::{HttpDispatchError, RusotoError}; use std::sync::Mutex; use std::time::Duration; use tokio::runtime; @@ -34,6 +33,12 @@ static RUNTIME: Lazy = Lazy::new(|| { .unwrap() }); +#[derive(Debug)] +pub enum RetriableError { + Rusoto(RusotoError), + Std(std::io::Error), +} + pub enum WaitError { Cancelled, FutureError(E), @@ -42,10 +47,10 @@ pub enum WaitError { fn make_timeout( timeout: Duration, future: F, -) -> impl Future>> +) -> impl Future>> where E: std::fmt::Debug, - F: Future>>, + F: Future>>, { tokio::time::timeout(timeout, future).map(|v| match v { // Future resolved succesfully @@ -55,18 +60,20 @@ where // Timeout elapsed // Use an HttpDispatch error so the caller doesn't have to deal with this separately from // other HTTP dispatch errors - _ => Err(HttpDispatch(HttpDispatchError::new("Timeout".to_owned()))), + _ => Err(RetriableError::Rusoto(HttpDispatch( + HttpDispatchError::new("Timeout".to_owned()), + ))), }) } fn make_retry( timeout: Option, mut future: F, -) -> impl Future>> +) -> impl Future>> where E: std::fmt::Debug, F: FnMut() -> Fut, - Fut: Future>>, + Fut: Future>>, { backoff::future::retry( backoff::ExponentialBackoffBuilder::new() @@ -76,11 +83,11 @@ where .build(), move || { future().map_err(|err| match err { - HttpDispatch(_) => { + RetriableError::Rusoto(HttpDispatch(_)) => { gst_warning!(CAT, "Error waiting for operation ({:?}), retrying", err); backoff::Error::transient(err) } - Unknown(ref response) => { + RetriableError::Rusoto(Unknown(ref response)) => { gst_warning!( CAT, "Unknown error waiting for operation ({:?}), retrying", @@ -105,11 +112,11 @@ pub fn wait_retry( req_timeout: Option, retry_timeout: Option, mut future: F, -) -> Result>> +) -> Result>> where E: std::fmt::Debug, F: FnMut() -> Fut, - Fut: Send + Future>>, + Fut: Send + Future>>, Fut::Output: Send, T: Send, E: Send, @@ -199,19 +206,3 @@ where res } - -pub fn wait_stream( - canceller: &Mutex>, - stream: &mut ByteStream, -) -> Result> { - wait(canceller, async move { - let mut collect = BytesMut::new(); - - // Loop over the stream and collect till we're done - while let Some(item) = stream.try_next().await? { - collect.put(item) - } - - Ok::(collect.freeze()) - }) -}