rusoto: Upgrade to Rusoto 0.43

This moves to Rusoto 0.43, which has moved from futures to async/.await.
As a result, we implement a utility function to convert the
async/streaming bits to blocking operations backed by a tokio runtime.

In the process, we also need to restructure s3sink a little, so that the
client is now part of the started state (like it is for s3src). This is
a better model than a separate client, as it reflects the condition that
the client is only available in the started state.
This commit is contained in:
Arun Raghavan 2020-04-19 21:28:30 -04:00
commit d398d4a7dc
4 changed files with 171 additions and 174 deletions

View file

@ -8,16 +8,16 @@ description = "Amazon S3 Plugin"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
bytes = "0.4" bytes = "0.5"
futures = "0.1" futures = "0.3"
glib = { git = "https://github.com/gtk-rs/glib" } glib = { git = "https://github.com/gtk-rs/glib" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] } gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] }
gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] } gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] }
rusoto_core = "0.42" rusoto_core = "0.43"
rusoto_s3 = "0.42" rusoto_s3 = "0.43"
url = "2" url = "2"
percent-encoding = "2" percent-encoding = "2"
tokio = "0.1" tokio = { version = "0.2", features = [ "rt-threaded" ] }
lazy_static = "1.0" lazy_static = "1.0"
[lib] [lib]

View file

