s3src: Consolidate stream reading into get object retries

Previously, the actual reading from the streaming body of a GetObject
request was not within the same timeout/retry path as the dispatch of
the HTTP request itself. We consolidate these two into a single async
block and create a sum type to encapsulate the rusoto and std library
error paths within that future.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
This commit is contained in:
Arun Raghavan 2022-03-18 14:31:21 +05:30
parent 1ad277a410
commit 7b8d3acf10
3 changed files with 78 additions and 73 deletions

View file

@ -6,6 +6,7 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use futures::TryFutureExt;
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
@ -28,7 +29,7 @@ use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use crate::s3url::*; use crate::s3url::*;
use crate::s3utils::{self, WaitError}; use crate::s3utils::{self, RetriableError, WaitError};
use super::OnError; use super::OnError;
@ -216,8 +217,11 @@ impl S3Sink {
let upload_id = &state.upload_id; let upload_id = &state.upload_id;
let client = &state.client; let client = &state.client;
let upload_part_req_future = let upload_part_req_future = || {
|| client.upload_part(self.create_upload_part_request(&body, part_number, upload_id)); client
.upload_part(self.create_upload_part_request(&body, part_number, upload_id))
.map_err(RetriableError::Rusoto)
};
let output = s3utils::wait_retry( let output = s3utils::wait_retry(
&self.canceller, &self.canceller,
@ -278,7 +282,7 @@ impl S3Sink {
} }
Some(gst::error_msg!( Some(gst::error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err] ["Failed to upload part: {:?}", err]
)) ))
} }
WaitError::Cancelled => None, WaitError::Cancelled => None,

View file

