mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-20 07:58:10 +00:00
fallbackswitch: protect src pad stream lock using Cond
Should prevent stream and State deadlocks, see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/202 Fix #202 Hopefully fix #192 as well. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1330>
This commit is contained in:
parent
e1d35536ad
commit
91d2406180
1 changed files with 55 additions and 12 deletions
|
@ -14,7 +14,7 @@ use gst::{debug, log, trace};
|
||||||
|
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
|
|
||||||
use parking_lot::{Mutex, MutexGuard};
|
use parking_lot::{Condvar, Mutex, MutexGuard};
|
||||||
use std::sync::atomic::{AtomicU32, Ordering};
|
use std::sync::atomic::{AtomicU32, Ordering};
|
||||||
|
|
||||||
const PROP_PRIORITY: &str = "priority";
|
const PROP_PRIORITY: &str = "priority";
|
||||||
|
@ -87,6 +87,10 @@ struct State {
|
||||||
|
|
||||||
timeout_running_time: Option<gst::ClockTime>,
|
timeout_running_time: Option<gst::ClockTime>,
|
||||||
timeout_clock_id: Option<gst::ClockId>,
|
timeout_clock_id: Option<gst::ClockId>,
|
||||||
|
|
||||||
|
/// If the src pad is currently busy. Should be checked and waited on using `src_busy_cond`
|
||||||
|
/// before calling anything requiring the stream lock.
|
||||||
|
src_busy: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for State {
|
impl Default for State {
|
||||||
|
@ -102,6 +106,8 @@ impl Default for State {
|
||||||
|
|
||||||
timeout_running_time: None,
|
timeout_running_time: None,
|
||||||
timeout_clock_id: None,
|
timeout_clock_id: None,
|
||||||
|
|
||||||
|
src_busy: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -420,6 +426,7 @@ impl SinkState {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct FallbackSwitch {
|
pub struct FallbackSwitch {
|
||||||
state: Mutex<State>,
|
state: Mutex<State>,
|
||||||
|
src_busy_cond: Condvar,
|
||||||
settings: Mutex<Settings>,
|
settings: Mutex<Settings>,
|
||||||
|
|
||||||
// Separated from the rest of the `state` because it can be
|
// Separated from the rest of the `state` because it can be
|
||||||
|
@ -952,11 +959,13 @@ impl FallbackSwitch {
|
||||||
}
|
}
|
||||||
|
|
||||||
if switched_pad {
|
if switched_pad {
|
||||||
|
self.with_src_busy(|| {
|
||||||
let _ = pad.push_event(gst::event::Reconfigure::new());
|
let _ = pad.push_event(gst::event::Reconfigure::new());
|
||||||
pad.sticky_events_foreach(|event| {
|
pad.sticky_events_foreach(|event| {
|
||||||
self.src_pad.push_event(event.clone());
|
self.src_pad.push_event(event.clone());
|
||||||
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
|
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
self.obj().notify(PROP_ACTIVE_PAD);
|
self.obj().notify(PROP_ACTIVE_PAD);
|
||||||
}
|
}
|
||||||
|
@ -987,11 +996,13 @@ impl FallbackSwitch {
|
||||||
|
|
||||||
let out_gap_event = builder.build();
|
let out_gap_event = builder.build();
|
||||||
|
|
||||||
|
self.with_src_busy(|| {
|
||||||
self.src_pad.push_event(out_gap_event);
|
self.src_pad.push_event(out_gap_event);
|
||||||
|
});
|
||||||
|
|
||||||
Ok(gst::FlowSuccess::Ok)
|
Ok(gst::FlowSuccess::Ok)
|
||||||
} else {
|
} else {
|
||||||
self.src_pad.push(buffer)
|
self.with_src_busy(|| self.src_pad.push(buffer))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1114,15 +1125,18 @@ impl FallbackSwitch {
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
if fwd_sticky {
|
if fwd_sticky {
|
||||||
|
self.with_src_busy(|| {
|
||||||
let _ = pad.push_event(gst::event::Reconfigure::new());
|
let _ = pad.push_event(gst::event::Reconfigure::new());
|
||||||
pad.sticky_events_foreach(|event| {
|
pad.sticky_events_foreach(|event| {
|
||||||
self.src_pad.push_event(event.clone());
|
self.src_pad.push_event(event.clone());
|
||||||
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
|
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
self.obj().notify(PROP_ACTIVE_PAD);
|
self.obj().notify(PROP_ACTIVE_PAD);
|
||||||
}
|
}
|
||||||
self.src_pad.push_event(event)
|
|
||||||
|
self.with_src_busy(|| self.src_pad.push_event(event))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sink_query(&self, pad: &super::FallbackSwitchSinkPad, query: &mut gst::QueryRef) -> bool {
|
fn sink_query(&self, pad: &super::FallbackSwitchSinkPad, query: &mut gst::QueryRef) -> bool {
|
||||||
|
@ -1237,6 +1251,34 @@ impl FallbackSwitch {
|
||||||
|
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait until src_busy is not set and set it, execute
|
||||||
|
/// the closure, then unset it again and notify its Cond.
|
||||||
|
///
|
||||||
|
/// The State lock is taken while modifying src_busy,
|
||||||
|
/// but not while executing the closure.
|
||||||
|
fn with_src_busy<F, R>(&self, func: F) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce() -> R,
|
||||||
|
{
|
||||||
|
{
|
||||||
|
let mut state = self.state.lock();
|
||||||
|
while state.src_busy {
|
||||||
|
self.src_busy_cond.wait(&mut state);
|
||||||
|
}
|
||||||
|
state.src_busy = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
let ret = func();
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut state = self.state.lock();
|
||||||
|
state.src_busy = false;
|
||||||
|
self.src_busy_cond.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
ret
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[glib::object_subclass]
|
#[glib::object_subclass]
|
||||||
|
@ -1260,6 +1302,7 @@ impl ObjectSubclass for FallbackSwitch {
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
state: Mutex::new(State::default()),
|
state: Mutex::new(State::default()),
|
||||||
|
src_busy_cond: Condvar::default(),
|
||||||
settings: Mutex::new(Settings::default()),
|
settings: Mutex::new(Settings::default()),
|
||||||
active_sinkpad: Mutex::new(None),
|
active_sinkpad: Mutex::new(None),
|
||||||
src_pad: srcpad,
|
src_pad: srcpad,
|
||||||
|
|
Loading…
Reference in a new issue