@ -15,13 +15,8 @@ use gst::subclass::prelude::*;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use futures::prelude::*; use futures::future;
use futures::sync::oneshot;
use rusoto_core::region::Region; use rusoto_core::region::Region;
use tokio::runtime;
use rusoto_s3::{ use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
@ -31,9 +26,10 @@ use std::convert::From;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
use crate::s3utils; use crate::s3utils::{self, WaitError};
struct Started { struct Started {
client: S3Client,
buffer: Vec<u8>, buffer: Vec<u8>,
upload_id: String, upload_id: String,
part_number: i64, part_number: i64,
@ -41,8 +37,9 @@ struct Started {
} }
impl Started { impl Started {
pub fn new(buffer: Vec<u8>, upload_id: String) -> Started { pub fn new(client: S3Client, buffer: Vec<u8>, upload_id: String) -> Started {
Started { Started {
client,
buffer, buffer,
upload_id, upload_id,
part_number: 0, part_number: 0,
@ -93,9 +90,7 @@ struct Settings {
pub struct S3Sink { pub struct S3Sink {
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
runtime: runtime::Runtime, canceller: Mutex<Option<future::AbortHandle>>,
canceller: Mutex<Option<oneshot::Sender<()>>>,
client: Mutex<S3Client>,
} }
lazy_static! { lazy_static! {
@ -167,20 +162,6 @@ impl S3Sink {
let upload_part_req = self.create_upload_part_request()?; let upload_part_req = self.create_upload_part_request()?;
let part_number = upload_part_req.part_number; let part_number = upload_part_req.part_number;
let upload_part_req_future = self
.client
.lock()
.unwrap()
.upload_part(upload_part_req)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
)
});
let output = s3utils::wait(&self.canceller, &self.runtime, upload_part_req_future)?;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let state = match *state { let state = match *state {
State::Started(ref mut started_state) => started_state, State::Started(ref mut started_state) => started_state,
@ -188,6 +169,18 @@ impl S3Sink {
unreachable!("Element should be started"); unreachable!("Element should be started");
} }
}; };
let upload_part_req_future = state.client.upload_part(upload_part_req);
let output =
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
WaitError::FutureError(err) => Some(gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
)),
WaitError::Cancelled => None,
})?;
state.completed_parts.push(CompletedPart { state.completed_parts.push(CompletedPart {
e_tag: output.e_tag, e_tag: output.e_tag,
part_number: Some(part_number), part_number: Some(part_number),
@ -221,14 +214,11 @@ impl S3Sink {
}) })
} }
fn create_complete_multipart_upload_request(&self) -> CompleteMultipartUploadRequest { fn create_complete_multipart_upload_request(
let mut state = self.state.lock().unwrap(); &self,
started_state: &mut Started,
let started_state = match *state { settings: &Settings,
State::Started(ref mut started_state) => started_state, ) -> CompleteMultipartUploadRequest {
State::Stopped => unreachable!("Cannot stop before start"),
};
started_state started_state
.completed_parts .completed_parts
.sort_by(|a, b| a.part_number.cmp(&b.part_number)); .sort_by(|a, b| a.part_number.cmp(&b.part_number));
@ -240,7 +230,6 @@ impl S3Sink {
)), )),
}; };
let settings = self.settings.lock().unwrap();
CompleteMultipartUploadRequest { CompleteMultipartUploadRequest {
bucket: settings.bucket.as_ref().unwrap().to_owned(), bucket: settings.bucket.as_ref().unwrap().to_owned(),
key: settings.key.as_ref().unwrap().to_owned(), key: settings.key.as_ref().unwrap().to_owned(),
@ -252,8 +241,8 @@ impl S3Sink {
fn create_create_multipart_upload_request( fn create_create_multipart_upload_request(
&self, &self,
settings: &Settings,
) -> Result<CreateMultipartUploadRequest, gst::ErrorMessage> { ) -> Result<CreateMultipartUploadRequest, gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
if settings.bucket.is_none() || settings.key.is_none() { if settings.bucket.is_none() || settings.key.is_none() {
return Err(gst_error_msg!( return Err(gst_error_msg!(
gst::ResourceError::Settings, gst::ResourceError::Settings,
@ -280,47 +269,56 @@ impl S3Sink {
)); ));
} }
let complete_req = self.create_complete_multipart_upload_request(); let mut state = self.state.lock().unwrap();
let complete_req_future = self let started_state = match *state {
.client State::Started(ref mut started_state) => started_state,
.lock() State::Stopped => {
.unwrap() unreachable!("Element should be started");
.complete_multipart_upload(complete_req) }
.map_err(|err| { };
gst_error_msg!(
let settings = self.settings.lock().unwrap();
let complete_req = self.create_complete_multipart_upload_request(started_state, &settings);
let complete_req_future = started_state.client.complete_multipart_upload(complete_req);
s3utils::wait(&self.canceller, complete_req_future)
.map(|_| ())
.map_err(|err| match err {
WaitError::FutureError(err) => gst_error_msg!(
gst::ResourceError::Write, gst::ResourceError::Write,
["Failed to complete multipart upload: {}.", err.to_string()] ["Failed to complete multipart upload: {}.", err.to_string()]
) ),
}); WaitError::Cancelled => {
s3utils::wait(&self.canceller, &self.runtime, complete_req_future)
.map_err(|err| {
err.unwrap_or_else(|| {
gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
}) }
}) })
.map(|_| ())
} }
fn start(&self) -> Result<Started, gst::ErrorMessage> { fn start(&self) -> Result<(), gst::ErrorMessage> {
let create_multipart_req = self.create_create_multipart_upload_request()?; let mut state = self.state.lock().unwrap();
let create_multipart_req_future = self let settings = self.settings.lock().unwrap();
.client
.lock() if let State::Started { .. } = *state {
.unwrap() unreachable!("Element should be started");
.create_multipart_upload(create_multipart_req) }
.map_err(|err| {
gst_error_msg!( let client = S3Client::new(settings.region.clone());
let create_multipart_req = self.create_create_multipart_upload_request(&settings)?;
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
|err| match err {
WaitError::FutureError(err) => gst_error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
["Failed to create multipart upload: {}", err] ["Failed to create multipart upload: {}", err]
) ),
}); WaitError::Cancelled => {
let response = s3utils::wait(&self.canceller, &self.runtime, create_multipart_req_future) gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
.map_err(|err| { }
err.unwrap_or_else(|| { },
gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) )?;
})
})?;
let upload_id = response.upload_id.ok_or_else(|| { let upload_id = response.upload_id.ok_or_else(|| {
gst_error_msg!( gst_error_msg!(
@ -329,10 +327,13 @@ impl S3Sink {
) )
})?; })?;
Ok(Started::new( *state = State::Started(Started::new(
Vec::with_capacity(self.settings.lock().unwrap().buffer_size as usize), client,
Vec::with_capacity(settings.buffer_size as usize),
upload_id, upload_id,
)) ));
Ok(())
} }
fn update_buffer( fn update_buffer(
@ -372,10 +373,9 @@ impl S3Sink {
fn cancel(&self) { fn cancel(&self) {
let mut canceller = self.canceller.lock().unwrap(); let mut canceller = self.canceller.lock().unwrap();
if canceller.take().is_some() { if let Some(c) = canceller.take() {
/* We don't do anything, the Sender will be dropped, and that will cause the c.abort()
* Receiver to be cancelled */ };
}
} }
} }
@ -392,12 +392,6 @@ impl ObjectSubclass for S3Sink {
settings: Mutex::new(Default::default()), settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()), state: Mutex::new(Default::default()),
canceller: Mutex::new(None), canceller: Mutex::new(None),
runtime: runtime::Builder::new()
.core_threads(1)
.name_prefix("rusotos3sink-runtime")
.build()
.unwrap(),
client: Mutex::new(S3Client::new(Region::default())),
} }
} }
@ -438,18 +432,13 @@ impl ObjectImpl for S3Sink {
settings.key = value.get::<String>().expect("type checked upstream"); settings.key = value.get::<String>().expect("type checked upstream");
} }
subclass::Property("region", ..) => { subclass::Property("region", ..) => {
let region = Region::from_str( settings.region = Region::from_str(
&value &value
.get::<String>() .get::<String>()
.expect("type checked upstream") .expect("type checked upstream")
.expect("set_property(\"region\"): no value provided"), .expect("set_property(\"region\"): no value provided"),
) )
.unwrap(); .unwrap();
if settings.region != region {
let mut client = self.client.lock().unwrap();
*client = S3Client::new(region.clone());
settings.region = region;
}
} }
subclass::Property("part-size", ..) => { subclass::Property("part-size", ..) => {
settings.buffer_size = value.get_some::<u64>().expect("type checked upstream"); settings.buffer_size = value.get_some::<u64>().expect("type checked upstream");
@ -476,14 +465,7 @@ impl ElementImpl for S3Sink {}
impl BaseSinkImpl for S3Sink { impl BaseSinkImpl for S3Sink {
fn start(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { fn start(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap(); self.start()
if let State::Started(_) = *state {
unreachable!("RusotoS3Sink already started");
}
*state = State::Started(self.start()?);
Ok(())
} }
fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {

View file

@ -9,10 +9,8 @@
use std::sync::Mutex; use std::sync::Mutex;
use bytes::Bytes; use bytes::Bytes;
use futures::sync::oneshot; use futures::future;
use futures::{Future, Stream};
use rusoto_s3::*; use rusoto_s3::*;
use tokio::runtime;
use glib::prelude::*; use glib::prelude::*;
use glib::subclass; use glib::subclass;
@ -25,7 +23,7 @@ use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use crate::s3url::*; use crate::s3url::*;
use crate::s3utils; use crate::s3utils::{self, WaitError};
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum StreamingState { enum StreamingState {
@ -40,8 +38,7 @@ enum StreamingState {
pub struct S3Src { pub struct S3Src {
url: Mutex<Option<GstS3Url>>, url: Mutex<Option<GstS3Url>>,
state: Mutex<StreamingState>, state: Mutex<StreamingState>,
runtime: runtime::Runtime, canceller: Mutex<Option<future::AbortHandle>>,
canceller: Mutex<Option<oneshot::Sender<Bytes>>>,
} }
lazy_static! { lazy_static! {
@ -66,10 +63,9 @@ impl S3Src {
fn cancel(&self) { fn cancel(&self) {
let mut canceller = self.canceller.lock().unwrap(); let mut canceller = self.canceller.lock().unwrap();
if canceller.take().is_some() { if let Some(c) = canceller.take() {
/* We don't do anything, the Sender will be dropped, and that will cause the c.abort()
* Receiver to be cancelled */ };
}
} }
fn connect(self: &S3Src, url: &GstS3Url) -> Result<S3Client, gst::ErrorMessage> { fn connect(self: &S3Src, url: &GstS3Url) -> Result<S3Client, gst::ErrorMessage> {
@ -125,20 +121,14 @@ impl S3Src {
let response = client.head_object(request); let response = client.head_object(request);
let output = s3utils::wait( let output = s3utils::wait(&self.canceller, response).map_err(|err| match err {
&self.canceller, WaitError::FutureError(err) => gst_error_msg!(
&self.runtime, gst::ResourceError::NotFound,
response.map_err(|err| { ["Failed to HEAD object: {}", err]
gst_error_msg!( ),
gst::ResourceError::NotFound, WaitError::Cancelled => {
["Failed to HEAD object: {}", err]
)
}),
)
.map_err(|err| {
err.unwrap_or_else(|| {
gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
}) }
})?; })?;
if let Some(size) = output.content_length { if let Some(size) = output.content_length {
@ -193,17 +183,13 @@ impl S3Src {
let response = client.get_object(request); let response = client.get_object(request);
/* Drop the state lock now that we're done with it and need the next part to be let output = s3utils::wait(&self.canceller, response).map_err(|err| match err {
* interruptible */ WaitError::FutureError(err) => Some(gst_error_msg!(
drop(state); gst::ResourceError::Read,
["Could not read: {}", err]
let output = s3utils::wait( )),
&self.canceller, WaitError::Cancelled => None,
&self.runtime, })?;
response.map_err(|err| {
gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
}),
)?;
gst_debug!( gst_debug!(
CAT, CAT,
@ -212,13 +198,13 @@ impl S3Src {
output.content_length.unwrap() output.content_length.unwrap()
); );
s3utils::wait( s3utils::wait_stream(&self.canceller, &mut output.body.unwrap()).map_err(|err| match err {
&self.canceller, WaitError::FutureError(err) => Some(gst_error_msg!(
&self.runtime, gst::ResourceError::Read,
output.body.unwrap().concat2().map_err(|err| { ["Could not read: {}", err]
gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err]) )),
}), WaitError::Cancelled => None,
) })
} }
} }
@ -234,11 +220,6 @@ impl ObjectSubclass for S3Src {
Self { Self {
url: Mutex::new(None), url: Mutex::new(None),
state: Mutex::new(StreamingState::Stopped), state: Mutex::new(StreamingState::Stopped),
runtime: runtime::Builder::new()
.core_threads(1)
.name_prefix("rusotos3src-runtime")
.build()
.unwrap(),
canceller: Mutex::new(None), canceller: Mutex::new(None),
} }
} }
@ -346,15 +327,12 @@ impl BaseSrcImpl for S3Src {
} }
fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
let state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let StreamingState::Started { .. } = *state { if let StreamingState::Started { .. } = *state {
unreachable!("RusotoS3Src is already started"); unreachable!("RusotoS3Src is already started");
} }
/* Drop the lock as self.head() needs it */
drop(state);
let s3url = match *self.url.lock().unwrap() { let s3url = match *self.url.lock().unwrap() {
Some(ref url) => url.clone(), Some(ref url) => url.clone(),
None => { None => {
@ -366,11 +344,8 @@ impl BaseSrcImpl for S3Src {
}; };
let s3client = self.connect(&s3url)?; let s3client = self.connect(&s3url)?;
let size = self.head(src, &s3client, &s3url)?; let size = self.head(src, &s3client, &s3url)?;
let mut state = self.state.lock().unwrap();
*state = StreamingState::Started { *state = StreamingState::Started {
url: s3url, url: s3url,
client: s3client, client: s3client,
@ -381,6 +356,9 @@ impl BaseSrcImpl for S3Src {
} }
fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
// First, stop any asynchronous tasks if we're running, as they will have the state lock
self.cancel();
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let StreamingState::Stopped = *state { if let StreamingState::Stopped = *state {

View file

@ -6,39 +6,60 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
use futures::sync::oneshot; use bytes::{buf::BufMut, Bytes, BytesMut};
use futures::Future; use futures::stream::TryStreamExt;
use futures::{future, Future};
use rusoto_core::ByteStream;
use std::sync::Mutex; use std::sync::Mutex;
use tokio::runtime; use tokio::runtime;
pub fn wait<F, T>( lazy_static! {
canceller: &Mutex<Option<oneshot::Sender<T>>>, static ref RUNTIME: runtime::Runtime = runtime::Builder::new()
runtime: &runtime::Runtime, .threaded_scheduler()
.enable_all()
.core_threads(2)
.thread_name("gst-rusoto-runtime")
.build()
.unwrap();
}
pub enum WaitError<E> {
Cancelled,
FutureError(E),
}
pub fn wait<F, T, E>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F, future: F,
) -> Result<F::Item, Option<gst::ErrorMessage>> ) -> Result<T, WaitError<E>>
where where
F: Send + Future<Error = gst::ErrorMessage> + 'static, F: Send + Future<Output = Result<T, E>>,
F::Item: Send, F::Output: Send,
T: Send,
E: Send,
{ {
let mut canceller_guard = canceller.lock().unwrap(); let mut canceller_guard = canceller.lock().unwrap();
let (sender, receiver) = oneshot::channel::<T>(); let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller_guard.replace(sender); canceller_guard.replace(abort_handle);
drop(canceller_guard); drop(canceller_guard);
let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]); let abortable_future = future::Abortable::new(future, abort_registration);
let res = oneshot::spawn(future, &runtime.executor()) // FIXME: add a timeout as well
.select(receiver.then(|_| Err(unlock_error.clone())))
.wait() let res = RUNTIME.enter(|| {
.map(|v| v.0) futures::executor::block_on(async {
.map_err(|err| { match abortable_future.await {
if err.0 == unlock_error { // Future resolved successfully
None Ok(Ok(res)) => Ok(res),
} else { // Future resolved with an error
Some(err.0) Ok(Err(err)) => Err(WaitError::FutureError(err)),
// Canceller called before future resolved
Err(future::Aborted) => Err(WaitError::Cancelled),
} }
}); })
});
/* Clear out the canceller */ /* Clear out the canceller */
canceller_guard = canceller.lock().unwrap(); canceller_guard = canceller.lock().unwrap();
@ -46,3 +67,19 @@ where
res res
} }
pub fn wait_stream(
canceller: &Mutex<Option<future::AbortHandle>>,
stream: &mut ByteStream,
) -> Result<Bytes, WaitError<std::io::Error>> {
wait(canceller, async move {
let mut collect = BytesMut::new();
// Loop over the stream and collect till we're done
while let Some(item) = stream.try_next().await? {
collect.put(item)
}
Ok::<Bytes, std::io::Error>(collect.freeze())
})
}