Migrate s3src and s3sink to use AWS SDK

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/759>
This commit is contained in:
Sanchayan Maity 2022-05-14 10:31:35 +05:30 committed by Sebastian Dröge
parent dccd4c3306
commit 768fad2445
5 changed files with 592 additions and 670 deletions

View file

@ -15,13 +15,19 @@ bytes = "1.0"
futures = "0.3" futures = "0.3"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
aws-config = "0.12.0"
aws-sdk-s3 = "0.12.0"
aws-sdk-transcribe = "0.12.0"
aws-types = "0.12.0"
aws-smithy-http = { version = "0.42.0", features = [ "rt-tokio" ] }
aws-smithy-types = "0.42.0"
rusoto_core = "0.48" rusoto_core = "0.48"
rusoto_s3 = "0.48" rusoto_s3 = "0.48"
rusoto_credential = "0.48" rusoto_credential = "0.48"
rusoto_signature = "0.48" rusoto_signature = "0.48"
url = "2" url = "2"
percent-encoding = "2" percent-encoding = "2"
tokio = { version = "1.0", features = [ "rt-multi-thread", "time" ] } tokio = { version = "1.0", features = [ "full" ] }
async-tungstenite = { version = "0.17", features = ["tokio", "tokio-runtime", "tokio-native-tls"] } async-tungstenite = { version = "0.17", features = ["tokio", "tokio-runtime", "tokio-native-tls"] }
nom = "7" nom = "7"
crc = "3" crc = "3"

View file

