rusoto: s3sink: Implement support for GstUriHandler interface

With the URI handler interface implemented, we can drop the old method
of specifying bucket, key and region. This also brings it in line with
how it is for s3src.
This commit is contained in:
Sanchayan Maity 2021-04-30 18:35:35 +05:30
parent 8bda233d02
commit bf5e231e5b
2 changed files with 96 additions and 76 deletions

View file

@ -38,15 +38,15 @@ $ gst-launch-1.0 \
## s3sink ## s3sink
Writes data to a specified S3 bucket. The `region` parameter is optional, and Writes data to a specified S3 (region, bucket, object, version?) tuple. The
if not specified, the default parameter will be used (from `.aws/config` file). version may be omitted.
``` ```
$ gst-launch-1.0 \ $ gst-launch-1.0 \
videotestsrc ! \ videotestsrc ! \
theoraenc ! \ theoraenc ! \
oggmux ! \ oggmux ! \
s3sink bucket=example-bucket key=my/file.ogv region=us-west-1 s3sink uri=s3://us-west-1/example-bucket/my/file.ogv?version=my-optional-version
``` ```
## awstranscriber ## awstranscriber

View file

@ -16,7 +16,6 @@ use gst::{gst_error, gst_info, gst_trace};
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use futures::future; use futures::future;
use rusoto_core::region::Region;
use rusoto_s3::{ use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
@ -25,9 +24,9 @@ use rusoto_s3::{
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::convert::From; use std::convert::From;
use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
use crate::s3url::*;
use crate::s3utils::{self, WaitError}; use crate::s3utils::{self, WaitError};
struct Started { struct Started {
@ -82,15 +81,13 @@ impl Default for State {
const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024; const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
struct Settings { struct Settings {
region: Region,
bucket: Option<String>,
key: Option<String>,
content_type: Option<String>, content_type: Option<String>,
buffer_size: u64, buffer_size: u64,
} }
#[derive(Default)] #[derive(Default)]
pub struct S3Sink { pub struct S3Sink {
url: Mutex<Option<GstS3Url>>,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>, canceller: Mutex<Option<future::AbortHandle>>,
@ -107,9 +104,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Settings { Settings {
region: Region::default(),
bucket: None,
key: None,
content_type: None, content_type: None,
buffer_size: DEFAULT_BUFFER_SIZE, buffer_size: DEFAULT_BUFFER_SIZE,
} }
@ -153,6 +147,7 @@ impl S3Sink {
} }
fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> { fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> {
let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let state = match *state { let state = match *state {
@ -168,8 +163,8 @@ impl S3Sink {
&mut state.buffer, &mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize), Vec::with_capacity(settings.buffer_size as usize),
))), ))),
bucket: settings.bucket.as_ref().unwrap().to_owned(), bucket: url.as_ref().unwrap().bucket.to_owned(),
key: settings.key.as_ref().unwrap().to_owned(), key: url.as_ref().unwrap().object.to_owned(),
upload_id: state.upload_id.to_owned(), upload_id: state.upload_id.to_owned(),
part_number, part_number,
..Default::default() ..Default::default()
@ -179,7 +174,6 @@ impl S3Sink {
fn create_complete_multipart_upload_request( fn create_complete_multipart_upload_request(
&self, &self,
started_state: &mut Started, started_state: &mut Started,
settings: &Settings,
) -> CompleteMultipartUploadRequest { ) -> CompleteMultipartUploadRequest {
started_state started_state
.completed_parts .completed_parts
@ -192,9 +186,10 @@ impl S3Sink {
)), )),
}; };
let url = self.url.lock().unwrap();
CompleteMultipartUploadRequest { CompleteMultipartUploadRequest {
bucket: settings.bucket.as_ref().unwrap().to_owned(), bucket: url.as_ref().unwrap().bucket.to_owned(),
key: settings.key.as_ref().unwrap().to_owned(), key: url.as_ref().unwrap().object.to_owned(),
upload_id: started_state.upload_id.to_owned(), upload_id: started_state.upload_id.to_owned(),
multipart_upload: Some(completed_upload), multipart_upload: Some(completed_upload),
..Default::default() ..Default::default()
@ -203,24 +198,15 @@ impl S3Sink {
fn create_create_multipart_upload_request( fn create_create_multipart_upload_request(
&self, &self,
url: &GstS3Url,
settings: &Settings, settings: &Settings,
) -> Result<CreateMultipartUploadRequest, gst::ErrorMessage> { ) -> CreateMultipartUploadRequest {
if settings.bucket.is_none() || settings.key.is_none() { CreateMultipartUploadRequest {
return Err(gst::error_msg!( bucket: url.bucket.clone(),
gst::ResourceError::Settings, key: url.object.clone(),
["Bucket or key is not defined"]
));
}
let bucket = settings.bucket.as_ref().unwrap();
let key = settings.key.as_ref().unwrap();
Ok(CreateMultipartUploadRequest {
bucket: bucket.clone(),
key: key.clone(),
content_type: settings.content_type.clone(), content_type: settings.content_type.clone(),
..Default::default() ..Default::default()
}) }
} }
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> { fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
@ -239,9 +225,7 @@ impl S3Sink {
} }
}; };
let settings = self.settings.lock().unwrap(); let complete_req = self.create_complete_multipart_upload_request(started_state);
let complete_req = self.create_complete_multipart_upload_request(started_state, &settings);
let complete_req_future = started_state.client.complete_multipart_upload(complete_req); let complete_req_future = started_state.client.complete_multipart_upload(complete_req);
s3utils::wait(&self.canceller, complete_req_future) s3utils::wait(&self.canceller, complete_req_future)
@ -265,9 +249,19 @@ impl S3Sink {
unreachable!("Element should be started"); unreachable!("Element should be started");
} }
let client = S3Client::new(settings.region.clone()); let s3url = match *self.url.lock().unwrap() {
Some(ref url) => url.clone(),
None => {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["Cannot start without a URL being set"]
));
}
};
let create_multipart_req = self.create_create_multipart_upload_request(&settings)?; let client = S3Client::new(s3url.region.clone());
let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings);
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req); let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err( let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
@ -339,6 +333,36 @@ impl S3Sink {
c.abort() c.abort()
}; };
} }
fn set_uri(self: &S3Sink, _: &super::S3Sink, url_str: Option<&str>) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
return Err(glib::Error::new(
gst::URIError::BadState,
"Cannot set URI on a started s3sink",
));
}
let mut url = self.url.lock().unwrap();
if url_str.is_none() {
*url = None;
return Ok(());
}
let url_str = url_str.unwrap();
match parse_s3_url(url_str) {
Ok(s3url) => {
*url = Some(s3url);
Ok(())
}
Err(_) => Err(glib::Error::new(
gst::URIError::BadUri,
"Could not parse URI",
)),
}
}
} }
#[glib::object_subclass] #[glib::object_subclass]
@ -346,33 +370,13 @@ impl ObjectSubclass for S3Sink {
const NAME: &'static str = "RusotoS3Sink"; const NAME: &'static str = "RusotoS3Sink";
type Type = super::S3Sink; type Type = super::S3Sink;
type ParentType = gst_base::BaseSink; type ParentType = gst_base::BaseSink;
type Interfaces = (gst::URIHandler,);
} }
impl ObjectImpl for S3Sink { impl ObjectImpl for S3Sink {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![ vec![
glib::ParamSpec::new_string(
"bucket",
"S3 Bucket",
"The bucket of the file to write",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"key",
"S3 Key",
"The key of the file to write",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_string(
"region",
"AWS Region",
"An AWS region (e.g. eu-west-2).",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_uint64( glib::ParamSpec::new_uint64(
"part-size", "part-size",
"Part size", "Part size",
@ -382,6 +386,13 @@ impl ObjectImpl for S3Sink {
DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpec::new_string(
"uri",
"URI",
"The S3 object URI",
None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
] ]
}); });
@ -390,7 +401,7 @@ impl ObjectImpl for S3Sink {
fn set_property( fn set_property(
&self, &self,
_obj: &Self::Type, obj: &Self::Type,
_id: usize, _id: usize,
value: &glib::Value, value: &glib::Value,
pspec: &glib::ParamSpec, pspec: &glib::ParamSpec,
@ -398,24 +409,12 @@ impl ObjectImpl for S3Sink {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"bucket" => {
settings.bucket = value
.get::<Option<String>>()
.expect("type checked upstream");
}
"key" => {
settings.key = value
.get::<Option<String>>()
.expect("type checked upstream");
}
"region" => {
settings.region =
Region::from_str(&value.get::<String>().expect("type checked upstream"))
.unwrap();
}
"part-size" => { "part-size" => {
settings.buffer_size = value.get::<u64>().expect("type checked upstream"); settings.buffer_size = value.get::<u64>().expect("type checked upstream");
} }
"uri" => {
let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -424,10 +423,15 @@ impl ObjectImpl for S3Sink {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"key" => settings.key.to_value(),
"bucket" => settings.bucket.to_value(),
"region" => settings.region.name().to_value(),
"part-size" => settings.buffer_size.to_value(), "part-size" => settings.buffer_size.to_value(),
"uri" => {
let url = match *self.url.lock().unwrap() {
Some(ref url) => url.to_string(),
None => "".to_string(),
};
url.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -465,6 +469,22 @@ impl ElementImpl for S3Sink {
} }
} }
impl URIHandlerImpl for S3Sink {
const URI_TYPE: gst::URIType = gst::URIType::Sink;
fn protocols() -> &'static [&'static str] {
&["s3"]
}
fn uri(&self, _: &Self::Type) -> Option<String> {
self.url.lock().unwrap().as_ref().map(|s| s.to_string())
}
fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
self.set_uri(element, Some(uri))
}
}
impl BaseSinkImpl for S3Sink { impl BaseSinkImpl for S3Sink {
fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.start() self.start()