diff --git a/net/rusoto/Cargo.toml b/net/rusoto/Cargo.toml index 21749a41..2b7b54c3 100644 --- a/net/rusoto/Cargo.toml +++ b/net/rusoto/Cargo.toml @@ -8,16 +8,16 @@ description = "Amazon S3 Plugin" edition = "2018" [dependencies] -bytes = "0.4" -futures = "0.1" +bytes = "0.5" +futures = "0.3" glib = { git = "https://github.com/gtk-rs/glib" } gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] } gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] } -rusoto_core = "0.42" -rusoto_s3 = "0.42" +rusoto_core = "0.43" +rusoto_s3 = "0.43" url = "2" percent-encoding = "2" -tokio = "0.1" +tokio = { version = "0.2", features = [ "rt-threaded" ] } lazy_static = "1.0" [lib] diff --git a/net/rusoto/src/s3sink.rs b/net/rusoto/src/s3sink.rs index d69adc70..33fc6331 100644 --- a/net/rusoto/src/s3sink.rs +++ b/net/rusoto/src/s3sink.rs @@ -15,13 +15,8 @@ use gst::subclass::prelude::*; use gst_base::subclass::prelude::*; -use futures::prelude::*; -use futures::sync::oneshot; - +use futures::future; use rusoto_core::region::Region; - -use tokio::runtime; - use rusoto_s3::{ CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, @@ -31,9 +26,10 @@ use std::convert::From; use std::str::FromStr; use std::sync::Mutex; -use crate::s3utils; +use crate::s3utils::{self, WaitError}; struct Started { + client: S3Client, buffer: Vec, upload_id: String, part_number: i64, @@ -41,8 +37,9 @@ struct Started { } impl Started { - pub fn new(buffer: Vec, upload_id: String) -> Started { + pub fn new(client: S3Client, buffer: Vec, upload_id: String) -> Started { Started { + client, buffer, upload_id, part_number: 0, @@ -93,9 +90,7 @@ struct Settings { pub struct S3Sink { settings: Mutex, state: Mutex, - runtime: runtime::Runtime, - canceller: Mutex>>, - client: Mutex, + canceller: Mutex>, } lazy_static! { @@ -167,20 +162,6 @@ impl S3Sink { let upload_part_req = self.create_upload_part_request()?; let part_number = upload_part_req.part_number; - let upload_part_req_future = self - .client - .lock() - .unwrap() - .upload_part(upload_part_req) - .map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to upload part: {}", err] - ) - }); - - let output = s3utils::wait(&self.canceller, &self.runtime, upload_part_req_future)?; - let mut state = self.state.lock().unwrap(); let state = match *state { State::Started(ref mut started_state) => started_state, @@ -188,6 +169,18 @@ impl S3Sink { unreachable!("Element should be started"); } }; + + let upload_part_req_future = state.client.upload_part(upload_part_req); + + let output = + s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { + WaitError::FutureError(err) => Some(gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to upload part: {}", err] + )), + WaitError::Cancelled => None, + })?; + state.completed_parts.push(CompletedPart { e_tag: output.e_tag, part_number: Some(part_number), @@ -221,14 +214,11 @@ impl S3Sink { }) } - fn create_complete_multipart_upload_request(&self) -> CompleteMultipartUploadRequest { - let mut state = self.state.lock().unwrap(); - - let started_state = match *state { - State::Started(ref mut started_state) => started_state, - State::Stopped => unreachable!("Cannot stop before start"), - }; - + fn create_complete_multipart_upload_request( + &self, + started_state: &mut Started, + settings: &Settings, + ) -> CompleteMultipartUploadRequest { started_state .completed_parts .sort_by(|a, b| a.part_number.cmp(&b.part_number)); @@ -240,7 +230,6 @@ impl S3Sink { )), }; - let settings = self.settings.lock().unwrap(); CompleteMultipartUploadRequest { bucket: settings.bucket.as_ref().unwrap().to_owned(), key: settings.key.as_ref().unwrap().to_owned(), @@ -252,8 +241,8 @@ impl S3Sink { fn create_create_multipart_upload_request( &self, + settings: &Settings, ) -> Result { - let settings = self.settings.lock().unwrap(); if settings.bucket.is_none() || settings.key.is_none() { return Err(gst_error_msg!( gst::ResourceError::Settings, @@ -280,47 +269,56 @@ impl S3Sink { )); } - let complete_req = self.create_complete_multipart_upload_request(); - let complete_req_future = self - .client - .lock() - .unwrap() - .complete_multipart_upload(complete_req) - .map_err(|err| { - gst_error_msg!( + let mut state = self.state.lock().unwrap(); + let started_state = match *state { + State::Started(ref mut started_state) => started_state, + State::Stopped => { + unreachable!("Element should be started"); + } + }; + + let settings = self.settings.lock().unwrap(); + + let complete_req = self.create_complete_multipart_upload_request(started_state, &settings); + let complete_req_future = started_state.client.complete_multipart_upload(complete_req); + + s3utils::wait(&self.canceller, complete_req_future) + .map(|_| ()) + .map_err(|err| match err { + WaitError::FutureError(err) => gst_error_msg!( gst::ResourceError::Write, ["Failed to complete multipart upload: {}.", err.to_string()] - ) - }); - - s3utils::wait(&self.canceller, &self.runtime, complete_req_future) - .map_err(|err| { - err.unwrap_or_else(|| { + ), + WaitError::Cancelled => { gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) - }) + } }) - .map(|_| ()) } - fn start(&self) -> Result { - let create_multipart_req = self.create_create_multipart_upload_request()?; - let create_multipart_req_future = self - .client - .lock() - .unwrap() - .create_multipart_upload(create_multipart_req) - .map_err(|err| { - gst_error_msg!( + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if let State::Started { .. } = *state { + unreachable!("Element should be started"); + } + + let client = S3Client::new(settings.region.clone()); + + let create_multipart_req = self.create_create_multipart_upload_request(&settings)?; + let create_multipart_req_future = client.create_multipart_upload(create_multipart_req); + + let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err( + |err| match err { + WaitError::FutureError(err) => gst_error_msg!( gst::ResourceError::OpenWrite, ["Failed to create multipart upload: {}", err] - ) - }); - let response = s3utils::wait(&self.canceller, &self.runtime, create_multipart_req_future) - .map_err(|err| { - err.unwrap_or_else(|| { - gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) - }) - })?; + ), + WaitError::Cancelled => { + gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) + } + }, + )?; let upload_id = response.upload_id.ok_or_else(|| { gst_error_msg!( @@ -329,10 +327,13 @@ impl S3Sink { ) })?; - Ok(Started::new( - Vec::with_capacity(self.settings.lock().unwrap().buffer_size as usize), + *state = State::Started(Started::new( + client, + Vec::with_capacity(settings.buffer_size as usize), upload_id, - )) + )); + + Ok(()) } fn update_buffer( @@ -372,10 +373,9 @@ impl S3Sink { fn cancel(&self) { let mut canceller = self.canceller.lock().unwrap(); - if canceller.take().is_some() { - /* We don't do anything, the Sender will be dropped, and that will cause the - * Receiver to be cancelled */ - } + if let Some(c) = canceller.take() { + c.abort() + }; } } @@ -392,12 +392,6 @@ impl ObjectSubclass for S3Sink { settings: Mutex::new(Default::default()), state: Mutex::new(Default::default()), canceller: Mutex::new(None), - runtime: runtime::Builder::new() - .core_threads(1) - .name_prefix("rusotos3sink-runtime") - .build() - .unwrap(), - client: Mutex::new(S3Client::new(Region::default())), } } @@ -438,18 +432,13 @@ impl ObjectImpl for S3Sink { settings.key = value.get::().expect("type checked upstream"); } subclass::Property("region", ..) => { - let region = Region::from_str( + settings.region = Region::from_str( &value .get::() .expect("type checked upstream") .expect("set_property(\"region\"): no value provided"), ) .unwrap(); - if settings.region != region { - let mut client = self.client.lock().unwrap(); - *client = S3Client::new(region.clone()); - settings.region = region; - } } subclass::Property("part-size", ..) => { settings.buffer_size = value.get_some::().expect("type checked upstream"); @@ -476,14 +465,7 @@ impl ElementImpl for S3Sink {} impl BaseSinkImpl for S3Sink { fn start(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().unwrap(); - if let State::Started(_) = *state { - unreachable!("RusotoS3Sink already started"); - } - - *state = State::Started(self.start()?); - - Ok(()) + self.start() } fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { diff --git a/net/rusoto/src/s3src.rs b/net/rusoto/src/s3src.rs index d2e95fbf..4c51454b 100644 --- a/net/rusoto/src/s3src.rs +++ b/net/rusoto/src/s3src.rs @@ -9,10 +9,8 @@ use std::sync::Mutex; use bytes::Bytes; -use futures::sync::oneshot; -use futures::{Future, Stream}; +use futures::future; use rusoto_s3::*; -use tokio::runtime; use glib::prelude::*; use glib::subclass; @@ -25,7 +23,7 @@ use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; use crate::s3url::*; -use crate::s3utils; +use crate::s3utils::{self, WaitError}; #[allow(clippy::large_enum_variant)] enum StreamingState { @@ -40,8 +38,7 @@ enum StreamingState { pub struct S3Src { url: Mutex>, state: Mutex, - runtime: runtime::Runtime, - canceller: Mutex>>, + canceller: Mutex>, } lazy_static! { @@ -66,10 +63,9 @@ impl S3Src { fn cancel(&self) { let mut canceller = self.canceller.lock().unwrap(); - if canceller.take().is_some() { - /* We don't do anything, the Sender will be dropped, and that will cause the - * Receiver to be cancelled */ - } + if let Some(c) = canceller.take() { + c.abort() + }; } fn connect(self: &S3Src, url: &GstS3Url) -> Result { @@ -125,20 +121,14 @@ impl S3Src { let response = client.head_object(request); - let output = s3utils::wait( - &self.canceller, - &self.runtime, - response.map_err(|err| { - gst_error_msg!( - gst::ResourceError::NotFound, - ["Failed to HEAD object: {}", err] - ) - }), - ) - .map_err(|err| { - err.unwrap_or_else(|| { + let output = s3utils::wait(&self.canceller, response).map_err(|err| match err { + WaitError::FutureError(err) => gst_error_msg!( + gst::ResourceError::NotFound, + ["Failed to HEAD object: {}", err] + ), + WaitError::Cancelled => { gst_error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) - }) + } })?; if let Some(size) = output.content_length { @@ -193,17 +183,13 @@ impl S3Src { let response = client.get_object(request); - /* Drop the state lock now that we're done with it and need the next part to be - * interruptible */ - drop(state); - - let output = s3utils::wait( - &self.canceller, - &self.runtime, - response.map_err(|err| { - gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err]) - }), - )?; + let output = s3utils::wait(&self.canceller, response).map_err(|err| match err { + WaitError::FutureError(err) => Some(gst_error_msg!( + gst::ResourceError::Read, + ["Could not read: {}", err] + )), + WaitError::Cancelled => None, + })?; gst_debug!( CAT, @@ -212,13 +198,13 @@ impl S3Src { output.content_length.unwrap() ); - s3utils::wait( - &self.canceller, - &self.runtime, - output.body.unwrap().concat2().map_err(|err| { - gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err]) - }), - ) + s3utils::wait_stream(&self.canceller, &mut output.body.unwrap()).map_err(|err| match err { + WaitError::FutureError(err) => Some(gst_error_msg!( + gst::ResourceError::Read, + ["Could not read: {}", err] + )), + WaitError::Cancelled => None, + }) } } @@ -234,11 +220,6 @@ impl ObjectSubclass for S3Src { Self { url: Mutex::new(None), state: Mutex::new(StreamingState::Stopped), - runtime: runtime::Builder::new() - .core_threads(1) - .name_prefix("rusotos3src-runtime") - .build() - .unwrap(), canceller: Mutex::new(None), } } @@ -346,15 +327,12 @@ impl BaseSrcImpl for S3Src { } fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); if let StreamingState::Started { .. } = *state { unreachable!("RusotoS3Src is already started"); } - /* Drop the lock as self.head() needs it */ - drop(state); - let s3url = match *self.url.lock().unwrap() { Some(ref url) => url.clone(), None => { @@ -366,11 +344,8 @@ impl BaseSrcImpl for S3Src { }; let s3client = self.connect(&s3url)?; - let size = self.head(src, &s3client, &s3url)?; - let mut state = self.state.lock().unwrap(); - *state = StreamingState::Started { url: s3url, client: s3client, @@ -381,6 +356,9 @@ impl BaseSrcImpl for S3Src { } fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + // First, stop any asynchronous tasks if we're running, as they will have the state lock + self.cancel(); + let mut state = self.state.lock().unwrap(); if let StreamingState::Stopped = *state { diff --git a/net/rusoto/src/s3utils.rs b/net/rusoto/src/s3utils.rs index 70620730..43c4af42 100644 --- a/net/rusoto/src/s3utils.rs +++ b/net/rusoto/src/s3utils.rs @@ -6,39 +6,60 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use futures::sync::oneshot; -use futures::Future; +use bytes::{buf::BufMut, Bytes, BytesMut}; +use futures::stream::TryStreamExt; +use futures::{future, Future}; +use rusoto_core::ByteStream; use std::sync::Mutex; use tokio::runtime; -pub fn wait( - canceller: &Mutex>>, - runtime: &runtime::Runtime, +lazy_static! { + static ref RUNTIME: runtime::Runtime = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .core_threads(2) + .thread_name("gst-rusoto-runtime") + .build() + .unwrap(); +} + +pub enum WaitError { + Cancelled, + FutureError(E), +} + +pub fn wait( + canceller: &Mutex>, future: F, -) -> Result> +) -> Result> where - F: Send + Future + 'static, - F::Item: Send, + F: Send + Future>, + F::Output: Send, + T: Send, + E: Send, { let mut canceller_guard = canceller.lock().unwrap(); - let (sender, receiver) = oneshot::channel::(); + let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); - canceller_guard.replace(sender); + canceller_guard.replace(abort_handle); drop(canceller_guard); - let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]); + let abortable_future = future::Abortable::new(future, abort_registration); - let res = oneshot::spawn(future, &runtime.executor()) - .select(receiver.then(|_| Err(unlock_error.clone()))) - .wait() - .map(|v| v.0) - .map_err(|err| { - if err.0 == unlock_error { - None - } else { - Some(err.0) + // FIXME: add a timeout as well + + let res = RUNTIME.enter(|| { + futures::executor::block_on(async { + match abortable_future.await { + // Future resolved successfully + Ok(Ok(res)) => Ok(res), + // Future resolved with an error + Ok(Err(err)) => Err(WaitError::FutureError(err)), + // Canceller called before future resolved + Err(future::Aborted) => Err(WaitError::Cancelled), } - }); + }) + }); /* Clear out the canceller */ canceller_guard = canceller.lock().unwrap(); @@ -46,3 +67,19 @@ where res } + +pub fn wait_stream( + canceller: &Mutex>, + stream: &mut ByteStream, +) -> Result> { + wait(canceller, async move { + let mut collect = BytesMut::new(); + + // Loop over the stream and collect till we're done + while let Some(item) = stream.try_next().await? { + collect.put(item) + } + + Ok::(collect.freeze()) + }) +}