diff --git a/net/aws/src/s3hlssink/imp.rs b/net/aws/src/s3hlssink/imp.rs index 2f35bb978..825920d15 100644 --- a/net/aws/src/s3hlssink/imp.rs +++ b/net/aws/src/s3hlssink/imp.rs @@ -8,7 +8,6 @@ // // SPDX-License-Identifier: MPL-2.0 -use futures::future; use once_cell::sync::Lazy; use std::io::Write; use std::str::FromStr; @@ -90,7 +89,7 @@ pub struct S3HlsSink { settings: Mutex, state: Mutex, hlssink: gst::Element, - canceller: Mutex>, + canceller: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -459,7 +458,7 @@ impl ObjectSubclass for S3HlsSink { settings: Mutex::new(Settings::default()), state: Mutex::new(State::Stopped), hlssink, - canceller: Mutex::new(None), + canceller: Mutex::new(s3utils::Canceller::default()), } } } @@ -803,10 +802,19 @@ impl ElementImpl for S3HlsSink { PAD_TEMPLATES.as_ref() } + #[allow(clippy::single_match)] fn change_state( &self, transition: gst::StateChange, ) -> Result { + match transition { + gst::StateChange::PausedToReady => { + let mut canceller = self.canceller.lock().unwrap(); + canceller.abort(); + } + _ => (), + } + let ret = self.parent_change_state(transition)?; /* * The settings lock must not be taken before the parent state change. @@ -850,6 +858,11 @@ impl ElementImpl for S3HlsSink { } } + gst::StateChange::PausedToReady => { + let mut canceller = self.canceller.lock().unwrap(); + *canceller = s3utils::Canceller::None; + } + gst::StateChange::ReadyToNull => { drop(settings); /* diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs index 387515466..5f2755b7d 100644 --- a/net/aws/src/s3sink/multipartsink.rs +++ b/net/aws/src/s3sink/multipartsink.rs @@ -26,7 +26,6 @@ use aws_sdk_s3::{ Client, }; -use futures::future; use once_cell::sync::Lazy; use std::collections::HashMap; use std::convert::From; @@ -180,8 +179,8 @@ pub struct S3Sink { url: Mutex>, settings: Mutex, state: Mutex, - canceller: Mutex>, - abort_multipart_canceller: Mutex>, + canceller: Mutex, + abort_multipart_canceller: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -606,19 +605,6 @@ impl S3Sink { Ok(()) } - fn cancel(&self) { - let mut canceller = self.canceller.lock().unwrap(); - let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap(); - - if let Some(c) = abort_canceller.take() { - c.abort() - }; - - if let Some(c) = canceller.take() { - c.abort() - }; - } - fn set_uri(self: &S3Sink, url_str: Option<&str>) -> Result<(), glib::Error> { let state = self.state.lock().unwrap(); @@ -1061,8 +1047,18 @@ impl BaseSinkImpl for S3Sink { } fn unlock(&self) -> Result<(), gst::ErrorMessage> { - self.cancel(); + let mut canceller = self.canceller.lock().unwrap(); + let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap(); + canceller.abort(); + abort_canceller.abort(); + Ok(()) + } + fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> { + let mut canceller = self.canceller.lock().unwrap(); + let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap(); + *canceller = s3utils::Canceller::None; + *abort_canceller = s3utils::Canceller::None; Ok(()) } diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs index fb3ecd35d..615949a0a 100644 --- a/net/aws/src/s3sink/putobjectsink.rs +++ b/net/aws/src/s3sink/putobjectsink.rs @@ -21,7 +21,6 @@ use aws_sdk_s3::{ Client, }; -use futures::future; use once_cell::sync::Lazy; use std::collections::HashMap; use std::convert::From; @@ -152,7 +151,7 @@ pub struct S3PutObjectSink { url: Mutex>, settings: Mutex, state: Mutex, - canceller: Mutex>, + canceller: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -312,14 +311,6 @@ impl S3PutObjectSink { Ok(()) } - fn cancel(&self) { - let mut canceller = self.canceller.lock().unwrap(); - - if let Some(c) = canceller.take() { - c.abort() - }; - } - fn set_uri(self: &S3PutObjectSink, url_str: Option<&str>) -> Result<(), glib::Error> { let state = self.state.lock().unwrap(); @@ -710,8 +701,14 @@ impl BaseSinkImpl for S3PutObjectSink { } fn unlock(&self) -> Result<(), gst::ErrorMessage> { - self.cancel(); + let mut canceller = self.canceller.lock().unwrap(); + canceller.abort(); + Ok(()) + } + fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> { + let mut canceller = self.canceller.lock().unwrap(); + *canceller = s3utils::Canceller::None; Ok(()) } diff --git a/net/aws/src/s3src/imp.rs b/net/aws/src/s3src/imp.rs index 25e6f7cd2..40b85696f 100644 --- a/net/aws/src/s3src/imp.rs +++ b/net/aws/src/s3src/imp.rs @@ -7,7 +7,6 @@ // SPDX-License-Identifier: MPL-2.0 use bytes::Bytes; -use futures::future; use once_cell::sync::Lazy; use std::sync::Mutex; use std::time::Duration; @@ -77,7 +76,7 @@ impl Default for Settings { pub struct S3Src { settings: Mutex, state: Mutex, - canceller: Mutex>, + canceller: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -89,14 +88,6 @@ static CAT: Lazy = Lazy::new(|| { }); impl S3Src { - fn cancel(&self) { - let mut canceller = self.canceller.lock().unwrap(); - - if let Some(c) = canceller.take() { - c.abort() - }; - } - fn connect(self: &S3Src, url: &GstS3Url) -> Result { let settings = self.settings.lock().unwrap(); let timeout_config = s3utils::timeout_config(settings.request_timeout); @@ -521,9 +512,6 @@ impl BaseSrcImpl for S3Src { } fn stop(&self) -> Result<(), gst::ErrorMessage> { - // First, stop any asynchronous tasks if we're running, as they will have the state lock - self.cancel(); - let mut state = self.state.lock().unwrap(); if let StreamingState::Stopped = *state { @@ -587,7 +575,14 @@ impl BaseSrcImpl for S3Src { } fn unlock(&self) -> Result<(), gst::ErrorMessage> { - self.cancel(); + let mut canceller = self.canceller.lock().unwrap(); + canceller.abort(); + Ok(()) + } + + fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> { + let mut canceller = self.canceller.lock().unwrap(); + *canceller = s3utils::Canceller::None; Ok(()) } } diff --git a/net/aws/src/s3utils.rs b/net/aws/src/s3utils.rs index dc03a465d..b4bfb5964 100644 --- a/net/aws/src/s3utils.rs +++ b/net/aws/src/s3utils.rs @@ -51,21 +51,38 @@ impl fmt::Display for WaitError } } -pub fn wait( - canceller: &Mutex>, - future: F, -) -> Result> +#[derive(Default)] +pub enum Canceller { + #[default] + None, + Handle(future::AbortHandle), + Cancelled, +} + +impl Canceller { + pub fn abort(&mut self) { + if let Canceller::Handle(ref canceller) = *self { + canceller.abort(); + } + + *self = Canceller::Cancelled; + } +} + +pub fn wait(canceller_mutex: &Mutex, future: F) -> Result> where F: Send + Future>, F::Output: Send, T: Send, E: Send, { - let mut canceller_guard = canceller.lock().unwrap(); + let mut canceller = canceller_mutex.lock().unwrap(); + if matches!(*canceller, Canceller::Cancelled) { + return Err(WaitError::Cancelled); + } let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); - - canceller_guard.replace(abort_handle); - drop(canceller_guard); + *canceller = Canceller::Handle(abort_handle); + drop(canceller); let abortable_future = future::Abortable::new(future, abort_registration); @@ -86,17 +103,21 @@ where }; /* Clear out the canceller */ - canceller_guard = canceller.lock().unwrap(); - *canceller_guard = None; + let mut canceller = canceller_mutex.lock().unwrap(); + if matches!(*canceller, Canceller::Cancelled) { + return Err(WaitError::Cancelled); + } + *canceller = Canceller::None; + drop(canceller); res } pub fn wait_stream( - canceller: &Mutex>, + canceller_mutex: &Mutex, stream: &mut ByteStream, ) -> Result> { - wait(canceller, async move { + wait(canceller_mutex, async move { let mut collect = BytesMut::new(); // Loop over the stream and collect till we're done @@ -116,7 +137,7 @@ pub fn timeout_config(request_timeout: Duration) -> TimeoutConfig { } pub fn wait_config( - canceller: &Mutex>, + canceller_mutex: &Mutex, region: Region, timeout_config: TimeoutConfig, credentials: Option, @@ -136,11 +157,13 @@ pub fn wait_config( .load(), }; - let mut canceller_guard = canceller.lock().unwrap(); + let mut canceller = canceller_mutex.lock().unwrap(); + if matches!(*canceller, Canceller::Cancelled) { + return Err(WaitError::Cancelled); + } let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); - - canceller_guard.replace(abort_handle); - drop(canceller_guard); + *canceller = Canceller::Handle(abort_handle); + drop(canceller); let abortable_future = future::Abortable::new(config_future, abort_registration); @@ -157,8 +180,12 @@ pub fn wait_config( }; /* Clear out the canceller */ - canceller_guard = canceller.lock().unwrap(); - *canceller_guard = None; + let mut canceller = canceller_mutex.lock().unwrap(); + if matches!(*canceller, Canceller::Cancelled) { + return Err(WaitError::Cancelled); + } + *canceller = Canceller::None; + drop(canceller); res }