mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-10 19:25:26 +00:00
s3src: Consolidate stream reading into get object retries
Previously, reading the streaming body of a GetObject response was not covered by the same timeout/retry path as the dispatch of the HTTP request itself. We consolidate these two steps into a single async block and introduce a sum type that encapsulates both the rusoto and the standard-library error paths within that future.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
This commit is contained in:
parent
22bb6ec74a
commit
5fe95afe87
3 changed files with 78 additions and 73 deletions
|
@ -6,6 +6,7 @@
|
|||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use futures::TryFutureExt;
|
||||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
|
@ -27,7 +28,7 @@ use std::sync::Mutex;
|
|||
use std::time::Duration;
|
||||
|
||||
use crate::s3url::*;
|
||||
use crate::s3utils::{self, WaitError};
|
||||
use crate::s3utils::{self, RetriableError, WaitError};
|
||||
|
||||
use super::OnError;
|
||||
|
||||
|
@ -215,8 +216,11 @@ impl S3Sink {
|
|||
let upload_id = &state.upload_id;
|
||||
let client = &state.client;
|
||||
|
||||
let upload_part_req_future =
|
||||
|| client.upload_part(self.create_upload_part_request(&body, part_number, upload_id));
|
||||
let upload_part_req_future = || {
|
||||
client
|
||||
.upload_part(self.create_upload_part_request(&body, part_number, upload_id))
|
||||
.map_err(RetriableError::Rusoto)
|
||||
};
|
||||
|
||||
let output = s3utils::wait_retry(
|
||||
&self.canceller,
|
||||
|
@ -277,7 +281,7 @@ impl S3Sink {
|
|||
}
|
||||
Some(gst::error_msg!(
|
||||
gst::ResourceError::OpenWrite,
|
||||
["Failed to upload part: {}", err]
|
||||
["Failed to upload part: {:?}", err]
|
||||
))
|
||||
}
|
||||
WaitError::Cancelled => None,
|
||||
|
|
|
@ -9,11 +9,13 @@
|
|||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use bytes::{buf::BufMut, Bytes, BytesMut};
|
||||
use futures::future;
|
||||
use futures::{TryFutureExt, TryStreamExt};
|
||||
use once_cell::sync::Lazy;
|
||||
use rusoto_core::request::HttpClient;
|
||||
use rusoto_credential::StaticProvider;
|
||||
use rusoto_s3::GetObjectError;
|
||||
use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
|
||||
|
||||
use gst::glib;
|
||||
|
@ -25,7 +27,7 @@ use gst_base::subclass::base_src::CreateSuccess;
|
|||
use gst_base::subclass::prelude::*;
|
||||
|
||||
use crate::s3url::*;
|
||||
use crate::s3utils::{self, WaitError};
|
||||
use crate::s3utils::{self, RetriableError, WaitError};
|
||||
|
||||
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
||||
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
|
||||
|
@ -138,12 +140,14 @@ impl S3Src {
|
|||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
let head_object_future = || {
|
||||
client.head_object(HeadObjectRequest {
|
||||
bucket: url.bucket.clone(),
|
||||
key: url.object.clone(),
|
||||
version_id: url.version.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
client
|
||||
.head_object(HeadObjectRequest {
|
||||
bucket: url.bucket.clone(),
|
||||
key: url.object.clone(),
|
||||
version_id: url.version.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.map_err(RetriableError::Rusoto)
|
||||
};
|
||||
|
||||
let output = s3utils::wait_retry(
|
||||
|
@ -155,7 +159,7 @@ impl S3Src {
|
|||
.map_err(|err| match err {
|
||||
WaitError::FutureError(err) => gst::error_msg!(
|
||||
gst::ResourceError::NotFound,
|
||||
["Failed to HEAD object: {}", err]
|
||||
["Failed to HEAD object: {:?}", err]
|
||||
),
|
||||
WaitError::Cancelled => {
|
||||
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
|
||||
|
@ -198,25 +202,46 @@ impl S3Src {
|
|||
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: src,
|
||||
"Requesting range: {}-{}",
|
||||
offset,
|
||||
offset + length - 1
|
||||
);
|
||||
let get_object_future = || async {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: src,
|
||||
"Requesting range: {}-{}",
|
||||
offset,
|
||||
offset + length - 1
|
||||
);
|
||||
|
||||
let get_object_future = || {
|
||||
client.get_object(GetObjectRequest {
|
||||
bucket: url.bucket.clone(),
|
||||
key: url.object.clone(),
|
||||
range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
|
||||
version_id: url.version.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
let output = client
|
||||
.get_object(GetObjectRequest {
|
||||
bucket: url.bucket.clone(),
|
||||
key: url.object.clone(),
|
||||
range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
|
||||
version_id: url.version.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.map_err(RetriableError::Rusoto)
|
||||
.await?;
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: src,
|
||||
"Read {} bytes",
|
||||
output.content_length.unwrap()
|
||||
);
|
||||
|
||||
let mut collect = BytesMut::new();
|
||||
let mut stream = output.body.unwrap();
|
||||
|
||||
// Loop over the stream and collect till we're done
|
||||
// FIXME: Can we use TryStreamExt::collect() here?
|
||||
while let Some(item) = stream.try_next().map_err(RetriableError::Std).await? {
|
||||
collect.put(item)
|
||||
}
|
||||
|
||||
Ok::<Bytes, RetriableError<GetObjectError>>(collect.freeze())
|
||||
};
|
||||
|
||||
let output = s3utils::wait_retry(
|
||||
s3utils::wait_retry(
|
||||
&self.canceller,
|
||||
settings.request_timeout,
|
||||
settings.retry_duration,
|
||||
|
@ -225,22 +250,7 @@ impl S3Src {
|
|||
.map_err(|err| match err {
|
||||
WaitError::FutureError(err) => Some(gst::error_msg!(
|
||||
gst::ResourceError::Read,
|
||||
["Could not read: {}", err]
|
||||
)),
|
||||
WaitError::Cancelled => None,
|
||||
})?;
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: src,
|
||||
"Read {} bytes",
|
||||
output.content_length.unwrap()
|
||||
);
|
||||
|
||||
s3utils::wait_stream(&self.canceller, &mut output.body.unwrap()).map_err(|err| match err {
|
||||
WaitError::FutureError(err) => Some(gst::error_msg!(
|
||||
gst::ResourceError::Read,
|
||||
["Could not read: {}", err]
|
||||
["Could not read: {:?}", err]
|
||||
)),
|
||||
WaitError::Cancelled => None,
|
||||
})
|
||||
|
|
|
@ -6,11 +6,10 @@
|
|||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use bytes::{buf::BufMut, Bytes, BytesMut};
|
||||
use futures::{future, Future, FutureExt, TryFutureExt, TryStreamExt};
|
||||
use futures::{future, Future, FutureExt, TryFutureExt};
|
||||
use once_cell::sync::Lazy;
|
||||
use rusoto_core::RusotoError::{HttpDispatch, Unknown};
|
||||
use rusoto_core::{ByteStream, HttpDispatchError, RusotoError};
|
||||
use rusoto_core::{HttpDispatchError, RusotoError};
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::runtime;
|
||||
|
@ -32,6 +31,12 @@ static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
|
|||
.unwrap()
|
||||
});
|
||||
|
||||
/// Error type produced by the futures that the retry helpers in this module accept.
///
/// Combines the two failure sources into one sum type: a rusoto error, or a
/// standard-library I/O error. Generic over `E`, the error parameter of `RusotoError`.
#[derive(Debug)]
|
||||
pub enum RetriableError<E> {
|
||||
/// A rusoto error.
Rusoto(RusotoError<E>),
|
||||
/// A standard-library I/O error.
Std(std::io::Error),
|
||||
}
|
||||
|
||||
pub enum WaitError<E> {
|
||||
Cancelled,
|
||||
FutureError(E),
|
||||
|
@ -40,10 +45,10 @@ pub enum WaitError<E> {
|
|||
fn make_timeout<F, T, E>(
|
||||
timeout: Duration,
|
||||
future: F,
|
||||
) -> impl Future<Output = Result<T, RusotoError<E>>>
|
||||
) -> impl Future<Output = Result<T, RetriableError<E>>>
|
||||
where
|
||||
E: std::fmt::Debug,
|
||||
F: Future<Output = Result<T, RusotoError<E>>>,
|
||||
F: Future<Output = Result<T, RetriableError<E>>>,
|
||||
{
|
||||
tokio::time::timeout(timeout, future).map(|v| match v {
|
||||
// Future resolved successfully
|
||||
|
@ -53,18 +58,20 @@ where
|
|||
// Timeout elapsed
|
||||
// Use an HttpDispatch error so the caller doesn't have to deal with this separately from
|
||||
// other HTTP dispatch errors
|
||||
_ => Err(HttpDispatch(HttpDispatchError::new("Timeout".to_owned()))),
|
||||
_ => Err(RetriableError::Rusoto(HttpDispatch(
|
||||
HttpDispatchError::new("Timeout".to_owned()),
|
||||
))),
|
||||
})
|
||||
}
|
||||
|
||||
fn make_retry<F, T, E, Fut>(
|
||||
timeout: Option<Duration>,
|
||||
mut future: F,
|
||||
) -> impl Future<Output = Result<T, RusotoError<E>>>
|
||||
) -> impl Future<Output = Result<T, RetriableError<E>>>
|
||||
where
|
||||
E: std::fmt::Debug,
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Result<T, RusotoError<E>>>,
|
||||
Fut: Future<Output = Result<T, RetriableError<E>>>,
|
||||
{
|
||||
backoff::future::retry(
|
||||
backoff::ExponentialBackoffBuilder::new()
|
||||
|
@ -74,11 +81,11 @@ where
|
|||
.build(),
|
||||
move || {
|
||||
future().map_err(|err| match err {
|
||||
HttpDispatch(_) => {
|
||||
RetriableError::Rusoto(HttpDispatch(_)) => {
|
||||
gst::warning!(CAT, "Error waiting for operation ({:?}), retrying", err);
|
||||
backoff::Error::transient(err)
|
||||
}
|
||||
Unknown(ref response) => {
|
||||
RetriableError::Rusoto(Unknown(ref response)) => {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
"Unknown error waiting for operation ({:?}), retrying",
|
||||
|
@ -103,11 +110,11 @@ pub fn wait_retry<F, T, E, Fut>(
|
|||
req_timeout: Option<Duration>,
|
||||
retry_timeout: Option<Duration>,
|
||||
mut future: F,
|
||||
) -> Result<T, WaitError<RusotoError<E>>>
|
||||
) -> Result<T, WaitError<RetriableError<E>>>
|
||||
where
|
||||
E: std::fmt::Debug,
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Send + Future<Output = Result<T, RusotoError<E>>>,
|
||||
Fut: Send + Future<Output = Result<T, RetriableError<E>>>,
|
||||
Fut::Output: Send,
|
||||
T: Send,
|
||||
E: Send,
|
||||
|
@ -197,19 +204,3 @@ where
|
|||
|
||||
res
|
||||
}
|
||||
|
||||
/// Drains `stream` into a single contiguous `Bytes` buffer.
///
/// The collecting future is handed to `wait` together with `canceller`.
/// NOTE(review): `wait` is not visible in this view; presumably a stream error
/// surfaces as `WaitError::FutureError` and an abort as `WaitError::Cancelled` — confirm.
pub fn wait_stream(
|
||||
canceller: &Mutex<Option<future::AbortHandle>>,
|
||||
stream: &mut ByteStream,
|
||||
) -> Result<Bytes, WaitError<std::io::Error>> {
|
||||
wait(canceller, async move {
|
||||
// Accumulator for the chunks yielded by the stream.
let mut collect = BytesMut::new();
|
||||
|
||||
// Loop over the stream and collect till we're done
|
||||
// `?` ends the future with the first `std::io::Error` the stream yields.
while let Some(item) = stream.try_next().await? {
|
||||
collect.put(item)
|
||||
}
|
||||
|
||||
// Convert the accumulated `BytesMut` into `Bytes` with `freeze()`.
Ok::<Bytes, std::io::Error>(collect.freeze())
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue