diff --git a/net/rusoto/README.md b/net/rusoto/README.md index 19460fbb2..18c49900c 100644 --- a/net/rusoto/README.md +++ b/net/rusoto/README.md @@ -38,15 +38,15 @@ $ gst-launch-1.0 \ ## s3sink -Writes data to a specified S3 bucket. The `region` parameter is optional, and -if not specified, the default parameter will be used (from `.aws/config` file). +Writes data to a specified S3 (region, bucket, object, version?) tuple. The +version may be omitted. ``` $ gst-launch-1.0 \ videotestsrc ! \ theoraenc ! \ oggmux ! \ - s3sink bucket=example-bucket key=my/file.ogv region=us-west-1 + s3sink uri=s3://us-west-1/example-bucket/my/file.ogv?version=my-optional-version ``` ## awstranscriber diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs index 55de4cef2..c02e91f5a 100644 --- a/net/rusoto/src/s3sink/imp.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -16,7 +16,6 @@ use gst::{gst_error, gst_info, gst_trace}; use gst_base::subclass::prelude::*; use futures::future; -use rusoto_core::region::Region; use rusoto_s3::{ CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, @@ -25,9 +24,9 @@ use rusoto_s3::{ use once_cell::sync::Lazy; use std::convert::From; -use std::str::FromStr; use std::sync::Mutex; +use crate::s3url::*; use crate::s3utils::{self, WaitError}; struct Started { @@ -82,15 +81,13 @@ impl Default for State { const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024; struct Settings { - region: Region, - bucket: Option, - key: Option, content_type: Option, buffer_size: u64, } #[derive(Default)] pub struct S3Sink { + url: Mutex>, settings: Mutex, state: Mutex, canceller: Mutex>, @@ -107,9 +104,6 @@ static CAT: Lazy = Lazy::new(|| { impl Default for Settings { fn default() -> Self { Settings { - region: Region::default(), - bucket: None, - key: None, content_type: None, buffer_size: DEFAULT_BUFFER_SIZE, } @@ -153,6 +147,7 @@ impl S3Sink { } fn create_upload_part_request(&self) -> Result { + let url = self.url.lock().unwrap(); let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); let state = match *state { @@ -168,8 +163,8 @@ impl S3Sink { &mut state.buffer, Vec::with_capacity(settings.buffer_size as usize), ))), - bucket: settings.bucket.as_ref().unwrap().to_owned(), - key: settings.key.as_ref().unwrap().to_owned(), + bucket: url.as_ref().unwrap().bucket.to_owned(), + key: url.as_ref().unwrap().object.to_owned(), upload_id: state.upload_id.to_owned(), part_number, ..Default::default() @@ -179,7 +174,6 @@ impl S3Sink { fn create_complete_multipart_upload_request( &self, started_state: &mut Started, - settings: &Settings, ) -> CompleteMultipartUploadRequest { started_state .completed_parts @@ -192,9 +186,10 @@ impl S3Sink { )), }; + let url = self.url.lock().unwrap(); CompleteMultipartUploadRequest { - bucket: settings.bucket.as_ref().unwrap().to_owned(), - key: settings.key.as_ref().unwrap().to_owned(), + bucket: url.as_ref().unwrap().bucket.to_owned(), + key: url.as_ref().unwrap().object.to_owned(), upload_id: started_state.upload_id.to_owned(), multipart_upload: Some(completed_upload), ..Default::default() @@ -203,24 +198,15 @@ impl S3Sink { fn create_create_multipart_upload_request( &self, + url: &GstS3Url, settings: &Settings, - ) -> Result { - if settings.bucket.is_none() || settings.key.is_none() { - return Err(gst::error_msg!( - gst::ResourceError::Settings, - ["Bucket or key is not defined"] - )); - } - - let bucket = settings.bucket.as_ref().unwrap(); - let key = settings.key.as_ref().unwrap(); - - Ok(CreateMultipartUploadRequest { - bucket: bucket.clone(), - key: key.clone(), + ) -> CreateMultipartUploadRequest { + CreateMultipartUploadRequest { + bucket: url.bucket.clone(), + key: url.object.clone(), content_type: settings.content_type.clone(), ..Default::default() - }) + } } fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> { @@ -239,9 +225,7 @@ impl S3Sink { } }; - let settings = self.settings.lock().unwrap(); - - let complete_req = self.create_complete_multipart_upload_request(started_state, &settings); + let complete_req = self.create_complete_multipart_upload_request(started_state); let complete_req_future = started_state.client.complete_multipart_upload(complete_req); s3utils::wait(&self.canceller, complete_req_future) @@ -265,9 +249,19 @@ impl S3Sink { unreachable!("Element should be started"); } - let client = S3Client::new(settings.region.clone()); + let s3url = match *self.url.lock().unwrap() { + Some(ref url) => url.clone(), + None => { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["Cannot start without a URL being set"] + )); + } + }; - let create_multipart_req = self.create_create_multipart_upload_request(&settings)?; + let client = S3Client::new(s3url.region.clone()); + + let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings); let create_multipart_req_future = client.create_multipart_upload(create_multipart_req); let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err( @@ -339,6 +333,36 @@ impl S3Sink { c.abort() }; } + + fn set_uri(self: &S3Sink, _: &super::S3Sink, url_str: Option<&str>) -> Result<(), glib::Error> { + let state = self.state.lock().unwrap(); + + if let State::Started { .. } = *state { + return Err(glib::Error::new( + gst::URIError::BadState, + "Cannot set URI on a started s3sink", + )); + } + + let mut url = self.url.lock().unwrap(); + + if url_str.is_none() { + *url = None; + return Ok(()); + } + + let url_str = url_str.unwrap(); + match parse_s3_url(url_str) { + Ok(s3url) => { + *url = Some(s3url); + Ok(()) + } + Err(_) => Err(glib::Error::new( + gst::URIError::BadUri, + "Could not parse URI", + )), + } + } } #[glib::object_subclass] @@ -346,33 +370,13 @@ impl ObjectSubclass for S3Sink { const NAME: &'static str = "RusotoS3Sink"; type Type = super::S3Sink; type ParentType = gst_base::BaseSink; + type Interfaces = (gst::URIHandler,); } impl ObjectImpl for S3Sink { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ - glib::ParamSpec::new_string( - "bucket", - "S3 Bucket", - "The bucket of the file to write", - None, - glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, - ), - glib::ParamSpec::new_string( - "key", - "S3 Key", - "The key of the file to write", - None, - glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, - ), - glib::ParamSpec::new_string( - "region", - "AWS Region", - "An AWS region (e.g. eu-west-2).", - None, - glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, - ), glib::ParamSpec::new_uint64( "part-size", "Part size", @@ -382,6 +386,13 @@ impl ObjectImpl for S3Sink { DEFAULT_BUFFER_SIZE, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpec::new_string( + "uri", + "URI", + "The S3 object URI", + None, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -390,7 +401,7 @@ impl ObjectImpl for S3Sink { fn set_property( &self, - _obj: &Self::Type, + obj: &Self::Type, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec, @@ -398,24 +409,12 @@ impl ObjectImpl for S3Sink { let mut settings = self.settings.lock().unwrap(); match pspec.name() { - "bucket" => { - settings.bucket = value - .get::>() - .expect("type checked upstream"); - } - "key" => { - settings.key = value - .get::>() - .expect("type checked upstream"); - } - "region" => { - settings.region = - Region::from_str(&value.get::().expect("type checked upstream")) - .unwrap(); - } "part-size" => { settings.buffer_size = value.get::().expect("type checked upstream"); } + "uri" => { + let _ = self.set_uri(obj, value.get().expect("type checked upstream")); + } _ => unimplemented!(), } } @@ -424,10 +423,15 @@ impl ObjectImpl for S3Sink { let settings = self.settings.lock().unwrap(); match pspec.name() { - "key" => settings.key.to_value(), - "bucket" => settings.bucket.to_value(), - "region" => settings.region.name().to_value(), "part-size" => settings.buffer_size.to_value(), + "uri" => { + let url = match *self.url.lock().unwrap() { + Some(ref url) => url.to_string(), + None => "".to_string(), + }; + + url.to_value() + } _ => unimplemented!(), } } @@ -465,6 +469,22 @@ impl ElementImpl for S3Sink { } } +impl URIHandlerImpl for S3Sink { + const URI_TYPE: gst::URIType = gst::URIType::Sink; + + fn protocols() -> &'static [&'static str] { + &["s3"] + } + + fn uri(&self, _: &Self::Type) -> Option { + self.url.lock().unwrap().as_ref().map(|s| s.to_string()) + } + + fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> { + self.set_uri(element, Some(uri)) + } +} + impl BaseSinkImpl for S3Sink { fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { self.start()