@ -6,35 +6,37 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use futures::TryFutureExt;
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use futures::future; use aws_sdk_s3::client::fluent_builders::{
use rusoto_core::{region::Region, request::HttpClient}; AbortMultipartUpload, CompleteMultipartUpload, CreateMultipartUpload, UploadPart,
use rusoto_credential::StaticProvider;
use rusoto_s3::{
AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload,
CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
}; };
use aws_sdk_s3::config;
use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::types::ByteStream;
use aws_sdk_s3::{Client, Credentials, Region, RetryConfig};
use futures::future;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::From;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use crate::s3url::*; use crate::s3url::*;
use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError}; use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
use super::OnError; use super::OnError;
const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
// General setting for create / abort requests // General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
// This needs to be independently configurable, as the part size can be upto 5GB // This needs to be independently configurable, as the part size can be upto 5GB
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
@ -45,7 +47,7 @@ const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes
struct Started { struct Started {
client: S3Client, client: Client,
buffer: Vec<u8>, buffer: Vec<u8>,
upload_id: String, upload_id: String,
part_number: i64, part_number: i64,
@ -53,7 +55,7 @@ struct Started {
} }
impl Started { impl Started {
pub fn new(client: S3Client, buffer: Vec<u8>, upload_id: String) -> Started { pub fn new(client: Client, buffer: Vec<u8>, upload_id: String) -> Started {
Started { Started {
client, client,
buffer, buffer,
@ -93,8 +95,6 @@ impl Default for State {
} }
} }
const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
struct Settings { struct Settings {
region: Region, region: Region,
bucket: Option<String>, bucket: Option<String>,
@ -104,40 +104,16 @@ struct Settings {
access_key: Option<String>, access_key: Option<String>,
secret_access_key: Option<String>, secret_access_key: Option<String>,
metadata: Option<gst::Structure>, metadata: Option<gst::Structure>,
retry_attempts: u32,
multipart_upload_on_error: OnError, multipart_upload_on_error: OnError,
request_timeout: Option<Duration>, request_timeout: Duration,
retry_duration: Option<Duration>,
upload_part_request_timeout: Option<Duration>,
upload_part_retry_duration: Option<Duration>,
complete_upload_request_timeout: Option<Duration>,
complete_upload_retry_duration: Option<Duration>,
} }
impl Settings { impl Settings {
fn to_uri(&self) -> String { fn to_uri(&self) -> String {
format!( format!(
"s3://{}/{}/{}", "s3://{}/{}/{}",
match self.region { self.region,
Region::Custom {
ref name,
ref endpoint,
} => {
format!(
"{}+{}",
base32::encode(
base32::Alphabet::RFC4648 { padding: true },
name.as_bytes(),
),
base32::encode(
base32::Alphabet::RFC4648 { padding: true },
endpoint.as_bytes(),
),
)
}
_ => {
String::from(self.region.name())
}
},
self.bucket.as_ref().unwrap(), self.bucket.as_ref().unwrap(),
self.key.as_ref().unwrap() self.key.as_ref().unwrap()
) )
@ -170,29 +146,17 @@ impl Settings {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Settings { Settings {
region: Region::default(), region: Region::new("us-west-2"),
bucket: None, bucket: None,
key: None, key: None,
content_type: None, content_type: None,
buffer_size: DEFAULT_BUFFER_SIZE,
access_key: None, access_key: None,
secret_access_key: None, secret_access_key: None,
metadata: None, metadata: None,
buffer_size: DEFAULT_BUFFER_SIZE,
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
request_timeout: Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)), request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
retry_duration: Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
upload_part_request_timeout: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
)),
upload_part_retry_duration: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
)),
complete_upload_request_timeout: Some(Duration::from_millis(
DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC,
)),
complete_upload_retry_duration: Some(Duration::from_millis(
DEFAULT_COMPLETE_RETRY_DURATION_MSEC,
)),
} }
} }
} }
@ -219,8 +183,9 @@ impl S3Sink {
&self, &self,
element: &super::S3Sink, element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> { ) -> Result<(), Option<gst::ErrorMessage>> {
let upload_part_req: UploadPart = self.create_upload_part_request()?;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
let state = match *state { let state = match *state {
State::Started(ref mut started_state) => started_state, State::Started(ref mut started_state) => started_state,
State::Stopped => { State::Stopped => {
@ -228,28 +193,13 @@ impl S3Sink {
} }
}; };
let part_number = state.increment_part_number()?; let part_number = state.part_number;
let body = std::mem::replace(
&mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize),
);
let upload_id = &state.upload_id;
let client = &state.client;
let upload_part_req_future = || { let upload_part_req_future = upload_part_req.send();
client let output =
.upload_part(self.create_upload_part_request(&body, part_number, upload_id)) s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
.map_err(RetriableError::Rusoto)
};
let output = s3utils::wait_retry(
&self.canceller,
settings.upload_part_request_timeout,
settings.upload_part_retry_duration,
upload_part_req_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => { WaitError::FutureError(err) => {
let settings = self.settings.lock().unwrap();
match settings.multipart_upload_on_error { match settings.multipart_upload_on_error {
OnError::Abort => { OnError::Abort => {
gst::log!( gst::log!(
@ -301,80 +251,119 @@ impl S3Sink {
} }
Some(gst::error_msg!( Some(gst::error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
["Failed to upload part: {:?}", err] ["Failed to upload part: {}", err]
)) ))
} }
WaitError::Cancelled => None, WaitError::Cancelled => None,
})?; })?;
state.completed_parts.push(CompletedPart { let completed_part = CompletedPart::builder()
e_tag: output.e_tag, .set_e_tag(output.e_tag)
part_number: Some(part_number), .set_part_number(Some(part_number as i32))
}); .build();
state.completed_parts.push(completed_part);
gst::info!(CAT, obj: element, "Uploaded part {}", part_number); gst::info!(CAT, obj: element, "Uploaded part {}", part_number);
Ok(()) Ok(())
} }
fn create_upload_part_request( fn create_upload_part_request(&self) -> Result<UploadPart, gst::ErrorMessage> {
&self,
body: &[u8],
part_number: i64,
upload_id: &str,
) -> UploadPartRequest {
let url = self.url.lock().unwrap(); let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
UploadPartRequest { let mut state = self.state.lock().unwrap();
body: Some(rusoto_core::ByteStream::from(body.to_owned())), let state = match *state {
bucket: url.as_ref().unwrap().bucket.to_owned(), State::Started(ref mut started_state) => started_state,
key: url.as_ref().unwrap().object.to_owned(), State::Stopped => {
upload_id: upload_id.to_owned(), unreachable!("Element should be started");
part_number,
..Default::default()
} }
};
let part_number = state.increment_part_number()?;
let body = Some(ByteStream::from(std::mem::replace(
&mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize),
)));
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
let key = Some(url.as_ref().unwrap().object.to_owned());
let upload_id = Some(state.upload_id.to_owned());
let client = &state.client;
let upload_part = client
.upload_part()
.set_body(body)
.set_bucket(bucket)
.set_key(key)
.set_upload_id(upload_id)
.set_part_number(Some(part_number as i32));
Ok(upload_part)
} }
fn create_complete_multipart_upload_request( fn create_complete_multipart_upload_request(
&self, &self,
started_state: &Started, started_state: &mut Started,
completed_upload: CompletedMultipartUpload, ) -> CompleteMultipartUpload {
) -> CompleteMultipartUploadRequest { started_state
.completed_parts
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
let parts = Some(std::mem::take(&mut started_state.completed_parts));
let completed_upload = CompletedMultipartUpload::builder().set_parts(parts).build();
let url = self.url.lock().unwrap(); let url = self.url.lock().unwrap();
CompleteMultipartUploadRequest { let client = &started_state.client;
bucket: url.as_ref().unwrap().bucket.to_owned(),
key: url.as_ref().unwrap().object.to_owned(), let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
upload_id: started_state.upload_id.to_owned(), let key = Some(url.as_ref().unwrap().object.to_owned());
multipart_upload: Some(completed_upload), let upload_id = Some(started_state.upload_id.to_owned());
..Default::default() let multipart_upload = Some(completed_upload);
}
client
.complete_multipart_upload()
.set_bucket(bucket)
.set_key(key)
.set_upload_id(upload_id)
.set_multipart_upload(multipart_upload)
} }
fn create_create_multipart_upload_request( fn create_create_multipart_upload_request(
&self, &self,
client: &Client,
url: &GstS3Url, url: &GstS3Url,
settings: &Settings, settings: &Settings,
) -> CreateMultipartUploadRequest { ) -> CreateMultipartUpload {
CreateMultipartUploadRequest { let bucket = Some(url.bucket.clone());
bucket: url.bucket.clone(), let key = Some(url.object.clone());
key: url.object.clone(), let content_type = settings.content_type.clone();
content_type: settings.content_type.clone(), let metadata = settings.to_metadata(&self.instance());
metadata: settings.to_metadata(&self.instance()),
..Default::default() client
} .create_multipart_upload()
.set_bucket(bucket)
.set_key(key)
.set_content_type(content_type)
.set_metadata(metadata)
} }
fn create_abort_multipart_upload_request( fn create_abort_multipart_upload_request(
&self, &self,
client: &Client,
url: &GstS3Url, url: &GstS3Url,
started_state: &Started, started_state: &Started,
) -> AbortMultipartUploadRequest { ) -> AbortMultipartUpload {
AbortMultipartUploadRequest { let bucket = Some(url.bucket.clone());
bucket: url.bucket.clone(), let key = Some(url.object.clone());
expected_bucket_owner: None,
key: url.object.clone(), client
request_payer: None, .abort_multipart_upload()
upload_id: started_state.upload_id.to_owned(), .set_bucket(bucket)
} .set_expected_bucket_owner(None)
.set_key(key)
.set_request_payer(None)
.set_upload_id(Some(started_state.upload_id.to_owned()))
} }
fn abort_multipart_upload_request( fn abort_multipart_upload_request(
@ -385,26 +374,18 @@ impl S3Sink {
Some(ref url) => url.clone(), Some(ref url) => url.clone(),
None => unreachable!("Element should be started"), None => unreachable!("Element should be started"),
}; };
let abort_req_future = || {
let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state);
started_state
.client
.abort_multipart_upload(abort_req)
.map_err(RetriableError::Rusoto)
};
s3utils::wait_retry( let client = &started_state.client;
&self.abort_multipart_canceller, let abort_req = self.create_abort_multipart_upload_request(client, &s3url, started_state);
Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)), let abort_req_future = abort_req.send();
Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
abort_req_future, s3utils::wait(&self.abort_multipart_canceller, abort_req_future)
)
.map(|_| ()) .map(|_| ())
.map_err(|err| match err { .map_err(|err| match err {
WaitError::FutureError(err) => { WaitError::FutureError(err) => {
gst::error_msg!( gst::error_msg!(
gst::ResourceError::Write, gst::ResourceError::Write,
["Failed to abort multipart upload: {:?}.", err] ["Failed to abort multipart upload: {}.", err.to_string()]
) )
} }
WaitError::Cancelled => { WaitError::Cancelled => {
@ -420,37 +401,21 @@ impl S3Sink {
&self, &self,
started_state: &mut Started, started_state: &mut Started,
) -> Result<(), gst::ErrorMessage> { ) -> Result<(), gst::ErrorMessage> {
started_state let complete_req = self.create_complete_multipart_upload_request(started_state);
.completed_parts let complete_req_future = complete_req.send();
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
let completed_upload = CompletedMultipartUpload { s3utils::wait(&self.canceller, complete_req_future)
parts: Some(std::mem::take(&mut started_state.completed_parts)),
};
let complete_req_future = || {
let complete_req = self
.create_complete_multipart_upload_request(started_state, completed_upload.clone());
started_state
.client
.complete_multipart_upload(complete_req)
.map_err(RetriableError::Rusoto)
};
s3utils::wait_retry(
&self.canceller,
Some(Duration::from_millis(DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC)),
Some(Duration::from_millis(DEFAULT_COMPLETE_RETRY_DURATION_MSEC)),
complete_req_future,
)
.map(|_| ()) .map(|_| ())
.map_err(|err| match err { .map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!( WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::Write, gst::ResourceError::Write,
["Failed to complete multipart upload: {:?}.", err] ["Failed to complete multipart upload: {}.", err.to_string()]
), ),
WaitError::Cancelled => { WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) gst::error_msg!(
gst::LibraryError::Failed,
["Complete multipart upload request interrupted"]
)
} }
}) })
} }
@ -492,46 +457,61 @@ impl S3Sink {
} }
}; };
let client = match ( let timeout_config = s3utils::timeout_config(settings.request_timeout);
let cred = match (
settings.access_key.as_ref(), settings.access_key.as_ref(),
settings.secret_access_key.as_ref(), settings.secret_access_key.as_ref(),
) { ) {
(Some(access_key), Some(secret_access_key)) => { (Some(access_key), Some(secret_access_key)) => Some(Credentials::new(
let creds = access_key.clone(),
StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()); secret_access_key.clone(),
S3Client::new_with( None,
HttpClient::new().expect("failed to create request dispatcher"), None,
creds, "rusoto-s3-sink",
s3url.region.clone(), )),
) _ => None,
}
_ => S3Client::new(s3url.region.clone()),
}; };
let create_multipart_req_future = || { let sdk_config =
let create_multipart_req = s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred)
self.create_create_multipart_upload_request(&s3url, &settings);
client
.create_multipart_upload(create_multipart_req)
.map_err(RetriableError::Rusoto)
};
let response = s3utils::wait_retry(
&self.canceller,
Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
create_multipart_req_future,
)
.map_err(|err| match err { .map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!( WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
["Failed to create multipart upload: {:?}", err] ["Failed to create SDK config: {}", err]
), ),
WaitError::Cancelled => { WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) gst::error_msg!(
gst::LibraryError::Failed,
["SDK config request interrupted during start"]
)
} }
})?; })?;
let config = config::Builder::from(&sdk_config)
.retry_config(RetryConfig::new().with_max_attempts(settings.retry_attempts))
.build();
let client = Client::from_conf(config);
let create_multipart_req =
self.create_create_multipart_upload_request(&client, &s3url, &settings);
let create_multipart_req_future = create_multipart_req.send();
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to create multipart upload: {}", err]
),
WaitError::Cancelled => {
gst::error_msg!(
gst::LibraryError::Failed,
["Create multipart request interrupted during start"]
)
}
},
)?;
let upload_id = response.upload_id.ok_or_else(|| { let upload_id = response.upload_id.ok_or_else(|| {
gst::error_msg!( gst::error_msg!(
gst::ResourceError::Failed, gst::ResourceError::Failed,
@ -662,7 +642,7 @@ impl ObjectImpl for S3Sink {
"region", "region",
"AWS Region", "AWS Region",
"An AWS region (e.g. eu-west-2).", "An AWS region (e.g. eu-west-2).",
None, Some("us-west-2"),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpecUInt64::new( glib::ParamSpecUInt64::new(
@ -710,6 +690,15 @@ impl ObjectImpl for S3Sink {
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32, DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpecUInt::new(
"retry-attempts",
"Retry attempts",
"Number of times AWS SDK attempts a request before abandoning the request",
1,
10,
DEFAULT_RETRY_ATTEMPTS,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new( glib::ParamSpecInt64::new(
"request-timeout", "request-timeout",
"Request timeout", "Request timeout",
@ -719,46 +708,46 @@ impl ObjectImpl for S3Sink {
DEFAULT_REQUEST_TIMEOUT_MSEC as i64, DEFAULT_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
glib::ParamSpecInt64::new(
"retry-duration",
"Retry duration",
"How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new( glib::ParamSpecInt64::new(
"upload-part-request-timeout", "upload-part-request-timeout",
"Upload part request timeout", "Upload part request timeout",
"Timeout for a single upload part request (in ms, set to -1 for infinity)", "Timeout for a single upload part request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)",
-1, -1,
std::i64::MAX, std::i64::MAX,
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64, DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
glib::ParamSpecInt64::new(
"upload-part-retry-duration",
"Upload part retry duration",
"How long we should retry upload part requests before giving up (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new( glib::ParamSpecInt64::new(
"complete-upload-request-timeout", "complete-upload-request-timeout",
"Complete upload request timeout", "Complete upload request timeout",
"Timeout for the complete multipart upload request (in ms, set to -1 for infinity)", "Timeout for the complete multipart upload request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)",
-1, -1,
std::i64::MAX, std::i64::MAX,
DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64, DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
glib::ParamSpecInt64::new(
"retry-duration",
"Retry duration",
"How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)",
-1,
std::i64::MAX,
DEFAULT_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"upload-part-retry-duration",
"Upload part retry duration",
"How long we should retry upload part requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)",
-1,
std::i64::MAX,
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new( glib::ParamSpecInt64::new(
"complete-upload-retry-duration", "complete-upload-retry-duration",
"Complete upload retry duration", "Complete upload retry duration",
"How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity)", "How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)",
-1, -1,
std::i64::MAX, std::i64::MAX,
DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64, DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64,
@ -806,17 +795,7 @@ impl ObjectImpl for S3Sink {
} }
"region" => { "region" => {
let region = value.get::<String>().expect("type checked upstream"); let region = value.get::<String>().expect("type checked upstream");
settings.region = region settings.region = Region::new(region);
.parse::<Region>()
.or_else(|_| {
let (name, endpoint) = region.split_once('+').ok_or(())?;
Ok(Region::Custom {
name: name.into(),
endpoint: endpoint.into(),
})
})
.unwrap_or_else(|_: ()| panic!("Invalid region '{}'", region));
if settings.key.is_some() && settings.bucket.is_some() { if settings.key.is_some() && settings.bucket.is_some() {
let _ = self.set_uri(obj, Some(&settings.to_uri())); let _ = self.set_uri(obj, Some(&settings.to_uri()));
} }
@ -840,29 +819,34 @@ impl ObjectImpl for S3Sink {
settings.multipart_upload_on_error = settings.multipart_upload_on_error =
value.get::<OnError>().expect("type checked upstream"); value.get::<OnError>().expect("type checked upstream");
} }
"retry-attempts" => {
settings.retry_attempts = value.get::<u32>().expect("type checked upstream");
}
"request-timeout" => { "request-timeout" => {
settings.request_timeout = settings.request_timeout =
duration_from_millis(value.get::<i64>().expect("type checked upstream")); duration_from_millis(value.get::<i64>().expect("type checked upstream"));
} }
"retry-duration" => {
settings.retry_duration =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"upload-part-request-timeout" => { "upload-part-request-timeout" => {
settings.upload_part_request_timeout = settings.request_timeout =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"upload-part-retry-duration" => {
settings.upload_part_retry_duration =
duration_from_millis(value.get::<i64>().expect("type checked upstream")); duration_from_millis(value.get::<i64>().expect("type checked upstream"));
} }
"complete-upload-request-timeout" => { "complete-upload-request-timeout" => {
settings.complete_upload_request_timeout = settings.request_timeout =
duration_from_millis(value.get::<i64>().expect("type checked upstream")); duration_from_millis(value.get::<i64>().expect("type checked upstream"));
} }
"complete-upload-retry-duration" => { "retry-duration" | "upload-part-retry-duration" | "complete-upload-retry-duration" => {
settings.complete_upload_retry_duration = /*
duration_from_millis(value.get::<i64>().expect("type checked upstream")); * To maintain backwards compatibility calculate retry attempts
* by dividing the provided duration from request timeout.
*/
let value = value.get::<i64>().expect("type checked upstream");
let request_timeout = duration_to_millis(Some(settings.request_timeout));
let retry_attempts = if value > request_timeout {
value / request_timeout
} else {
request_timeout / value
};
settings.retry_attempts = retry_attempts as u32;
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -874,7 +858,7 @@ impl ObjectImpl for S3Sink {
match pspec.name() { match pspec.name() {
"key" => settings.key.to_value(), "key" => settings.key.to_value(),
"bucket" => settings.bucket.to_value(), "bucket" => settings.bucket.to_value(),
"region" => settings.region.name().to_value(), "region" => settings.region.to_string().to_value(),
"part-size" => settings.buffer_size.to_value(), "part-size" => settings.buffer_size.to_value(),
"uri" => { "uri" => {
let url = match *self.url.lock().unwrap() { let url = match *self.url.lock().unwrap() {
@ -888,19 +872,17 @@ impl ObjectImpl for S3Sink {
"secret-access-key" => settings.secret_access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(),
"metadata" => settings.metadata.to_value(), "metadata" => settings.metadata.to_value(),
"on-error" => settings.multipart_upload_on_error.to_value(), "on-error" => settings.multipart_upload_on_error.to_value(),
"request-timeout" => duration_to_millis(settings.request_timeout).to_value(), "retry-attempts" => settings.retry_attempts.to_value(),
"retry-duration" => duration_to_millis(settings.retry_duration).to_value(), "request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(),
"upload-part-request-timeout" => { "upload-part-request-timeout" => {
duration_to_millis(settings.upload_part_request_timeout).to_value() duration_to_millis(Some(settings.request_timeout)).to_value()
}
"upload-part-retry-duration" => {
duration_to_millis(settings.upload_part_retry_duration).to_value()
} }
"complete-upload-request-timeout" => { "complete-upload-request-timeout" => {
duration_to_millis(settings.complete_upload_request_timeout).to_value() duration_to_millis(Some(settings.request_timeout)).to_value()
} }
"complete-upload-retry-duration" => { "retry-duration" | "upload-part-retry-duration" | "complete-upload-retry-duration" => {
duration_to_millis(settings.complete_upload_retry_duration).to_value() let request_timeout = duration_to_millis(Some(settings.request_timeout));
(settings.retry_attempts as i64 * request_timeout).to_value()
} }
_ => unimplemented!(), _ => unimplemented!(),
} }

View file

@ -6,17 +6,14 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use bytes::Bytes;
use futures::future;
use once_cell::sync::Lazy;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use bytes::{buf::BufMut, Bytes, BytesMut}; use aws_sdk_s3::config;
use futures::future; use aws_sdk_s3::{Client, Credentials, RetryConfig};
use futures::{TryFutureExt, TryStreamExt};
use once_cell::sync::Lazy;
use rusoto_core::request::HttpClient;
use rusoto_credential::StaticProvider;
use rusoto_s3::GetObjectError;
use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
@ -27,9 +24,10 @@ use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use crate::s3url::*; use crate::s3url::*;
use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError}; use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
@ -37,7 +35,7 @@ enum StreamingState {
Stopped, Stopped,
Started { Started {
url: GstS3Url, url: GstS3Url,
client: S3Client, client: Client,
size: u64, size: u64,
}, },
} }
@ -48,13 +46,25 @@ impl Default for StreamingState {
} }
} }
#[derive(Default)]
struct Settings { struct Settings {
url: Option<GstS3Url>, url: Option<GstS3Url>,
access_key: Option<String>, access_key: Option<String>,
secret_access_key: Option<String>, secret_access_key: Option<String>,
request_timeout: Option<Duration>, retry_attempts: u32,
retry_duration: Option<Duration>, request_timeout: Duration,
}
impl Default for Settings {
fn default() -> Self {
let duration = Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC);
Self {
url: None,
access_key: None,
secret_access_key: None,
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
request_timeout: duration,
}
}
} }
#[derive(Default)] #[derive(Default)]
@ -81,24 +91,44 @@ impl S3Src {
}; };
} }
fn connect(self: &S3Src, url: &GstS3Url) -> S3Client { fn connect(self: &S3Src, url: &GstS3Url) -> Result<Client, gst::ErrorMessage> {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let timeout_config = s3utils::timeout_config(settings.request_timeout);
match ( let cred = match (
settings.access_key.as_ref(), settings.access_key.as_ref(),
settings.secret_access_key.as_ref(), settings.secret_access_key.as_ref(),
) { ) {
(Some(access_key), Some(secret_access_key)) => { (Some(access_key), Some(secret_access_key)) => Some(Credentials::new(
let creds = access_key.clone(),
StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()); secret_access_key.clone(),
S3Client::new_with( None,
HttpClient::new().expect("failed to create request dispatcher"), None,
creds, "rusoto-s3-src",
url.region.clone(), )),
_ => None,
};
let sdk_config =
s3utils::wait_config(&self.canceller, url.region.clone(), timeout_config, cred)
.map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to create SDK config: {}", err]
),
WaitError::Cancelled => {
gst::error_msg!(
gst::LibraryError::Failed,
["SDK config request interrupted during start"]
) )
} }
_ => S3Client::new(url.region.clone()), })?;
}
let config = config::Builder::from(&sdk_config)
.retry_config(RetryConfig::new().with_max_attempts(settings.retry_attempts))
.build();
Ok(Client::from_conf(config))
} }
fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> { fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> {
@ -134,47 +164,38 @@ impl S3Src {
fn head( fn head(
self: &S3Src, self: &S3Src,
src: &super::S3Src, src: &super::S3Src,
client: &S3Client, client: &Client,
url: &GstS3Url, url: &GstS3Url,
) -> Result<u64, gst::ErrorMessage> { ) -> Result<u64, gst::ErrorMessage> {
let settings = self.settings.lock().unwrap(); let head_object = client
.head_object()
.set_bucket(Some(url.bucket.clone()))
.set_key(Some(url.object.clone()))
.set_version_id(url.version.clone());
let head_object_future = head_object.send();
let head_object_future = || { let output =
client s3utils::wait(&self.canceller, head_object_future).map_err(|err| match err {
.head_object(HeadObjectRequest {
bucket: url.bucket.clone(),
key: url.object.clone(),
version_id: url.version.clone(),
..Default::default()
})
.map_err(RetriableError::Rusoto)
};
let output = s3utils::wait_retry(
&self.canceller,
settings.request_timeout,
settings.retry_duration,
head_object_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!( WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::NotFound, gst::ResourceError::NotFound,
["Failed to HEAD object: {:?}", err] ["Failed to get HEAD object: {:?}", err]
), ),
WaitError::Cancelled => { WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) gst::error_msg!(
gst::LibraryError::Failed,
["Head object request interrupted"]
)
} }
})?; })?;
if let Some(size) = output.content_length { gst::info!(
gst::info!(CAT, obj: src, "HEAD success, content length = {}", size); CAT,
Ok(size as u64) obj: src,
} else { "HEAD success, content length = {}",
Err(gst::error_msg!( output.content_length
gst::ResourceError::Read, );
["Failed to get content length"]
)) Ok(output.content_length as u64)
}
} }
/* Returns the bytes, Some(error) if one occured, or a None error if interrupted */ /* Returns the bytes, Some(error) if one occured, or a None error if interrupted */
@ -200,9 +221,13 @@ impl S3Src {
} }
}; };
let settings = self.settings.lock().unwrap(); let get_object = client
.get_object()
.set_bucket(Some(url.bucket.clone()))
.set_key(Some(url.object.clone()))
.set_range(Some(format!("bytes={}-{}", offset, offset + length - 1)))
.set_version_id(url.version.clone());
let get_object_future = || async {
gst::debug!( gst::debug!(
CAT, CAT,
obj: src, obj: src,
@ -211,46 +236,23 @@ impl S3Src {
offset + length - 1 offset + length - 1
); );
let output = client let get_object_future = get_object.send();
.get_object(GetObjectRequest {
bucket: url.bucket.clone(),
key: url.object.clone(),
range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
version_id: url.version.clone(),
..Default::default()
})
.map_err(RetriableError::Rusoto)
.await?;
gst::debug!( let mut output =
CAT, s3utils::wait(&self.canceller, get_object_future).map_err(|err| match err {
obj: src,
"Read {} bytes",
output.content_length.unwrap()
);
let mut collect = BytesMut::new();
let mut stream = output.body.unwrap();
// Loop over the stream and collect till we're done
// FIXME: Can we use TryStreamExt::collect() here?
while let Some(item) = stream.try_next().map_err(RetriableError::Std).await? {
collect.put(item)
}
Ok::<Bytes, RetriableError<GetObjectError>>(collect.freeze())
};
s3utils::wait_retry(
&self.canceller,
settings.request_timeout,
settings.retry_duration,
get_object_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!( WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::Read, gst::ResourceError::Read,
["Could not read: {:?}", err] ["Could not read: {}", err]
)),
WaitError::Cancelled => None,
})?;
gst::debug!(CAT, obj: src, "Read {} bytes", output.content_length);
s3utils::wait_stream(&self.canceller, &mut output.body).map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::Read,
["Could not read: {}", err]
)), )),
WaitError::Cancelled => None, WaitError::Cancelled => None,
}) })
@ -302,12 +304,21 @@ impl ObjectImpl for S3Src {
glib::ParamSpecInt64::new( glib::ParamSpecInt64::new(
"retry-duration", "retry-duration",
"Retry duration", "Retry duration",
"How long we should retry S3 requests before giving up (in ms, set to -1 for infinity)", "How long we should retry S3 requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)",
-1, -1,
std::i64::MAX, std::i64::MAX,
DEFAULT_RETRY_DURATION_MSEC as i64, DEFAULT_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
glib::ParamSpecUInt::new(
"retry-attempts",
"Retry attempts",
"Number of times AWS SDK attempts a request before abandoning the request",
1,
10,
DEFAULT_RETRY_ATTEMPTS,
glib::ParamFlags::READWRITE,
),
] ]
}); });
@ -321,27 +332,39 @@ impl ObjectImpl for S3Src {
value: &glib::Value, value: &glib::Value,
pspec: &glib::ParamSpec, pspec: &glib::ParamSpec,
) { ) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"uri" => { "uri" => {
drop(settings);
let _ = self.set_uri(obj, value.get().expect("type checked upstream")); let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
} }
"access-key" => { "access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.access_key = value.get().expect("type checked upstream"); settings.access_key = value.get().expect("type checked upstream");
} }
"secret-access-key" => { "secret-access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.secret_access_key = value.get().expect("type checked upstream"); settings.secret_access_key = value.get().expect("type checked upstream");
} }
"request-timeout" => { "request-timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.request_timeout = settings.request_timeout =
duration_from_millis(value.get::<i64>().expect("type checked upstream")); duration_from_millis(value.get::<i64>().expect("type checked upstream"));
} }
"retry-duration" => { "retry-duration" => {
let mut settings = self.settings.lock().unwrap(); /*
settings.retry_duration = * To maintain backwards compatibility calculate retry attempts
duration_from_millis(value.get::<i64>().expect("type checked upstream")); * by dividing the provided duration from request timeout.
*/
let value = value.get::<i64>().expect("type checked upstream");
let request_timeout = duration_to_millis(Some(settings.request_timeout));
let retry_attempts = if value > request_timeout {
value / request_timeout
} else {
request_timeout / value
};
settings.retry_attempts = retry_attempts as u32;
}
"retry-attempts" => {
settings.retry_attempts = value.get::<u32>().expect("type checked upstream");
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -361,8 +384,12 @@ impl ObjectImpl for S3Src {
} }
"access-key" => settings.access_key.to_value(), "access-key" => settings.access_key.to_value(),
"secret-access-key" => settings.secret_access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(),
"request-timeout" => duration_to_millis(settings.request_timeout).to_value(), "request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(),
"retry-duration" => duration_to_millis(settings.retry_duration).to_value(), "retry-duration" => {
let request_timeout = duration_to_millis(Some(settings.request_timeout));
(settings.retry_attempts as i64 * request_timeout).to_value()
}
"retry-attempts" => settings.retry_attempts.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -459,7 +486,7 @@ impl BaseSrcImpl for S3Src {
}; };
drop(settings); drop(settings);
let s3client = self.connect(&s3url); if let Ok(s3client) = self.connect(&s3url) {
let size = self.head(src, &s3client, &s3url)?; let size = self.head(src, &s3client, &s3url)?;
*state = StreamingState::Started { *state = StreamingState::Started {
@ -469,6 +496,12 @@ impl BaseSrcImpl for S3Src {
}; };
Ok(()) Ok(())
} else {
Err(gst::error_msg!(
gst::ResourceError::Failed,
["Cannot connect to S3 resource"]
))
}
} }
fn stop(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> { fn stop(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> {

View file

@ -6,8 +6,8 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use aws_sdk_s3::Region;
use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS}; use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS};
use rusoto_core::Region;
use url::Url; use url::Url;
#[derive(Clone)] #[derive(Clone)]
@ -29,27 +29,7 @@ impl ToString for GstS3Url {
fn to_string(&self) -> String { fn to_string(&self) -> String {
format!( format!(
"s3://{}/{}/{}{}", "s3://{}/{}/{}{}",
match self.region { self.region,
Region::Custom {
ref name,
ref endpoint,
} => {
format!(
"{}+{}",
base32::encode(
base32::Alphabet::RFC4648 { padding: true },
name.as_bytes(),
),
base32::encode(
base32::Alphabet::RFC4648 { padding: true },
endpoint.as_bytes(),
),
)
}
_ => {
String::from(self.region.name())
}
},
self.bucket, self.bucket,
percent_encode(self.object.as_bytes(), PATH_SEGMENT), percent_encode(self.object.as_bytes(), PATH_SEGMENT),
if self.version.is_some() { if self.version.is_some() {
@ -73,8 +53,9 @@ pub fn parse_s3_url(url_str: &str) -> Result<GstS3Url, String> {
} }
let host = url.host_str().unwrap(); let host = url.host_str().unwrap();
let region = host
.parse::<Region>() let region_str = host
.parse()
.or_else(|_| { .or_else(|_| {
let (name, endpoint) = host.split_once('+').ok_or(())?; let (name, endpoint) = host.split_once('+').ok_or(())?;
let name = let name =
@ -83,10 +64,15 @@ pub fn parse_s3_url(url_str: &str) -> Result<GstS3Url, String> {
base32::decode(base32::Alphabet::RFC4648 { padding: true }, endpoint).ok_or(())?; base32::decode(base32::Alphabet::RFC4648 { padding: true }, endpoint).ok_or(())?;
let name = String::from_utf8(name).map_err(|_| ())?; let name = String::from_utf8(name).map_err(|_| ())?;
let endpoint = String::from_utf8(endpoint).map_err(|_| ())?; let endpoint = String::from_utf8(endpoint).map_err(|_| ())?;
Ok(Region::Custom { name, endpoint }) Ok(format!("{}{}", name, endpoint))
}) })
.map_err(|_: ()| format!("Invalid region '{}'", host))?; .map_err(|_: ()| format!("Invalid region '{}'", host))?;
// Note that aws_sdk_s3::Region does not provide any error/validation
// methods to check the region argument being passed to it.
// See https://docs.rs/aws-sdk-s3/latest/aws_sdk_s3/struct.Region.html
let region = Region::new(region_str);
let mut path = url let mut path = url
.path_segments() .path_segments()
.ok_or_else(|| format!("Invalid uri '{}'", url))?; .ok_or_else(|| format!("Invalid uri '{}'", url))?;
@ -127,81 +113,3 @@ pub fn parse_s3_url(url_str: &str) -> Result<GstS3Url, String> {
version, version,
}) })
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cannot_be_base() {
assert!(parse_s3_url("data:something").is_err());
}
#[test]
fn invalid_scheme() {
assert!(parse_s3_url("file:///dev/zero").is_err());
}
#[test]
fn bad_region() {
assert!(parse_s3_url("s3://atlantis-1/i-hope-we/dont-find-this").is_err());
}
#[test]
fn no_bucket() {
assert!(parse_s3_url("s3://ap-south-1").is_err());
assert!(parse_s3_url("s3://ap-south-1/").is_err());
}
#[test]
fn no_object() {
assert!(parse_s3_url("s3://ap-south-1/my-bucket").is_err());
assert!(parse_s3_url("s3://ap-south-1/my-bucket/").is_err());
}
#[test]
fn valid_simple() {
assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object").is_ok());
}
#[test]
fn extraneous_query() {
assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?foo=bar").is_err());
}
#[test]
fn valid_version() {
assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?version=one").is_ok());
}
#[test]
fn trailing_slash() {
// Slashes are valid at the end of the object key
assert_eq!(
parse_s3_url("s3://ap-south-1/my-bucket/my-object/")
.unwrap()
.object,
"my-object/"
);
}
#[test]
fn percent_encoding() {
assert_eq!(
parse_s3_url("s3://ap-south-1/my-bucket/my%20object")
.unwrap()
.object,
"my object"
);
}
#[test]
fn percent_decoding() {
assert_eq!(
parse_s3_url("s3://ap-south-1/my-bucket/my object")
.unwrap()
.to_string(),
"s3://ap-south-1/my-bucket/my%20object"
);
}
}

View file

@ -6,21 +6,22 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use futures::{future, Future, FutureExt, TryFutureExt}; use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{Credentials, Region};
use aws_types::sdk_config::SdkConfig;
use aws_smithy_http::byte_stream::{ByteStream, Error};
use aws_smithy_types::{timeout, tristate::TriState};
use bytes::{buf::BufMut, Bytes, BytesMut};
use futures::stream::TryStreamExt;
use futures::{future, Future};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rusoto_core::RusotoError::{HttpDispatch, Unknown};
use rusoto_core::{HttpDispatchError, RusotoError};
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use tokio::runtime; use tokio::runtime;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { const DEFAULT_S3_REGION: &str = "us-west-2";
gst::DebugCategory::new(
"rusotos3utils",
gst::DebugColorFlags::empty(),
Some("Amazon S3 utilities"),
)
});
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| { static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread() runtime::Builder::new_multi_thread()
@ -32,90 +33,18 @@ static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
}); });
#[derive(Debug)] #[derive(Debug)]
pub enum RetriableError<E> {
Rusoto(RusotoError<E>),
Std(std::io::Error),
}
pub enum WaitError<E> { pub enum WaitError<E> {
Cancelled, Cancelled,
FutureError(E), FutureError(E),
} }
fn make_timeout<F, T, E>( pub fn wait<F, T, E>(
timeout: Duration,
future: F,
) -> impl Future<Output = Result<T, RetriableError<E>>>
where
E: std::fmt::Debug,
F: Future<Output = Result<T, RetriableError<E>>>,
{
tokio::time::timeout(timeout, future).map(|v| match v {
// Future resolved succesfully
Ok(Ok(v)) => Ok(v),
// Future resolved with an error
Ok(Err(e)) => Err(e),
// Timeout elapsed
// Use an HttpDispatch error so the caller doesn't have to deal with this separately from
// other HTTP dispatch errors
_ => Err(RetriableError::Rusoto(HttpDispatch(
HttpDispatchError::new("Timeout".to_owned()),
))),
})
}
fn make_retry<F, T, E, Fut>(
timeout: Option<Duration>,
mut future: F,
) -> impl Future<Output = Result<T, RetriableError<E>>>
where
E: std::fmt::Debug,
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, RetriableError<E>>>,
{
backoff::future::retry(
backoff::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(500))
.with_multiplier(1.5)
.with_max_elapsed_time(timeout)
.build(),
move || {
future().map_err(|err| match err {
RetriableError::Rusoto(HttpDispatch(_)) => {
gst::warning!(CAT, "Error waiting for operation ({:?}), retrying", err);
backoff::Error::transient(err)
}
RetriableError::Rusoto(Unknown(ref response)) => {
gst::warning!(
CAT,
"Unknown error waiting for operation ({:?}), retrying",
response
);
// Retry on 5xx errors
if response.status.is_server_error() {
backoff::Error::transient(err)
} else {
backoff::Error::permanent(err)
}
}
_ => backoff::Error::permanent(err),
})
},
)
}
pub fn wait_retry<F, T, E, Fut>(
canceller: &Mutex<Option<future::AbortHandle>>, canceller: &Mutex<Option<future::AbortHandle>>,
req_timeout: Option<Duration>, future: F,
retry_timeout: Option<Duration>, ) -> Result<T, WaitError<E>>
mut future: F,
) -> Result<T, WaitError<RetriableError<E>>>
where where
E: std::fmt::Debug, F: Send + Future<Output = Result<T, E>>,
F: FnMut() -> Fut, F::Output: Send,
Fut: Send + Future<Output = Result<T, RetriableError<E>>>,
Fut::Output: Send,
T: Send, T: Send,
E: Send, E: Send,
{ {
@ -125,28 +54,14 @@ where
canceller_guard.replace(abort_handle); canceller_guard.replace(abort_handle);
drop(canceller_guard); drop(canceller_guard);
let abortable_future = future::Abortable::new(future, abort_registration);
// FIXME: add a timeout as well
let res = { let res = {
let _enter = RUNTIME.enter(); let _enter = RUNTIME.enter();
futures::executor::block_on(async { futures::executor::block_on(async {
// The order of this future stack matters: the innermost future is the supplied future match abortable_future.await {
// generator closure. We wrap that in a timeout to bound how long we wait. This, in
// turn, is wrapped in a retrying future which will make multiple attempts until it
// ultimately fails.
// The timeout must be created within the tokio executor
let res = match req_timeout {
None => {
let retry_future = make_retry(retry_timeout, future);
future::Abortable::new(retry_future, abort_registration).await
}
Some(t) => {
let timeout_future = || make_timeout(t, future());
let retry_future = make_retry(retry_timeout, timeout_future);
future::Abortable::new(retry_future, abort_registration).await
}
};
match res {
// Future resolved successfully // Future resolved successfully
Ok(Ok(res)) => Ok(res), Ok(Ok(res)) => Ok(res),
// Future resolved with an error // Future resolved with an error
@ -164,16 +79,94 @@ where
res res
} }
pub fn duration_from_millis(millis: i64) -> Option<Duration> { pub fn wait_stream(
canceller: &Mutex<Option<future::AbortHandle>>,
stream: &mut ByteStream,
) -> Result<Bytes, WaitError<Error>> {
wait(canceller, async move {
let mut collect = BytesMut::new();
// Loop over the stream and collect till we're done
while let Some(item) = stream.try_next().await? {
collect.put(item)
}
Ok::<Bytes, Error>(collect.freeze())
})
}
// See setting-timeouts example in aws-sdk-rust.
pub fn timeout_config(request_timeout: Duration) -> timeout::Config {
timeout::Config::new().with_api_timeouts(
timeout::Api::new()
// This timeout acts at the "Request to a service" level. When the SDK makes a request to a
// service, that "request" can contain several HTTP requests. This way, you can retry
// failures that are likely spurious, or refresh credentials.
.with_call_timeout(TriState::Set(request_timeout))
// This timeout acts at the "HTTP request" level and sets a separate timeout for each
// HTTP request made as part of a "service request."
.with_call_attempt_timeout(TriState::Set(request_timeout)),
)
}
pub fn wait_config(
canceller: &Mutex<Option<future::AbortHandle>>,
region: Region,
timeout_config: timeout::Config,
credentials: Option<Credentials>,
) -> Result<SdkConfig, WaitError<Error>> {
let region_provider = RegionProviderChain::first_try(region)
.or_default_provider()
.or_else(Region::new(DEFAULT_S3_REGION));
let config_future = match credentials {
Some(cred) => aws_config::from_env()
.timeout_config(timeout_config)
.region(region_provider)
.credentials_provider(cred)
.load(),
None => aws_config::from_env()
.timeout_config(timeout_config)
.region(region_provider)
.load(),
};
let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller_guard.replace(abort_handle);
drop(canceller_guard);
let abortable_future = future::Abortable::new(config_future, abort_registration);
let res = {
let _enter = RUNTIME.enter();
futures::executor::block_on(async {
match abortable_future.await {
// Future resolved successfully
Ok(config) => Ok(config),
// Canceller called before future resolved
Err(future::Aborted) => Err(WaitError::Cancelled),
}
})
};
/* Clear out the canceller */
canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
res
}
pub fn duration_from_millis(millis: i64) -> Duration {
match millis { match millis {
-1 => None, -1 => Duration::MAX,
v => Some(Duration::from_millis(v as u64)), v => Duration::from_millis(v as u64),
} }
} }
pub fn duration_to_millis(dur: Option<Duration>) -> i64 { pub fn duration_to_millis(dur: Option<Duration>) -> i64 {
match dur { match dur {
None => -1, None => Duration::MAX.as_millis() as i64,
Some(d) => d.as_millis() as i64, Some(d) => d.as_millis() as i64,
} }
} }