quinn: Fix race condition when unlocking

It would be possible that there is no cancellable yet when unlock() is
called, then a new future is executed and it wouldn't have any
information that it is not supposed to run at all.

To solve this remember if unlock() was called and reset this in
unlock_stop().

Also actually implement unlock() / unlock_stop() for the sink, and don't
cancel in stop() as unlock() / unlock_stop() would've been called before
that already.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1602>
This commit is contained in:
Sebastian Dröge 2024-06-07 18:46:03 +03:00 committed by GStreamer Marge Bot
parent c42040fbb8
commit 00aaecad07
3 changed files with 63 additions and 29 deletions

View file

@ -7,11 +7,11 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::common::*;
use crate::utils::{ use crate::utils::{
client_endpoint, make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE, client_endpoint, make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE,
CONNECTION_CLOSE_MSG, CONNECTION_CLOSE_MSG,
}; };
use crate::{common::*, utils};
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
use gst::{glib, prelude::*, subclass::prelude::*}; use gst::{glib, prelude::*, subclass::prelude::*};
@ -83,7 +83,7 @@ impl Default for Settings {
pub struct QuinnQuicSink { pub struct QuinnQuicSink {
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>, canceller: Mutex<utils::Canceller>,
} }
impl Default for QuinnQuicSink { impl Default for QuinnQuicSink {
@ -91,7 +91,7 @@ impl Default for QuinnQuicSink {
Self { Self {
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
canceller: Mutex::new(None), canceller: Mutex::new(utils::Canceller::default()),
} }
} }
} }
@ -459,6 +459,20 @@ impl BaseSinkImpl for QuinnQuicSink {
}, },
} }
} }
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
canceller.abort();
Ok(())
}
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
if matches!(&*canceller, utils::Canceller::Cancelled) {
*canceller = utils::Canceller::None;
}
Ok(())
}
} }
impl QuinnQuicSink { impl QuinnQuicSink {

View file

@ -7,11 +7,11 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::common::*;
use crate::utils::{ use crate::utils::{
client_endpoint, make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE, client_endpoint, make_socket_addr, server_endpoint, wait, Canceller, WaitError,
CONNECTION_CLOSE_MSG, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
}; };
use crate::{common::*, utils};
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
use gst::{glib, prelude::*, subclass::prelude::*}; use gst::{glib, prelude::*, subclass::prelude::*};
@ -87,7 +87,7 @@ impl Default for Settings {
pub struct QuinnQuicSrc { pub struct QuinnQuicSrc {
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>, canceller: Mutex<utils::Canceller>,
} }
impl Default for QuinnQuicSrc { impl Default for QuinnQuicSrc {
@ -95,7 +95,7 @@ impl Default for QuinnQuicSrc {
Self { Self {
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
canceller: Mutex::new(None), canceller: Mutex::new(utils::Canceller::default()),
} }
} }
} }
@ -400,8 +400,6 @@ impl BaseSrcImpl for QuinnQuicSrc {
} }
fn stop(&self) -> Result<(), gst::ErrorMessage> { fn stop(&self) -> Result<(), gst::ErrorMessage> {
self.cancel();
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let State::Started(ref mut state) = *state { if let State::Started(ref mut state) = *state {
@ -466,7 +464,16 @@ impl BaseSrcImpl for QuinnQuicSrc {
} }
fn unlock(&self) -> Result<(), gst::ErrorMessage> { fn unlock(&self) -> Result<(), gst::ErrorMessage> {
self.cancel(); let mut canceller = self.canceller.lock().unwrap();
canceller.abort();
Ok(())
}
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
if matches!(&*canceller, Canceller::Cancelled) {
*canceller = Canceller::None;
}
Ok(()) Ok(())
} }
@ -584,14 +591,6 @@ impl QuinnQuicSrc {
} }
} }
fn cancel(&self) {
let mut canceller = self.canceller.lock().unwrap();
if let Some(c) = canceller.take() {
c.abort()
};
}
async fn init_connection(&self) -> Result<(Connection, Option<RecvStream>), WaitError> { async fn init_connection(&self) -> Result<(Connection, Option<RecvStream>), WaitError> {
let server_addr; let server_addr;
let server_name; let server_name;

View file

@ -45,8 +45,26 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap() .unwrap()
}); });
#[derive(Default)]
pub enum Canceller {
#[default]
None,
Handle(future::AbortHandle),
Cancelled,
}
impl Canceller {
pub fn abort(&mut self) {
if let Canceller::Handle(ref canceller) = *self {
canceller.abort();
}
*self = Canceller::Cancelled;
}
}
pub fn wait<F, T>( pub fn wait<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>, canceller_mutex: &Mutex<Canceller>,
future: F, future: F,
timeout: u32, timeout: u32,
) -> Result<T, WaitError> ) -> Result<T, WaitError>
@ -54,18 +72,18 @@ where
F: Send + Future<Output = T>, F: Send + Future<Output = T>,
T: Send + 'static, T: Send + 'static,
{ {
let mut canceller_guard = canceller.lock().unwrap(); let mut canceller = canceller_mutex.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); if matches!(*canceller, Canceller::Cancelled) {
return Err(WaitError::FutureAborted);
if canceller_guard.is_some() { } else if matches!(*canceller, Canceller::Handle(..)) {
return Err(WaitError::FutureError(gst::error_msg!( return Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Old Canceller should not exist"] ["Old Canceller should not exist"]
))); )));
} }
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller_guard.replace(abort_handle); *canceller = Canceller::Handle(abort_handle);
drop(canceller_guard); drop(canceller);
let future = async { let future = async {
if timeout == 0 { if timeout == 0 {
@ -98,8 +116,11 @@ where
let res = RUNTIME.block_on(future); let res = RUNTIME.block_on(future);
canceller_guard = canceller.lock().unwrap(); let mut canceller = canceller_mutex.lock().unwrap();
*canceller_guard = None; if matches!(*canceller, Canceller::Cancelled) {
return Err(WaitError::FutureAborted);
}
*canceller = Canceller::None;
res res
} }