reqwesthttpsrc: Fix race condition when unlocking

It would be possible that there is no cancellable yet when unlock() is
called, then a new future is executed and it wouldn't have any
information that it is not supposed to run at all.

To solve this remember if unlock() was called and reset this in
unlock_stop().

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1602>
This commit is contained in:
Sebastian Dröge 2024-06-03 16:01:23 +03:00 committed by GStreamer Marge Bot
parent f68655b5e2
commit 9945b702b8

View file

@ -139,13 +139,31 @@ enum State {
}, },
} }
#[derive(Debug, Default)] #[derive(Default)]
enum Canceller {
#[default]
None,
Handle(future::AbortHandle),
Cancelled,
}
impl Canceller {
fn abort(&mut self) {
if let Canceller::Handle(ref canceller) = *self {
canceller.abort();
}
*self = Canceller::Cancelled;
}
}
#[derive(Default)]
pub struct ReqwestHttpSrc { pub struct ReqwestHttpSrc {
client: Mutex<Option<ClientContext>>, client: Mutex<Option<ClientContext>>,
external_client: Mutex<Option<ClientContext>>, external_client: Mutex<Option<ClientContext>>,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>, canceller: Mutex<Canceller>,
} }
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -617,8 +635,11 @@ impl ReqwestHttpSrc {
let timeout = self.settings.lock().unwrap().timeout; let timeout = self.settings.lock().unwrap().timeout;
let mut canceller = self.canceller.lock().unwrap(); let mut canceller = self.canceller.lock().unwrap();
if matches!(*canceller, Canceller::Cancelled) {
return Err(None);
}
let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller.replace(abort_handle); *canceller = Canceller::Handle(abort_handle);
drop(canceller); drop(canceller);
// Wrap in a timeout // Wrap in a timeout
@ -652,7 +673,11 @@ impl ReqwestHttpSrc {
}; };
/* Clear out the canceller */ /* Clear out the canceller */
let _ = self.canceller.lock().unwrap().take(); let mut canceller = self.canceller.lock().unwrap();
if matches!(*canceller, Canceller::Cancelled) {
return Err(None);
}
*canceller = Canceller::None;
res res
} }
@ -1024,10 +1049,14 @@ impl BaseSrcImpl for ReqwestHttpSrc {
} }
fn unlock(&self) -> Result<(), gst::ErrorMessage> { fn unlock(&self) -> Result<(), gst::ErrorMessage> {
let canceller = self.canceller.lock().unwrap(); let mut canceller = self.canceller.lock().unwrap();
if let Some(ref canceller) = *canceller {
canceller.abort(); canceller.abort();
Ok(())
} }
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
*canceller = Canceller::None;
Ok(()) Ok(())
} }