From 502b336361da8a4ddd5ab96e7009359f2c559394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 27 Sep 2021 16:49:12 +0300 Subject: [PATCH] rusoto: Implement auth via explicit access-key/secret-access-key properties This allows passing them explicitly as strings to the elements instead of relying on system/per-user configuration. --- net/rusoto/src/aws_transcriber/imp.rs | 78 ++++++++++++++++++--- net/rusoto/src/s3sink/imp.rs | 45 +++++++++++- net/rusoto/src/s3src/imp.rs | 98 ++++++++++++++++++++++----- 3 files changed, 193 insertions(+), 28 deletions(-) diff --git a/net/rusoto/src/aws_transcriber/imp.rs b/net/rusoto/src/aws_transcriber/imp.rs index ce3a53e8..07a3843e 100644 --- a/net/rusoto/src/aws_transcriber/imp.rs +++ b/net/rusoto/src/aws_transcriber/imp.rs @@ -26,7 +26,7 @@ use gst::{ use std::default::Default; use rusoto_core::Region; -use rusoto_credential::{ChainProvider, ProvideAwsCredentials}; +use rusoto_credential::{ChainProvider, ProvideAwsCredentials, StaticProvider}; use rusoto_signature::signature::SignedRequest; @@ -127,6 +127,8 @@ struct Settings { vocabulary: Option, session_id: Option, results_stability: AwsTranscriberResultStability, + access_key: Option, + secret_access_key: Option, } impl Default for Settings { @@ -138,6 +140,8 @@ impl Default for Settings { vocabulary: None, session_id: None, results_stability: DEFAULT_STABILITY, + access_key: None, + secret_access_key: None, } } } @@ -857,15 +861,39 @@ impl Transcriber { gst_info!(CAT, obj: element, "Connecting .."); - let creds = { - let _enter = RUNTIME.enter(); - futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| { - gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err); - error_msg!( - gst::CoreError::Failed, - ["Failed to generate credentials: {}", err] - ) - })? + let creds = match ( + settings.access_key.as_ref(), + settings.secret_access_key.as_ref(), + ) { + (Some(access_key), Some(secret_access_key)) => { + let _enter = RUNTIME.enter(); + futures::executor::block_on( + StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()) + .credentials() + .map_err(|err| { + gst_error!( + CAT, + obj: element, + "Failed to generate credentials: {}", + err + ); + error_msg!( + gst::CoreError::Failed, + ["Failed to generate credentials: {}", err] + ) + }), + )? + } + _ => { + let _enter = RUNTIME.enter(); + futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| { + gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err); + error_msg!( + gst::CoreError::Failed, + ["Failed to generate credentials: {}", err] + ) + })? + } }; let language_code = settings @@ -1113,6 +1141,20 @@ impl ObjectImpl for Transcriber { DEFAULT_STABILITY as i32, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpec::new_string( + "access-key", + "Access Key", + "AWS Access Key", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_string( + "secret-access-key", + "Secret Access Key", + "AWS Secret Access Key", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -1165,6 +1207,14 @@ impl ObjectImpl for Transcriber { .get::() .expect("type checked upstream"); } + "access-key" => { + let mut settings = self.settings.lock().unwrap(); + settings.access_key = value.get().expect("type checked upstream"); + } + "secret-access-key" => { + let mut settings = self.settings.lock().unwrap(); + settings.secret_access_key = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -1195,6 +1245,14 @@ impl ObjectImpl for Transcriber { let settings = self.settings.lock().unwrap(); settings.results_stability.to_value() } + "access-key" => { + let settings = self.settings.lock().unwrap(); + settings.access_key.to_value() + } + "secret-access-key" => { + let settings = self.settings.lock().unwrap(); + settings.secret_access_key.to_value() + } _ => unimplemented!(), } } diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs index 1428b38e..6f74e94e 100644 --- a/net/rusoto/src/s3sink/imp.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -14,7 +14,8 @@ use gst::{gst_debug, gst_error, gst_info, gst_trace}; use gst_base::subclass::prelude::*; use futures::future; -use rusoto_core::region::Region; +use rusoto_core::{region::Region, request::HttpClient}; +use rusoto_credential::StaticProvider; use rusoto_s3::{ CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, @@ -86,6 +87,8 @@ struct Settings { key: Option, content_type: Option, buffer_size: u64, + access_key: Option, + secret_access_key: Option, } impl Settings { @@ -107,6 +110,8 @@ impl Default for Settings { key: None, content_type: None, buffer_size: DEFAULT_BUFFER_SIZE, + access_key: None, + secret_access_key: None, } } } @@ -273,7 +278,21 @@ impl S3Sink { } }; - let client = S3Client::new(s3url.region.clone()); + let client = match ( + settings.access_key.as_ref(), + settings.secret_access_key.as_ref(), + ) { + (Some(access_key), Some(secret_access_key)) => { + let creds = + StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()); + S3Client::new_with( + HttpClient::new().expect("failed to create request dispatcher"), + creds, + s3url.region.clone(), + ) + } + _ => S3Client::new(s3url.region.clone()), + }; let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings); let create_multipart_req_future = client.create_multipart_upload(create_multipart_req); @@ -434,6 +453,20 @@ impl ObjectImpl for S3Sink { None, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpec::new_string( + "access-key", + "Access Key", + "AWS Access Key", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_string( + "secret-access-key", + "Secret Access Key", + "AWS Secret Access Key", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -480,6 +513,12 @@ impl ObjectImpl for S3Sink { "uri" => { let _ = self.set_uri(obj, value.get().expect("type checked upstream")); } + "access-key" => { + settings.access_key = value.get().expect("type checked upstream"); + } + "secret-access-key" => { + settings.secret_access_key = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -500,6 +539,8 @@ impl ObjectImpl for S3Sink { url.to_value() } + "access-key" => settings.access_key.to_value(), + "secret-access-key" => settings.secret_access_key.to_value(), _ => unimplemented!(), } } diff --git a/net/rusoto/src/s3src/imp.rs b/net/rusoto/src/s3src/imp.rs index 53b2bc3b..5f1c6f51 100644 --- a/net/rusoto/src/s3src/imp.rs +++ b/net/rusoto/src/s3src/imp.rs @@ -11,7 +11,9 @@ use std::sync::Mutex; use bytes::Bytes; use futures::future; use once_cell::sync::Lazy; -use rusoto_s3::*; +use rusoto_core::request::HttpClient; +use rusoto_credential::StaticProvider; +use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3}; use gst::glib; use gst::prelude::*; @@ -41,9 +43,25 @@ impl Default for StreamingState { } } +struct Settings { + url: Option, + access_key: Option, + secret_access_key: Option, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + url: None, + access_key: None, + secret_access_key: None, + } + } +} + #[derive(Default)] pub struct S3Src { - url: Mutex>, + settings: Mutex, state: Mutex, canceller: Mutex>, } @@ -66,7 +84,23 @@ impl S3Src { } fn connect(self: &S3Src, url: &GstS3Url) -> S3Client { - S3Client::new(url.region.clone()) + let settings = self.settings.lock().unwrap(); + + match ( + settings.access_key.as_ref(), + settings.secret_access_key.as_ref(), + ) { + (Some(access_key), Some(secret_access_key)) => { + let creds = + StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()); + S3Client::new_with( + HttpClient::new().expect("failed to create request dispatcher"), + creds, + url.region.clone(), + ) + } + _ => S3Client::new(url.region.clone()), + } } fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> { @@ -79,17 +113,17 @@ impl S3Src { )); } - let mut url = self.url.lock().unwrap(); + let mut settings = self.settings.lock().unwrap(); if url_str.is_none() { - *url = None; + settings.url = None; return Ok(()); } let url_str = url_str.unwrap(); match parse_s3_url(url_str) { Ok(s3url) => { - *url = Some(s3url); + settings.url = Some(s3url); Ok(()) } Err(_) => Err(glib::Error::new( @@ -212,13 +246,29 @@ impl ObjectSubclass for S3Src { impl ObjectImpl for S3Src { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { - vec![glib::ParamSpec::new_string( - "uri", - "URI", - "The S3 object URI", - None, - glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, - )] + vec![ + glib::ParamSpec::new_string( + "uri", + "URI", + "The S3 object URI", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_string( + "access-key", + "Access Key", + "AWS Access Key", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpec::new_string( + "secret-access-key", + "Secret Access Key", + "AWS Secret Access Key", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + ] }); PROPERTIES.as_ref() @@ -235,20 +285,32 @@ impl ObjectImpl for S3Src { "uri" => { let _ = self.set_uri(obj, value.get().expect("type checked upstream")); } + "access-key" => { + let mut settings = self.settings.lock().unwrap(); + settings.access_key = value.get().expect("type checked upstream"); + } + "secret-access-key" => { + let mut settings = self.settings.lock().unwrap(); + settings.secret_access_key = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } fn property(&self, _: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { "uri" => { - let url = match *self.url.lock().unwrap() { + let url = match settings.url { Some(ref url) => url.to_string(), None => "".to_string(), }; url.to_value() } + "access-key" => settings.access_key.to_value(), + "secret-access-key" => settings.secret_access_key.to_value(), _ => unimplemented!(), } } @@ -302,7 +364,9 @@ impl URIHandlerImpl for S3Src { } fn uri(&self, _: &Self::Type) -> Option { - self.url.lock().unwrap().as_ref().map(|s| s.to_string()) + let settings = self.settings.lock().unwrap(); + + settings.url.as_ref().map(|s| s.to_string()) } fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> { @@ -329,7 +393,8 @@ impl BaseSrcImpl for S3Src { unreachable!("RusotoS3Src is already started"); } - let s3url = match *self.url.lock().unwrap() { + let settings = self.settings.lock().unwrap(); + let s3url = match settings.url { Some(ref url) => url.clone(), None => { return Err(gst::error_msg!( @@ -338,6 +403,7 @@ impl BaseSrcImpl for S3Src { )); } }; + drop(settings); let s3client = self.connect(&s3url); let size = self.head(src, &s3client, &s3url)?;