@ -9,11 +9,13 @@
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use bytes::Bytes; use bytes::{buf::BufMut, Bytes, BytesMut};
use futures::future; use futures::future;
use futures::{TryFutureExt, TryStreamExt};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rusoto_core::request::HttpClient; use rusoto_core::request::HttpClient;
use rusoto_credential::StaticProvider; use rusoto_credential::StaticProvider;
use rusoto_s3::GetObjectError;
use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3}; use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
use gst::glib; use gst::glib;
@ -26,7 +28,7 @@ use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use crate::s3url::*; use crate::s3url::*;
use crate::s3utils::{self, WaitError}; use crate::s3utils::{self, RetriableError, WaitError};
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
@ -139,12 +141,14 @@ impl S3Src {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let head_object_future = || { let head_object_future = || {
client.head_object(HeadObjectRequest { client
.head_object(HeadObjectRequest {
bucket: url.bucket.clone(), bucket: url.bucket.clone(),
key: url.object.clone(), key: url.object.clone(),
version_id: url.version.clone(), version_id: url.version.clone(),
..Default::default() ..Default::default()
}) })
.map_err(RetriableError::Rusoto)
}; };
let output = s3utils::wait_retry( let output = s3utils::wait_retry(
@ -156,7 +160,7 @@ impl S3Src {
.map_err(|err| match err { .map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!( WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::NotFound, gst::ResourceError::NotFound,
["Failed to HEAD object: {}", err] ["Failed to HEAD object: {:?}", err]
), ),
WaitError::Cancelled => { WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
@ -199,6 +203,7 @@ impl S3Src {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let get_object_future = || async {
gst_debug!( gst_debug!(
CAT, CAT,
obj: src, obj: src,
@ -207,29 +212,16 @@ impl S3Src {
offset + length - 1 offset + length - 1
); );
let get_object_future = || { let output = client
client.get_object(GetObjectRequest { .get_object(GetObjectRequest {
bucket: url.bucket.clone(), bucket: url.bucket.clone(),
key: url.object.clone(), key: url.object.clone(),
range: Some(format!("bytes={}-{}", offset, offset + length - 1)), range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
version_id: url.version.clone(), version_id: url.version.clone(),
..Default::default() ..Default::default()
}) })
}; .map_err(RetriableError::Rusoto)
.await?;
let output = s3utils::wait_retry(
&self.canceller,
settings.request_timeout,
settings.retry_duration,
get_object_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::Read,
["Could not read: {}", err]
)),
WaitError::Cancelled => None,
})?;
gst_debug!( gst_debug!(
CAT, CAT,
@ -238,10 +230,28 @@ impl S3Src {
output.content_length.unwrap() output.content_length.unwrap()
); );
s3utils::wait_stream(&self.canceller, &mut output.body.unwrap()).map_err(|err| match err { let mut collect = BytesMut::new();
let mut stream = output.body.unwrap();
// Loop over the stream and collect till we're done
// FIXME: Can we use TryStreamExt::collect() here?
while let Some(item) = stream.try_next().map_err(RetriableError::Std).await? {
collect.put(item)
}
Ok::<Bytes, RetriableError<GetObjectError>>(collect.freeze())
};
s3utils::wait_retry(
&self.canceller,
settings.request_timeout,
settings.retry_duration,
get_object_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!( WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::Read, gst::ResourceError::Read,
["Could not read: {}", err] ["Could not read: {:?}", err]
)), )),
WaitError::Cancelled => None, WaitError::Cancelled => None,
}) })

View file

@ -6,11 +6,10 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use bytes::{buf::BufMut, Bytes, BytesMut}; use futures::{future, Future, FutureExt, TryFutureExt};
use futures::{future, Future, FutureExt, TryFutureExt, TryStreamExt};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rusoto_core::RusotoError::{HttpDispatch, Unknown}; use rusoto_core::RusotoError::{HttpDispatch, Unknown};
use rusoto_core::{ByteStream, HttpDispatchError, RusotoError}; use rusoto_core::{HttpDispatchError, RusotoError};
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use tokio::runtime; use tokio::runtime;
@ -34,6 +33,12 @@ static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap() .unwrap()
}); });
#[derive(Debug)]
pub enum RetriableError<E> {
Rusoto(RusotoError<E>),
Std(std::io::Error),
}
pub enum WaitError<E> { pub enum WaitError<E> {
Cancelled, Cancelled,
FutureError(E), FutureError(E),
@ -42,10 +47,10 @@ pub enum WaitError<E> {
fn make_timeout<F, T, E>( fn make_timeout<F, T, E>(
timeout: Duration, timeout: Duration,
future: F, future: F,
) -> impl Future<Output = Result<T, RusotoError<E>>> ) -> impl Future<Output = Result<T, RetriableError<E>>>
where where
E: std::fmt::Debug, E: std::fmt::Debug,
F: Future<Output = Result<T, RusotoError<E>>>, F: Future<Output = Result<T, RetriableError<E>>>,
{ {
tokio::time::timeout(timeout, future).map(|v| match v { tokio::time::timeout(timeout, future).map(|v| match v {
// Future resolved succesfully // Future resolved succesfully
@ -55,18 +60,20 @@ where
// Timeout elapsed // Timeout elapsed
// Use an HttpDispatch error so the caller doesn't have to deal with this separately from // Use an HttpDispatch error so the caller doesn't have to deal with this separately from
// other HTTP dispatch errors // other HTTP dispatch errors
_ => Err(HttpDispatch(HttpDispatchError::new("Timeout".to_owned()))), _ => Err(RetriableError::Rusoto(HttpDispatch(
HttpDispatchError::new("Timeout".to_owned()),
))),
}) })
} }
fn make_retry<F, T, E, Fut>( fn make_retry<F, T, E, Fut>(
timeout: Option<Duration>, timeout: Option<Duration>,
mut future: F, mut future: F,
) -> impl Future<Output = Result<T, RusotoError<E>>> ) -> impl Future<Output = Result<T, RetriableError<E>>>
where where
E: std::fmt::Debug, E: std::fmt::Debug,
F: FnMut() -> Fut, F: FnMut() -> Fut,
Fut: Future<Output = Result<T, RusotoError<E>>>, Fut: Future<Output = Result<T, RetriableError<E>>>,
{ {
backoff::future::retry( backoff::future::retry(
backoff::ExponentialBackoffBuilder::new() backoff::ExponentialBackoffBuilder::new()
@ -76,11 +83,11 @@ where
.build(), .build(),
move || { move || {
future().map_err(|err| match err { future().map_err(|err| match err {
HttpDispatch(_) => { RetriableError::Rusoto(HttpDispatch(_)) => {
gst_warning!(CAT, "Error waiting for operation ({:?}), retrying", err); gst_warning!(CAT, "Error waiting for operation ({:?}), retrying", err);
backoff::Error::transient(err) backoff::Error::transient(err)
} }
Unknown(ref response) => { RetriableError::Rusoto(Unknown(ref response)) => {
gst_warning!( gst_warning!(
CAT, CAT,
"Unknown error waiting for operation ({:?}), retrying", "Unknown error waiting for operation ({:?}), retrying",
@ -105,11 +112,11 @@ pub fn wait_retry<F, T, E, Fut>(
req_timeout: Option<Duration>, req_timeout: Option<Duration>,
retry_timeout: Option<Duration>, retry_timeout: Option<Duration>,
mut future: F, mut future: F,
) -> Result<T, WaitError<RusotoError<E>>> ) -> Result<T, WaitError<RetriableError<E>>>
where where
E: std::fmt::Debug, E: std::fmt::Debug,
F: FnMut() -> Fut, F: FnMut() -> Fut,
Fut: Send + Future<Output = Result<T, RusotoError<E>>>, Fut: Send + Future<Output = Result<T, RetriableError<E>>>,
Fut::Output: Send, Fut::Output: Send,
T: Send, T: Send,
E: Send, E: Send,
@ -199,19 +206,3 @@ where
res res
} }
pub fn wait_stream(
canceller: &Mutex<Option<future::AbortHandle>>,
stream: &mut ByteStream,
) -> Result<Bytes, WaitError<std::io::Error>> {
wait(canceller, async move {
let mut collect = BytesMut::new();
// Loop over the stream and collect till we're done
while let Some(item) = stream.try_next().await? {
collect.put(item)
}
Ok::<Bytes, std::io::Error>(collect.freeze())
})
}