rusoto: Implement auth via explicit access-key/secret-access-key properties

This allows passing them explicitly as strings to the elements instead
of relying on system/per-user configuration.
This commit is contained in:
Sebastian Dröge 2021-09-27 16:49:12 +03:00
parent 3ce6a5f403
commit 502b336361
3 changed files with 193 additions and 28 deletions

View file

@ -26,7 +26,7 @@ use gst::{
use std::default::Default; use std::default::Default;
use rusoto_core::Region; use rusoto_core::Region;
use rusoto_credential::{ChainProvider, ProvideAwsCredentials}; use rusoto_credential::{ChainProvider, ProvideAwsCredentials, StaticProvider};
use rusoto_signature::signature::SignedRequest; use rusoto_signature::signature::SignedRequest;
@ -127,6 +127,8 @@ struct Settings {
vocabulary: Option<String>, vocabulary: Option<String>,
session_id: Option<String>, session_id: Option<String>,
results_stability: AwsTranscriberResultStability, results_stability: AwsTranscriberResultStability,
access_key: Option<String>,
secret_access_key: Option<String>,
} }
impl Default for Settings { impl Default for Settings {
@ -138,6 +140,8 @@ impl Default for Settings {
vocabulary: None, vocabulary: None,
session_id: None, session_id: None,
results_stability: DEFAULT_STABILITY, results_stability: DEFAULT_STABILITY,
access_key: None,
secret_access_key: None,
} }
} }
} }
@ -857,15 +861,39 @@ impl Transcriber {
gst_info!(CAT, obj: element, "Connecting .."); gst_info!(CAT, obj: element, "Connecting ..");
let creds = { let creds = match (
let _enter = RUNTIME.enter(); settings.access_key.as_ref(),
futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| { settings.secret_access_key.as_ref(),
gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err); ) {
error_msg!( (Some(access_key), Some(secret_access_key)) => {
gst::CoreError::Failed, let _enter = RUNTIME.enter();
["Failed to generate credentials: {}", err] futures::executor::block_on(
) StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone())
})? .credentials()
.map_err(|err| {
gst_error!(
CAT,
obj: element,
"Failed to generate credentials: {}",
err
);
error_msg!(
gst::CoreError::Failed,
["Failed to generate credentials: {}", err]
)
}),
)?
}
_ => {
let _enter = RUNTIME.enter();
futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| {
gst_error!(CAT, obj: element, "Failed to generate credentials: {}", err);
error_msg!(
gst::CoreError::Failed,
["Failed to generate credentials: {}", err]
)
})?
}
}; };
let language_code = settings let language_code = settings
@ -1113,6 +1141,20 @@ impl ObjectImpl for Transcriber {
DEFAULT_STABILITY as i32, DEFAULT_STABILITY as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpec::new_string(
"access-key",
"Access Key",
"AWS Access Key",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"secret-access-key",
"Secret Access Key",
"AWS Secret Access Key",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
] ]
}); });
@ -1165,6 +1207,14 @@ impl ObjectImpl for Transcriber {
.get::<AwsTranscriberResultStability>() .get::<AwsTranscriberResultStability>()
.expect("type checked upstream"); .expect("type checked upstream");
} }
"access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.access_key = value.get().expect("type checked upstream");
}
"secret-access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.secret_access_key = value.get().expect("type checked upstream");
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -1195,6 +1245,14 @@ impl ObjectImpl for Transcriber {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.results_stability.to_value() settings.results_stability.to_value()
} }
"access-key" => {
let settings = self.settings.lock().unwrap();
settings.access_key.to_value()
}
"secret-access-key" => {
let settings = self.settings.lock().unwrap();
settings.secret_access_key.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -14,7 +14,8 @@ use gst::{gst_debug, gst_error, gst_info, gst_trace};
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use futures::future; use futures::future;
use rusoto_core::region::Region; use rusoto_core::{region::Region, request::HttpClient};
use rusoto_credential::StaticProvider;
use rusoto_s3::{ use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
@ -86,6 +87,8 @@ struct Settings {
key: Option<String>, key: Option<String>,
content_type: Option<String>, content_type: Option<String>,
buffer_size: u64, buffer_size: u64,
access_key: Option<String>,
secret_access_key: Option<String>,
} }
impl Settings { impl Settings {
@ -107,6 +110,8 @@ impl Default for Settings {
key: None, key: None,
content_type: None, content_type: None,
buffer_size: DEFAULT_BUFFER_SIZE, buffer_size: DEFAULT_BUFFER_SIZE,
access_key: None,
secret_access_key: None,
} }
} }
} }
@ -273,7 +278,21 @@ impl S3Sink {
} }
}; };
let client = S3Client::new(s3url.region.clone()); let client = match (
settings.access_key.as_ref(),
settings.secret_access_key.as_ref(),
) {
(Some(access_key), Some(secret_access_key)) => {
let creds =
StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone());
S3Client::new_with(
HttpClient::new().expect("failed to create request dispatcher"),
creds,
s3url.region.clone(),
)
}
_ => S3Client::new(s3url.region.clone()),
};
let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings); let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings);
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req); let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
@ -434,6 +453,20 @@ impl ObjectImpl for S3Sink {
None, None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpec::new_string(
"access-key",
"Access Key",
"AWS Access Key",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"secret-access-key",
"Secret Access Key",
"AWS Secret Access Key",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
] ]
}); });
@ -480,6 +513,12 @@ impl ObjectImpl for S3Sink {
"uri" => { "uri" => {
let _ = self.set_uri(obj, value.get().expect("type checked upstream")); let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
} }
"access-key" => {
settings.access_key = value.get().expect("type checked upstream");
}
"secret-access-key" => {
settings.secret_access_key = value.get().expect("type checked upstream");
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -500,6 +539,8 @@ impl ObjectImpl for S3Sink {
url.to_value() url.to_value()
} }
"access-key" => settings.access_key.to_value(),
"secret-access-key" => settings.secret_access_key.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -11,7 +11,9 @@ use std::sync::Mutex;
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rusoto_s3::*; use rusoto_core::request::HttpClient;
use rusoto_credential::StaticProvider;
use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
@ -41,9 +43,25 @@ impl Default for StreamingState {
} }
} }
struct Settings {
url: Option<GstS3Url>,
access_key: Option<String>,
secret_access_key: Option<String>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
url: None,
access_key: None,
secret_access_key: None,
}
}
}
#[derive(Default)] #[derive(Default)]
pub struct S3Src { pub struct S3Src {
url: Mutex<Option<GstS3Url>>, settings: Mutex<Settings>,
state: Mutex<StreamingState>, state: Mutex<StreamingState>,
canceller: Mutex<Option<future::AbortHandle>>, canceller: Mutex<Option<future::AbortHandle>>,
} }
@ -66,7 +84,23 @@ impl S3Src {
} }
fn connect(self: &S3Src, url: &GstS3Url) -> S3Client { fn connect(self: &S3Src, url: &GstS3Url) -> S3Client {
S3Client::new(url.region.clone()) let settings = self.settings.lock().unwrap();
match (
settings.access_key.as_ref(),
settings.secret_access_key.as_ref(),
) {
(Some(access_key), Some(secret_access_key)) => {
let creds =
StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone());
S3Client::new_with(
HttpClient::new().expect("failed to create request dispatcher"),
creds,
url.region.clone(),
)
}
_ => S3Client::new(url.region.clone()),
}
} }
fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> { fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> {
@ -79,17 +113,17 @@ impl S3Src {
)); ));
} }
let mut url = self.url.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
if url_str.is_none() { if url_str.is_none() {
*url = None; settings.url = None;
return Ok(()); return Ok(());
} }
let url_str = url_str.unwrap(); let url_str = url_str.unwrap();
match parse_s3_url(url_str) { match parse_s3_url(url_str) {
Ok(s3url) => { Ok(s3url) => {
*url = Some(s3url); settings.url = Some(s3url);
Ok(()) Ok(())
} }
Err(_) => Err(glib::Error::new( Err(_) => Err(glib::Error::new(
@ -212,13 +246,29 @@ impl ObjectSubclass for S3Src {
impl ObjectImpl for S3Src { impl ObjectImpl for S3Src {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpec::new_string( vec![
"uri", glib::ParamSpec::new_string(
"URI", "uri",
"The S3 object URI", "URI",
None, "The S3 object URI",
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, None,
)] glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"access-key",
"Access Key",
"AWS Access Key",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"secret-access-key",
"Secret Access Key",
"AWS Secret Access Key",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
]
}); });
PROPERTIES.as_ref() PROPERTIES.as_ref()
@ -235,20 +285,32 @@ impl ObjectImpl for S3Src {
"uri" => { "uri" => {
let _ = self.set_uri(obj, value.get().expect("type checked upstream")); let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
} }
"access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.access_key = value.get().expect("type checked upstream");
}
"secret-access-key" => {
let mut settings = self.settings.lock().unwrap();
settings.secret_access_key = value.get().expect("type checked upstream");
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
fn property(&self, _: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"uri" => { "uri" => {
let url = match *self.url.lock().unwrap() { let url = match settings.url {
Some(ref url) => url.to_string(), Some(ref url) => url.to_string(),
None => "".to_string(), None => "".to_string(),
}; };
url.to_value() url.to_value()
} }
"access-key" => settings.access_key.to_value(),
"secret-access-key" => settings.secret_access_key.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -302,7 +364,9 @@ impl URIHandlerImpl for S3Src {
} }
fn uri(&self, _: &Self::Type) -> Option<String> { fn uri(&self, _: &Self::Type) -> Option<String> {
self.url.lock().unwrap().as_ref().map(|s| s.to_string()) let settings = self.settings.lock().unwrap();
settings.url.as_ref().map(|s| s.to_string())
} }
fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> { fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
@ -329,7 +393,8 @@ impl BaseSrcImpl for S3Src {
unreachable!("RusotoS3Src is already started"); unreachable!("RusotoS3Src is already started");
} }
let s3url = match *self.url.lock().unwrap() { let settings = self.settings.lock().unwrap();
let s3url = match settings.url {
Some(ref url) => url.clone(), Some(ref url) => url.clone(),
None => { None => {
return Err(gst::error_msg!( return Err(gst::error_msg!(
@ -338,6 +403,7 @@ impl BaseSrcImpl for S3Src {
)); ));
} }
}; };
drop(settings);
let s3client = self.connect(&s3url); let s3client = self.connect(&s3url);
let size = self.head(src, &s3client, &s3url)?; let size = self.head(src, &s3client, &s3url)?;