diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs index 3e76be4b..2300e27a 100644 --- a/utils/fallbackswitch/src/fallbackswitch/imp.rs +++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs @@ -14,7 +14,7 @@ use gst::{debug, log, trace}; use once_cell::sync::Lazy; -use parking_lot::{Mutex, MutexGuard}; +use parking_lot::{Condvar, Mutex, MutexGuard}; use std::sync::atomic::{AtomicU32, Ordering}; const PROP_PRIORITY: &str = "priority"; @@ -87,6 +87,10 @@ struct State { timeout_running_time: Option, timeout_clock_id: Option, + + /// If the src pad is currently busy. Should be checked and waited on using `src_busy_cond` + /// before calling anything requiring the stream lock. + src_busy: bool, } impl Default for State { @@ -102,6 +106,8 @@ impl Default for State { timeout_running_time: None, timeout_clock_id: None, + + src_busy: false, } } } @@ -420,6 +426,7 @@ impl SinkState { #[derive(Debug)] pub struct FallbackSwitch { state: Mutex, + src_busy_cond: Condvar, settings: Mutex, // Separated from the rest of the `state` because it can be @@ -952,10 +959,12 @@ impl FallbackSwitch { } if switched_pad { - let _ = pad.push_event(gst::event::Reconfigure::new()); - pad.sticky_events_foreach(|event| { - self.src_pad.push_event(event.clone()); - std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + self.with_src_busy(|| { + let _ = pad.push_event(gst::event::Reconfigure::new()); + pad.sticky_events_foreach(|event| { + self.src_pad.push_event(event.clone()); + std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + }); }); self.obj().notify(PROP_ACTIVE_PAD); @@ -987,11 +996,13 @@ impl FallbackSwitch { let out_gap_event = builder.build(); - self.src_pad.push_event(out_gap_event); + self.with_src_busy(|| { + self.src_pad.push_event(out_gap_event); + }); Ok(gst::FlowSuccess::Ok) } else { - self.src_pad.push(buffer) + self.with_src_busy(|| self.src_pad.push(buffer)) } } @@ -1114,15 +1125,18 @@ impl FallbackSwitch { drop(state); if fwd_sticky { - let _ = pad.push_event(gst::event::Reconfigure::new()); - pad.sticky_events_foreach(|event| { - self.src_pad.push_event(event.clone()); - std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + self.with_src_busy(|| { + let _ = pad.push_event(gst::event::Reconfigure::new()); + pad.sticky_events_foreach(|event| { + self.src_pad.push_event(event.clone()); + std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + }); }); self.obj().notify(PROP_ACTIVE_PAD); } - self.src_pad.push_event(event) + + self.with_src_busy(|| self.src_pad.push_event(event)) } fn sink_query(&self, pad: &super::FallbackSwitchSinkPad, query: &mut gst::QueryRef) -> bool { @@ -1237,6 +1251,34 @@ impl FallbackSwitch { false } + + /// Wait until src_busy is not set and set it, execute + /// the closure, then unset it again and notify its Cond. + /// + /// The State lock is taken while modifying src_busy, + /// but not while executing the closure. + fn with_src_busy(&self, func: F) -> R + where + F: FnOnce() -> R, + { + { + let mut state = self.state.lock(); + while state.src_busy { + self.src_busy_cond.wait(&mut state); + } + state.src_busy = true; + } + + let ret = func(); + + { + let mut state = self.state.lock(); + state.src_busy = false; + self.src_busy_cond.notify_one(); + } + + ret + } } #[glib::object_subclass] @@ -1260,6 +1302,7 @@ impl ObjectSubclass for FallbackSwitch { Self { state: Mutex::new(State::default()), + src_busy_cond: Condvar::default(), settings: Mutex::new(Settings::default()), active_sinkpad: Mutex::new(None), src_pad: srcpad,