From e7f70cc228753a0ec6055bc91e4db42793283174 Mon Sep 17 00:00:00 2001
From: Jan Schmidt
Date: Thu, 13 Apr 2023 02:30:54 +1000
Subject: [PATCH] fallbackswitch: Fix pad health calculation and notifies

Change the pad health calculation to consider a pad 'healthy' if it has
received data within the last 'timeout' window. Previously, inactive
pads were constantly flip-flopping between healthy and not healthy,
depending on whether they were slightly ahead of or behind the active
pad's running_time.

When the health status of a pad changes, make sure to always notify the
property, so that applications that are manually controlling the active
pad can make their switching decisions.
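
For illustration, an application that drives the switch manually (with
auto-switch=false) could react to these notifies along these lines. This
is an untested sketch, not part of this change; it assumes
`use gst::prelude::*;`, and the helper name and the switch-on-recovery
policy are invented:

    // Hypothetical helper: re-select a sink pad whenever it reports healthy.
    fn watch_pad_health(switch: &gst::Element) {
        for pad in switch.sink_pads() {
            let switch = switch.clone();
            pad.connect_notify(Some("is-healthy"), move |pad, _| {
                if pad.property::<bool>("is-healthy") {
                    // Application policy goes here; this simply activates
                    // whichever pad most recently became healthy.
                    switch.set_property("active-pad", pad);
                }
            });
        }
    }
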
Part-of:
---
 .../fallbackswitch/src/fallbackswitch/imp.rs  | 219 +++++++++++++++---
 utils/fallbackswitch/tests/fallbackswitch.rs |   5 +-
 2 files changed, 191 insertions(+), 33 deletions(-)

diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs
index ee80125c..5be825e5 100644
--- a/utils/fallbackswitch/src/fallbackswitch/imp.rs
+++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs
@@ -77,7 +77,7 @@ struct State {
 
     output_running_time: Option<gst::ClockTime>,
 
-    timeout_running_time: gst::ClockTime,
+    timeout_running_time: Option<gst::ClockTime>,
     timeout_clock_id: Option<gst::ClockId>,
 }
 
@@ -92,7 +92,7 @@ impl Default for State {
 
             output_running_time: None,
 
-            timeout_running_time: gst::ClockTime::ZERO,
+            timeout_running_time: None,
             timeout_clock_id: None,
         }
     }
@@ -359,13 +359,52 @@ impl SinkState {
         Some(clock_id)
     }
 
-    fn is_healthy(&self, state: &State, settings: &Settings) -> bool {
-        match self.current_running_time {
-            Some(current_running_time) => {
-                current_running_time >= state.timeout_running_time.saturating_sub(settings.timeout)
-                    && current_running_time <= state.timeout_running_time
+    fn is_healthy(
+        &self,
+        pad: &super::FallbackSwitchSinkPad,
+        state: &State,
+        settings: &Settings,
+        now_running_time: Option<gst::ClockTime>,
+    ) -> bool {
+        /* The pad is healthy if it has received data within the
+         * last 'timeout' duration, which means the pad's current_running_time+timeout
+         * is later than 'now' according to the passed in running time, but not later
+         * than the timeout_running_time that would mean we time out before outputting
+         * that buffer */
+        match (
+            self.current_running_time,
+            now_running_time,
+            state.timeout_running_time,
+        ) {
+            (Some(pad_running_time), Some(now_running_time), Some(global_timeout_running_time)) => {
+                let timeout_running_time = pad_running_time.saturating_add(settings.timeout);
+                log!(
+                    CAT,
+                    obj: pad,
+                    "pad_running_time {} timeout_running_time {} now_running_time {}",
+                    pad_running_time,
+                    timeout_running_time,
+                    now_running_time,
+                );
+
+                timeout_running_time > now_running_time // Must be > not >=
+                    && pad_running_time <= global_timeout_running_time
             }
-            None => false,
+            (Some(pad_running_time), Some(now_running_time), None) => {
+                let timeout_running_time = pad_running_time.saturating_add(settings.timeout);
+                log!(
+                    CAT,
+                    obj: pad,
+                    "pad_running_time {} timeout_running_time {} now_running_time {}",
+                    pad_running_time,
+                    timeout_running_time,
+                    now_running_time,
+                );
+
+                timeout_running_time > now_running_time // Must be > not >=
+            }
+            (Some(_input_running_time), None, _) => true,
+            (None, _, _) => false,
         }
     }
 }
@@ -411,13 +450,22 @@ impl FallbackSwitch {
         );
 
         /* Advance the output running time to this timeout */
-        state.output_running_time = Some(state.timeout_running_time);
+        state.output_running_time = state.timeout_running_time;
+
+        if !settings.auto_switch {
+            /* If auto-switching is disabled, don't check for a new
+             * pad */
+            state.timed_out = true;
+            return;
+        }
 
         let active_sinkpad = self.active_sinkpad.lock().clone();
 
         let mut best_priority = 0u32;
         let mut best_pad = None;
 
+        let now_running_time = state.timeout_running_time;
+
         for pad in self.obj().sink_pads() {
             /* Don't consider the active sinkpad */
             let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
@@ -430,7 +478,7 @@ impl FallbackSwitch {
             #[allow(clippy::collapsible_if)]
             /* If this pad has data that arrived within the 'timeout' window
              * before the timeout fired, we can switch to it */
-            if pad_state.is_healthy(state, settings) {
+            if pad_state.is_healthy(pad, state, settings, now_running_time) {
                 if best_pad.is_none() || pad_settings.priority < best_priority {
                     best_pad = Some(pad.clone());
                     best_priority = pad_settings.priority;
@@ -451,7 +499,8 @@ impl FallbackSwitch {
         }
     }
 
-    fn on_timeout(&self, clock_id: &gst::ClockId, settings: &Settings) {
+    fn on_timeout(&self, clock_id: &gst::ClockId) {
+        let settings = self.settings.lock().clone();
         let mut state = self.state.lock();
 
         if state.timeout_clock_id.as_ref() != Some(clock_id) {
@@ -463,7 +512,12 @@ impl FallbackSwitch {
         // Ensure sink_chain on an inactive pad can schedule another timeout
         state.timeout_clock_id = None;
 
-        self.handle_timeout(&mut state, settings);
+        self.handle_timeout(&mut state, &settings);
+        let changed = self.update_health_statuses(&state, &settings);
+        drop(state);
+        for pad in changed {
+            pad.notify(PROP_IS_HEALTHY);
+        }
     }
 
     fn cancel_waits(&self) {
@@ -480,29 +534,29 @@ impl FallbackSwitch {
         state: &mut State,
         settings: &Settings,
         running_time: gst::ClockTime,
-    ) {
+    ) -> bool {
         state.cancel_timeout();
 
         let clock = match self.obj().clock() {
-            None => return,
+            None => return false,
             Some(clock) => clock,
         };
 
         let base_time = match self.obj().base_time() {
             Some(base_time) => base_time,
-            None => return,
+            None => return false,
         };
 
         let timeout_running_time = running_time
             .saturating_add(state.upstream_latency + settings.timeout + settings.latency);
         let wait_until = timeout_running_time + base_time;
-        state.timeout_running_time = timeout_running_time;
+        state.timeout_running_time = Some(timeout_running_time);
 
         /* If we're already running behind, fire the timeout immediately */
        let now = clock.time();
         if now.map_or(false, |now| wait_until <= now) {
             self.handle_timeout(state, settings);
-            return;
+            return true;
         }
 
         debug!(CAT, imp: self, "Scheduling timeout for {}", wait_until);
@@ -518,10 +572,41 @@ impl FallbackSwitch {
                     None => return,
                     Some(imp) => imp,
                 };
-                let settings = imp.settings.lock().clone();
-                imp.on_timeout(clock_id, &settings);
+                imp.on_timeout(clock_id);
             })
             .expect("Failed to wait async");
+
+        false
+    }
+
+    fn update_health_statuses(
+        &self,
+        state: &State,
+        settings: &Settings,
+    ) -> Vec<super::FallbackSwitchSinkPad> {
+        let mut changed = Vec::<super::FallbackSwitchSinkPad>::new();
+
+        /* Iterate over sink pads and update their is_healthy status,
+         * returning a Vec of pads whose health changed and need notifying */
+        for pad in self.obj().sink_pads() {
+            let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
+            let pad_imp = pad.imp();
+            let mut pad_state = pad_imp.state.lock();
+
+            /* If this pad has data that arrived within the 'timeout' window
+             * before the timeout fired, we can switch to it */
+            let is_healthy = pad_state.is_healthy(pad, state, settings, state.output_running_time);
+            let health_changed = is_healthy != pad_state.is_healthy;
+            pad_state.is_healthy = is_healthy;
+
+            drop(pad_state);
+
+            if health_changed {
+                log!(CAT, obj: pad, "Health changed to {}", is_healthy);
+                changed.push(pad.clone());
+            }
+        }
+
+        changed
     }
 
     fn sink_activatemode(
@@ -600,7 +685,7 @@ impl FallbackSwitch {
          * - sleep until the buffer running time, then check if we're still active
          */
 
-        /* First see if we should become the active pad */
+        /* see if we should become the active pad */
         let active_sinkpad = self.active_sinkpad.lock().clone();
         let mut is_active = active_sinkpad.as_ref() == Some(pad);
         if !is_active && settings.auto_switch {
@@ -633,6 +718,35 @@ impl FallbackSwitch {
         let raw_pad = !matches!(pad_state.caps_info, CapsInfo::None);
         let (start_running_time, end_running_time) = pad_state.get_sync_time(&buffer);
 
+        if let Some(running_time) = start_running_time {
+            pad_state.current_running_time = Some(running_time);
+        }
+
+        /* Update pad is-healthy state if necessary and notify
+         * if it changes, as that might affect which pad is
+         * active */
+        let is_healthy = pad_state.is_healthy(pad, &state, &settings, state.output_running_time);
+        let health_changed = is_healthy != pad_state.is_healthy;
+        pad_state.is_healthy = is_healthy;
+
+        /* Need to drop state locks before notifying */
+        let (mut state, mut pad_state) = if health_changed {
+            drop(pad_state);
+            drop(state);
+            log!(CAT, obj: pad, "Health changed to {}", is_healthy);
+            pad.notify(PROP_IS_HEALTHY);
+
+            if !settings.auto_switch {
+                /* Re-check if this is the active sinkpad */
+                let active_sinkpad = self.active_sinkpad.lock().clone();
+                is_active = active_sinkpad.as_ref() == Some(pad);
+            }
+
+            (self.state.lock(), pad_imp.state.lock())
+        } else {
+            (state, pad_state)
+        };
+
         log!(
             CAT,
             obj: pad,
@@ -651,16 +765,18 @@ impl FallbackSwitch {
                 start_running_time,
                 state.upstream_latency + settings.latency,
             )
-        } else if end_running_time.map_or(false, |end_running_time| {
-            end_running_time < state.timeout_running_time
-        }) {
+        } else if state.timeout_running_time.is_some()
+            && end_running_time.map_or(false, |end_running_time| {
+                end_running_time < state.timeout_running_time.unwrap()
+            })
+        {
             if raw_pad {
                 log!(
                     CAT,
                     obj: pad,
                     "Dropping trailing raw {:?} before timeout {}",
                     buffer,
-                    state.timeout_running_time
+                    state.timeout_running_time.unwrap()
                 );
                 return Ok(gst::FlowSuccess::Ok);
             } else {
@@ -669,7 +785,7 @@ impl FallbackSwitch {
                     obj: pad,
                     "Not dropping trailing non-raw {:?} before timeout {}",
                     buffer,
-                    state.timeout_running_time
+                    state.timeout_running_time.unwrap()
                 );
 
                 None
@@ -683,17 +799,16 @@ impl FallbackSwitch {
             )
         };
 
-        if let Some(running_time) = start_running_time {
-            pad_state.current_running_time = Some(running_time);
-        }
-
         drop(pad_state);
 
+        let mut update_all_pad_health = false;
+
         /* Before sleeping, ensure there is a timeout to switch active pads,
          * in case the initial active pad never receives a buffer */
         if let Some(running_time) = start_running_time {
             if state.timeout_clock_id.is_none() && !is_active {
                 // May change active pad immediately
-                self.schedule_timeout(&mut state, &settings, running_time);
+                update_all_pad_health = self.schedule_timeout(&mut state, &settings, running_time);
                 is_active = self.active_sinkpad.lock().as_ref() == Some(pad);
             }
         }
@@ -748,7 +863,8 @@ impl FallbackSwitch {
 
             if let Some(end_running_time) = end_running_time {
                 // May change active pad immediately
-                self.schedule_timeout(&mut state, &settings, end_running_time);
+                update_all_pad_health |=
+                    self.schedule_timeout(&mut state, &settings, end_running_time);
                 is_active = self.active_sinkpad.lock().as_ref() == Some(pad);
             } else {
                 state.cancel_timeout();
@@ -758,11 +874,37 @@ impl FallbackSwitch {
         if let Some(running_time) = end_running_time {
             pad_state.current_running_time = Some(running_time);
         }
-        pad_state.is_healthy = pad_state.is_healthy(&state, &settings);
+
+        let is_healthy = pad_state.is_healthy(pad, &state, &settings, state.output_running_time);
+        let health_changed = is_healthy != pad_state.is_healthy;
+        if health_changed {
+            log!(CAT, obj: pad, "Health changed to {}", is_healthy);
+        }
+        pad_state.is_healthy = is_healthy;
         drop(pad_state);
 
+        /* If the schedule_timeout() calls above said the timeout happened,
+         * we should update the health of all pads here */
+        let mut state = if update_all_pad_health {
+            let changed_health_pads = self.update_health_statuses(&state, &settings);
+            drop(state);
+
+            for pad in changed_health_pads {
+                pad.notify(PROP_IS_HEALTHY);
+            }
+
+            self.state.lock()
+        } else {
+            state
+        };
+
         if !is_active {
             log!(CAT, obj: pad, "Dropping {:?} on inactive pad", buffer);
+
+            drop(state);
+            if health_changed {
+                pad.notify(PROP_IS_HEALTHY);
+            }
+
             return Ok(gst::FlowSuccess::Ok);
         }
 
@@ -772,15 +914,30 @@ impl FallbackSwitch {
         is_active = self.active_sinkpad.lock().as_ref() == Some(pad);
         if !is_active {
             log!(CAT, obj: pad, "Dropping {:?} on inactive pad", buffer);
+
+            drop(state);
+            if health_changed {
+                pad.notify(PROP_IS_HEALTHY);
+            }
+
             return Ok(gst::FlowSuccess::Ok);
         }
 
+        /* Update the health status for all pads, since we're the active pad */
+        let changed_health_pads = self.update_health_statuses(&state, &settings);
+
         let switched_pad = state.switched_pad;
         let discont_pending = state.discont_pending;
         state.switched_pad = false;
         state.discont_pending = false;
         drop(state);
 
+        if health_changed {
+            pad.notify(PROP_IS_HEALTHY);
+        }
+        for pad in changed_health_pads {
+            pad.notify(PROP_IS_HEALTHY);
+        }
+
         if switched_pad {
             let _ = pad.push_event(gst::event::Reconfigure::new());
             pad.sticky_events_foreach(|event| {
diff --git a/utils/fallbackswitch/tests/fallbackswitch.rs b/utils/fallbackswitch/tests/fallbackswitch.rs
index 8257c73d..564f7af6 100644
--- a/utils/fallbackswitch/tests/fallbackswitch.rs
+++ b/utils/fallbackswitch/tests/fallbackswitch.rs
@@ -279,12 +279,13 @@ fn test_long_drop_and_recover(live: bool) {
     let buffer = pull_buffer(&pipeline);
     assert_fallback_buffer!(buffer, Some(4.seconds()));
 
-    // Produce a sixth frame from the normal source
+    // Produce a sixth frame from the normal source, which
+    // will make it healthy again
     push_buffer(&pipeline, 5.seconds());
     set_time(&pipeline, 5.seconds() + 10.mseconds());
     let buffer = pull_buffer(&pipeline);
     assert_buffer!(buffer, Some(5.seconds()));
-    assert!(!mainsink.property::<bool>("is-healthy"));
+    assert!(mainsink.property::<bool>("is-healthy"));
 
     drop(mainsink);
     drop(switch);
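
For reference, the health rule introduced above boils down to the
following standalone restatement. This is a simplified sketch, not code
from this patch; the function and parameter names are illustrative:

    use gstreamer as gst;

    /// A pad is healthy if its newest buffer is within `timeout` of
    /// `now`, and (when a global timeout deadline exists) that buffer
    /// would still be output before the deadline fires.
    fn pad_is_healthy(
        pad_rt: Option<gst::ClockTime>,   // pad's current_running_time
        now: Option<gst::ClockTime>,      // reference running time
        deadline: Option<gst::ClockTime>, // state.timeout_running_time
        timeout: gst::ClockTime,          // settings.timeout
    ) -> bool {
        match (pad_rt, now) {
            (Some(pad_rt), Some(now)) => {
                pad_rt.saturating_add(timeout) > now // must be > not >=
                    && deadline.map_or(true, |d| pad_rt <= d)
            }
            (Some(_), None) => true, // no reference time yet: treat as healthy
            (None, _) => false,      // pad has never received data
        }
    }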