2019-06-12 08:07:39 +00:00
|
|
|
// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
|
|
|
|
//
|
2022-01-15 18:40:12 +00:00
|
|
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
|
|
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
|
|
|
// <https://mozilla.org/MPL/2.0/>.
|
|
|
|
//
|
|
|
|
// SPDX-License-Identifier: MPL-2.0
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
use aws_config::meta::region::RegionProviderChain;
|
2023-11-17 09:11:13 +00:00
|
|
|
use aws_sdk_s3::{
|
|
|
|
config::{timeout::TimeoutConfig, Credentials, Region},
|
2024-06-26 08:43:58 +00:00
|
|
|
error::{DisplayErrorContext, ProvideErrorMetadata},
|
2023-11-17 09:11:13 +00:00
|
|
|
primitives::{ByteStream, ByteStreamError},
|
|
|
|
};
|
2022-05-14 05:01:35 +00:00
|
|
|
use aws_types::sdk_config::SdkConfig;
|
|
|
|
|
|
|
|
use bytes::{buf::BufMut, Bytes, BytesMut};
|
|
|
|
use futures::{future, Future};
|
2024-01-31 15:07:56 +00:00
|
|
|
use once_cell::sync::Lazy;
|
aws: improve error message logs
The `Display` and `Debug` trait for the AWS error messages are not very useful.
- `Display` only prints the high level error, e.g.: "service error".
- `Debug` prints all the fields in the error stack, resulting in hard to read
messages with redundant or unnecessary information. E.g.:
> ServiceError(ServiceError { source: BadRequestException(BadRequestException {
> message: Some("1 validation error detected: Value 'test' at 'languageCode'
> failed to satisfy constraint: Member must satisfy enum value set: [ar-AE,
> zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH,
> de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US,
> fr-CA, en-GB]"), meta: ErrorMetadata { code: Some("BadRequestException"),
> message: Some("1 validation error detected: Value 'test' at 'languageCode'
> failed to satisfy constraint: Member must satisfy enum value set: [ar-AE,
> zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH,
> de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US,
> fr-CA, en-GB]"), extras: Some({"aws_request_id": "1b8bbafd-5b71-4ba5-8676-28432381e6a9"}) } }),
> raw: Response { status: StatusCode(400), headers: Headers { headers:
> {"x-amzn-requestid": HeaderValue { _private: H0("1b8bbafd-5b71-4ba5-8676-28432381e6a9") },
> "x-amzn-errortype": HeaderValue { _private:
> H0("BadRequestException:http://internal.amazon.com/coral/com.amazonaws.transcribe.streaming/") },
> "date": HeaderValue { _private: H0("Tue, 26 Mar 2024 17:41:31 GMT") },
> "content-type": HeaderValue { _private: H0("application/x-amz-json-1.1") },
> "content-length": HeaderValue { _private: H0("315") }} }, body: SdkBody {
> inner: Once(Some(b"{\"Message\":\"1 validation error detected: Value 'test'
> at 'languageCode' failed to satisfy constraint: Member must satisfy enum value
> set: [ar-AE, zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT,
> es-ES, th-TH, de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP,
> ca-ES, es-US, fr-CA, en-GB]\"}")), retryable: true }, extensions: Extensions {
> extensions_02x: Extensions, extensions_1x: Extensions } } })
This commit adopts the most informative and concise solution I could come up
with to log AWS errors. With the above error case, this results in:
> service error: Error { code: "BadRequestException", message: "1 validation
> error detected: Value 'test' at 'languageCode' failed to satisfy constraint:
> Member must satisfy enum value set: [ar-AE, zh-HK, en-US, ar-SA, zh-CN, fi-FI,
> pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH, de-DE, it-IT, fr-FR, ko-KR, hi-IN,
> en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US, fr-CA, en-GB]",
> aws_request_id: "a40a32a8-7b0b-4228-a348-f8502087a9f0" }
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1521>
2024-03-26 19:05:32 +00:00
|
|
|
use std::fmt;
|
2019-06-12 08:07:39 +00:00
|
|
|
use std::sync::Mutex;
|
2022-02-23 18:10:30 +00:00
|
|
|
use std::time::Duration;
|
2019-06-12 08:07:39 +00:00
|
|
|
use tokio::runtime;
|
|
|
|
|
2024-03-26 16:13:53 +00:00
|
|
|
/// Region name used by [`wait_config`] as the last fallback of its region provider chain.
pub const DEFAULT_S3_REGION: &str = "us-west-2";
|
|
|
|
|
2024-10-20 19:03:56 +00:00
|
|
|
#[allow(deprecated)]
|
2024-03-26 16:13:53 +00:00
|
|
|
pub static AWS_BEHAVIOR_VERSION: Lazy<aws_config::BehaviorVersion> =
|
|
|
|
Lazy::new(aws_config::BehaviorVersion::v2023_11_09);
|
2022-02-23 18:10:30 +00:00
|
|
|
|
2020-11-22 15:43:59 +00:00
|
|
|
/// Process-wide Tokio runtime entered by [`wait`] and [`wait_config`] while they block on
/// AWS SDK futures.
///
/// It is a multi-threaded runtime with two worker threads named `gst-aws-runtime` and with
/// all Tokio drivers enabled. It is created lazily on first use.
///
/// # Panics
///
/// The first access panics if the runtime cannot be built; there is no way to recover from
/// that, so the failure is reported with an explicit message instead of a bare `unwrap`.
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
    runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(2)
        .thread_name("gst-aws-runtime")
        .build()
        .expect("failed to build the gst-aws-runtime Tokio runtime")
});
|
2020-04-20 01:28:30 +00:00
|
|
|
|
2022-03-18 09:01:21 +00:00
|
|
|
#[derive(Debug)]
|
2020-04-20 01:28:30 +00:00
|
|
|
pub enum WaitError<E> {
|
|
|
|
Cancelled,
|
|
|
|
FutureError(E),
|
|
|
|
}
|
|
|
|
|
aws: improve error message logs
The `Display` and `Debug` trait for the AWS error messages are not very useful.
- `Display` only prints the high level error, e.g.: "service error".
- `Debug` prints all the fields in the error stack, resulting in hard to read
messages with redudant or unnecessary information. E.g.:
> ServiceError(ServiceError { source: BadRequestException(BadRequestException {
> message: Some("1 validation error detected: Value 'test' at 'languageCode'
> failed to satisfy constraint: Member must satisfy enum value set: [ar-AE,
> zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH,
> de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US,
> fr-CA, en-GB]"), meta: ErrorMetadata { code: Some("BadRequestException"),
> message: Some("1 validation error detected: Value 'test' at 'languageCode'
> failed to satisfy constraint: Member must satisfy enum value set: [ar-AE,
> zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH,
> de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US,
> fr-CA, en-GB]"), extras: Some({"aws_request_id": "1b8bbafd-5b71-4ba5-8676-28432381e6a9"}) } }),
> raw: Response { status: StatusCode(400), headers: Headers { headers:
> {"x-amzn-requestid": HeaderValue { _private: H0("1b8bbafd-5b71-4ba5-8676-28432381e6a9") },
> "x-amzn-errortype": HeaderValue { _private:
> H0("BadRequestException:http://internal.amazon.com/coral/com.amazonaws.transcribe.streaming/") },
> "date": HeaderValue { _private: H0("Tue, 26 Mar 2024 17:41:31 GMT") },
> "content-type": HeaderValue { _private: H0("application/x-amz-json-1.1") },
> "content-length": HeaderValue { _private: H0("315") }} }, body: SdkBody {
> inner: Once(Some(b"{\"Message\":\"1 validation error detected: Value 'test'
> at 'languageCode' failed to satisfy constraint: Member must satisfy enum value
> set: [ar-AE, zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT,
> es-ES, th-TH, de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP,
> ca-ES, es-US, fr-CA, en-GB]\"}")), retryable: true }, extensions: Extensions {
> extensions_02x: Extensions, extensions_1x: Extensions } } })
This commit adopts the most informative and concise solution I could come up
with to log AWS errors. With the above error case, this results in:
> service error: Error { code: "BadRequestException", message: "1 validation
> error detected: Value 'test' at 'languageCode' failed to satisfy constraint:
> Member must satisfy enum value set: [ar-AE, zh-HK, en-US, ar-SA, zh-CN, fi-FI,
> pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH, de-DE, it-IT, fr-FR, ko-KR, hi-IN,
> en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US, fr-CA, en-GB]",
> aws_request_id: "a40a32a8-7b0b-4228-a348-f8502087a9f0" }
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1521>
2024-03-26 19:05:32 +00:00
|
|
|
impl<E: ProvideErrorMetadata + std::error::Error> fmt::Display for WaitError<E> {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
match self {
|
|
|
|
WaitError::Cancelled => f.write_str("Cancelled"),
|
2024-06-26 08:43:58 +00:00
|
|
|
WaitError::FutureError(err) => {
|
|
|
|
write!(f, "{}: {}", DisplayErrorContext(&err), err.meta())
|
|
|
|
}
|
aws: improve error message logs
The `Display` and `Debug` trait for the AWS error messages are not very useful.
- `Display` only prints the high level error, e.g.: "service error".
- `Debug` prints all the fields in the error stack, resulting in hard to read
messages with redudant or unnecessary information. E.g.:
> ServiceError(ServiceError { source: BadRequestException(BadRequestException {
> message: Some("1 validation error detected: Value 'test' at 'languageCode'
> failed to satisfy constraint: Member must satisfy enum value set: [ar-AE,
> zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH,
> de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US,
> fr-CA, en-GB]"), meta: ErrorMetadata { code: Some("BadRequestException"),
> message: Some("1 validation error detected: Value 'test' at 'languageCode'
> failed to satisfy constraint: Member must satisfy enum value set: [ar-AE,
> zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH,
> de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US,
> fr-CA, en-GB]"), extras: Some({"aws_request_id": "1b8bbafd-5b71-4ba5-8676-28432381e6a9"}) } }),
> raw: Response { status: StatusCode(400), headers: Headers { headers:
> {"x-amzn-requestid": HeaderValue { _private: H0("1b8bbafd-5b71-4ba5-8676-28432381e6a9") },
> "x-amzn-errortype": HeaderValue { _private:
> H0("BadRequestException:http://internal.amazon.com/coral/com.amazonaws.transcribe.streaming/") },
> "date": HeaderValue { _private: H0("Tue, 26 Mar 2024 17:41:31 GMT") },
> "content-type": HeaderValue { _private: H0("application/x-amz-json-1.1") },
> "content-length": HeaderValue { _private: H0("315") }} }, body: SdkBody {
> inner: Once(Some(b"{\"Message\":\"1 validation error detected: Value 'test'
> at 'languageCode' failed to satisfy constraint: Member must satisfy enum value
> set: [ar-AE, zh-HK, en-US, ar-SA, zh-CN, fi-FI, pl-PL, no-NO, nl-NL, pt-PT,
> es-ES, th-TH, de-DE, it-IT, fr-FR, ko-KR, hi-IN, en-AU, pt-BR, sv-SE, ja-JP,
> ca-ES, es-US, fr-CA, en-GB]\"}")), retryable: true }, extensions: Extensions {
> extensions_02x: Extensions, extensions_1x: Extensions } } })
This commit adopts the most informative and concise solution I could come up
with to log AWS errors. With the above error case, this results in:
> service error: Error { code: "BadRequestException", message: "1 validation
> error detected: Value 'test' at 'languageCode' failed to satisfy constraint:
> Member must satisfy enum value set: [ar-AE, zh-HK, en-US, ar-SA, zh-CN, fi-FI,
> pl-PL, no-NO, nl-NL, pt-PT, es-ES, th-TH, de-DE, it-IT, fr-FR, ko-KR, hi-IN,
> en-AU, pt-BR, sv-SE, ja-JP, ca-ES, es-US, fr-CA, en-GB]",
> aws_request_id: "a40a32a8-7b0b-4228-a348-f8502087a9f0" }
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1521>
2024-03-26 19:05:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-06-07 15:55:43 +00:00
|
|
|
#[derive(Default)]
|
|
|
|
pub enum Canceller {
|
|
|
|
#[default]
|
|
|
|
None,
|
|
|
|
Handle(future::AbortHandle),
|
|
|
|
Cancelled,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Canceller {
|
|
|
|
pub fn abort(&mut self) {
|
|
|
|
if let Canceller::Handle(ref canceller) = *self {
|
|
|
|
canceller.abort();
|
|
|
|
}
|
|
|
|
|
|
|
|
*self = Canceller::Cancelled;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn wait<F, T, E>(canceller_mutex: &Mutex<Canceller>, future: F) -> Result<T, WaitError<E>>
|
2022-02-23 18:10:30 +00:00
|
|
|
where
|
2022-05-14 05:01:35 +00:00
|
|
|
F: Send + Future<Output = Result<T, E>>,
|
|
|
|
F::Output: Send,
|
|
|
|
T: Send,
|
|
|
|
E: Send,
|
2022-02-23 18:10:30 +00:00
|
|
|
{
|
2024-06-07 15:55:43 +00:00
|
|
|
let mut canceller = canceller_mutex.lock().unwrap();
|
|
|
|
if matches!(*canceller, Canceller::Cancelled) {
|
|
|
|
return Err(WaitError::Cancelled);
|
|
|
|
}
|
2022-05-14 05:01:35 +00:00
|
|
|
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
2024-06-07 15:55:43 +00:00
|
|
|
*canceller = Canceller::Handle(abort_handle);
|
|
|
|
drop(canceller);
|
2022-05-14 05:01:35 +00:00
|
|
|
|
|
|
|
let abortable_future = future::Abortable::new(future, abort_registration);
|
|
|
|
|
|
|
|
// FIXME: add a timeout as well
|
|
|
|
|
|
|
|
let res = {
|
|
|
|
let _enter = RUNTIME.enter();
|
|
|
|
futures::executor::block_on(async {
|
|
|
|
match abortable_future.await {
|
|
|
|
// Future resolved successfully
|
|
|
|
Ok(Ok(res)) => Ok(res),
|
|
|
|
// Future resolved with an error
|
|
|
|
Ok(Err(err)) => Err(WaitError::FutureError(err)),
|
|
|
|
// Canceller called before future resolved
|
|
|
|
Err(future::Aborted) => Err(WaitError::Cancelled),
|
|
|
|
}
|
|
|
|
})
|
|
|
|
};
|
|
|
|
|
|
|
|
/* Clear out the canceller */
|
2024-06-07 15:55:43 +00:00
|
|
|
let mut canceller = canceller_mutex.lock().unwrap();
|
|
|
|
if matches!(*canceller, Canceller::Cancelled) {
|
|
|
|
return Err(WaitError::Cancelled);
|
|
|
|
}
|
|
|
|
*canceller = Canceller::None;
|
|
|
|
drop(canceller);
|
2022-05-14 05:01:35 +00:00
|
|
|
|
|
|
|
res
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn wait_stream(
|
2024-06-07 15:55:43 +00:00
|
|
|
canceller_mutex: &Mutex<Canceller>,
|
2022-05-14 05:01:35 +00:00
|
|
|
stream: &mut ByteStream,
|
2023-11-17 09:11:13 +00:00
|
|
|
) -> Result<Bytes, WaitError<ByteStreamError>> {
|
2024-06-07 15:55:43 +00:00
|
|
|
wait(canceller_mutex, async move {
|
2022-05-14 05:01:35 +00:00
|
|
|
let mut collect = BytesMut::new();
|
|
|
|
|
|
|
|
// Loop over the stream and collect till we're done
|
|
|
|
while let Some(item) = stream.try_next().await? {
|
|
|
|
collect.put(item)
|
|
|
|
}
|
|
|
|
|
2023-11-17 09:11:13 +00:00
|
|
|
Ok::<Bytes, ByteStreamError>(collect.freeze())
|
2022-02-23 18:10:30 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
// See setting-timeouts example in aws-sdk-rust.
|
2022-10-31 09:00:55 +00:00
|
|
|
pub fn timeout_config(request_timeout: Duration) -> TimeoutConfig {
|
|
|
|
TimeoutConfig::builder()
|
|
|
|
.operation_attempt_timeout(request_timeout)
|
|
|
|
.build()
|
2022-02-23 18:10:30 +00:00
|
|
|
}
|
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
pub fn wait_config(
|
2024-06-07 15:55:43 +00:00
|
|
|
canceller_mutex: &Mutex<Canceller>,
|
2022-05-14 05:01:35 +00:00
|
|
|
region: Region,
|
2022-10-31 09:00:55 +00:00
|
|
|
timeout_config: TimeoutConfig,
|
2022-05-14 05:01:35 +00:00
|
|
|
credentials: Option<Credentials>,
|
2023-11-17 09:11:13 +00:00
|
|
|
) -> Result<SdkConfig, WaitError<ByteStreamError>> {
|
2022-05-14 05:01:35 +00:00
|
|
|
let region_provider = RegionProviderChain::first_try(region)
|
|
|
|
.or_default_provider()
|
|
|
|
.or_else(Region::new(DEFAULT_S3_REGION));
|
|
|
|
let config_future = match credentials {
|
2024-10-20 19:28:19 +00:00
|
|
|
Some(cred) => aws_config::defaults(*AWS_BEHAVIOR_VERSION)
|
2022-05-14 05:01:35 +00:00
|
|
|
.timeout_config(timeout_config)
|
|
|
|
.region(region_provider)
|
|
|
|
.credentials_provider(cred)
|
|
|
|
.load(),
|
2024-10-20 19:28:19 +00:00
|
|
|
None => aws_config::defaults(*AWS_BEHAVIOR_VERSION)
|
2022-05-14 05:01:35 +00:00
|
|
|
.timeout_config(timeout_config)
|
|
|
|
.region(region_provider)
|
|
|
|
.load(),
|
|
|
|
};
|
|
|
|
|
2024-06-07 15:55:43 +00:00
|
|
|
let mut canceller = canceller_mutex.lock().unwrap();
|
|
|
|
if matches!(*canceller, Canceller::Cancelled) {
|
|
|
|
return Err(WaitError::Cancelled);
|
|
|
|
}
|
2022-02-23 18:10:30 +00:00
|
|
|
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
2024-06-07 15:55:43 +00:00
|
|
|
*canceller = Canceller::Handle(abort_handle);
|
|
|
|
drop(canceller);
|
2022-02-23 18:10:30 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let abortable_future = future::Abortable::new(config_future, abort_registration);
|
|
|
|
|
2022-02-23 18:10:30 +00:00
|
|
|
let res = {
|
|
|
|
let _enter = RUNTIME.enter();
|
|
|
|
futures::executor::block_on(async {
|
2022-05-14 05:01:35 +00:00
|
|
|
match abortable_future.await {
|
2022-02-23 18:10:30 +00:00
|
|
|
// Future resolved successfully
|
2022-05-14 05:01:35 +00:00
|
|
|
Ok(config) => Ok(config),
|
2022-02-23 18:10:30 +00:00
|
|
|
// Canceller called before future resolved
|
|
|
|
Err(future::Aborted) => Err(WaitError::Cancelled),
|
|
|
|
}
|
|
|
|
})
|
|
|
|
};
|
|
|
|
|
|
|
|
/* Clear out the canceller */
|
2024-06-07 15:55:43 +00:00
|
|
|
let mut canceller = canceller_mutex.lock().unwrap();
|
|
|
|
if matches!(*canceller, Canceller::Cancelled) {
|
|
|
|
return Err(WaitError::Cancelled);
|
|
|
|
}
|
|
|
|
*canceller = Canceller::None;
|
|
|
|
drop(canceller);
|
2022-02-23 18:10:30 +00:00
|
|
|
|
|
|
|
res
|
|
|
|
}
|
2022-03-18 10:10:18 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
/// Converts a signed millisecond count into a [`Duration`].
///
/// `-1` is the sentinel for "no limit" and maps to [`Duration::MAX`]. Any other negative
/// value has no meaning as a duration; it is mapped to [`Duration::MAX`] as well rather
/// than being reinterpreted as a huge unsigned number by an `as` cast.
///
/// Non-negative values are converted losslessly.
pub fn duration_from_millis(millis: i64) -> Duration {
    if millis < 0 {
        Duration::MAX
    } else {
        // Lossless: the value is known to be non-negative here.
        Duration::from_millis(millis.unsigned_abs())
    }
}
|
|
|
|
|
|
|
|
/// Converts an optional [`Duration`] into a signed millisecond count.
///
/// `None` and [`Duration::MAX`] both yield `-1`, the value that
/// [`duration_from_millis`] maps back to [`Duration::MAX`]. The sentinel is spelled out
/// explicitly instead of relying on what a truncating `u128 -> i64` cast happens to
/// produce for `Duration::MAX`.
///
/// Any other duration whose millisecond count does not fit in an `i64` saturates at
/// `i64::MAX` instead of wrapping to an arbitrary (possibly negative) value.
pub fn duration_to_millis(dur: Option<Duration>) -> i64 {
    match dur {
        None => -1,
        Some(d) if d == Duration::MAX => -1,
        Some(d) => {
            let millis = d.as_millis();
            if millis > i64::MAX as u128 {
                i64::MAX
            } else {
                // Lossless: the value was just checked to fit.
                millis as i64
            }
        }
    }
}
|