diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index 02d51568..bf90ba47 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -79,6 +79,7 @@ struct Settings { restart_on_eos: bool, min_latency: gst::ClockTime, buffer_duration: i64, + immediate_fallback: bool, } impl Default for Settings { @@ -95,6 +96,7 @@ impl Default for Settings { restart_on_eos: false, min_latency: gst::ClockTime::ZERO, buffer_duration: -1, + immediate_fallback: false, } } } @@ -286,6 +288,13 @@ impl ObjectImpl for FallbackSrc { gst::Structure::static_type(), glib::ParamFlags::READABLE, ), + glib::ParamSpec::new_boolean( + "immediate-fallback", + "Immediate fallback", + "Forward the fallback streams immediately at startup, when the primary streams are slow to start up and immediate output is required", + false, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -432,6 +441,18 @@ impl ObjectImpl for FallbackSrc { ); settings.buffer_duration = new_value; } + "immediate-fallback" => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get().expect("type checked upstream"); + gst_info!( + CAT, + obj: obj, + "Changing immediate-fallback from {:?} to {:?}", + settings.immediate_fallback, + new_value, + ); + settings.immediate_fallback = new_value; + } _ => unimplemented!(), } } @@ -538,6 +559,10 @@ impl ObjectImpl for FallbackSrc { settings.buffer_duration.to_value() } "statistics" => self.stats().to_value(), + "immediate-fallback" => { + let settings = self.settings.lock().unwrap(); + settings.immediate_fallback.to_value() + } _ => unimplemented!(), } } @@ -775,6 +800,7 @@ impl FallbackSrc { min_latency: gst::ClockTime, is_audio: bool, fallback_uri: Option<&str>, + immediate_fallback: bool, ) -> Stream { let fallback_input = if is_audio { self.create_fallback_audio_input(element) @@ -820,6 +846,9 @@ impl FallbackSrc { switch .set_property("min-upstream-latency", &min_latency.nseconds()) .unwrap(); + switch + .set_property("immediate-fallback", &immediate_fallback) + .unwrap(); gst::Element::link_pads(&fallback_input, Some("src"), &switch, Some("fallback_sink")) .unwrap(); @@ -903,6 +932,7 @@ impl FallbackSrc { settings.min_latency, false, fallback_uri.as_deref(), + settings.immediate_fallback, ); flow_combiner.add_pad(&stream.srcpad); Some(stream) @@ -912,8 +942,14 @@ impl FallbackSrc { // Create audio stream let audio_stream = if settings.enable_audio { - let stream = - self.create_stream(element, settings.timeout, settings.min_latency, true, None); + let stream = self.create_stream( + element, + settings.timeout, + settings.min_latency, + true, + None, + settings.immediate_fallback, + ); flow_combiner.add_pad(&stream.srcpad); Some(stream) } else { diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs index 89ef28a0..2dea6cde 100644 --- a/utils/fallbackswitch/src/fallbackswitch/imp.rs +++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs @@ -92,11 +92,13 @@ struct PadInputState { const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(5); const DEFAULT_AUTO_SWITCH: bool = true; const DEFAULT_STREAM_HEALTH: StreamHealth = StreamHealth::Inactive; +const DEFAULT_IMMEDIATE_FALLBACK: bool = false; #[derive(Debug, Clone)] struct Settings { timeout: gst::ClockTime, auto_switch: bool, + immediate_fallback: bool, } impl Default for StreamHealth { @@ -120,6 +122,7 @@ impl Default for Settings { Settings { timeout: DEFAULT_TIMEOUT, auto_switch: DEFAULT_AUTO_SWITCH, + immediate_fallback: DEFAULT_IMMEDIATE_FALLBACK, } } } @@ -411,40 +414,54 @@ impl FallbackSwitch { if state.last_output_time.is_none() { state.last_output_time = running_time; } - if backup_pad == &self.primary_sinkpad { - state.primary.last_sinkpad_time = running_time; - } else { - state.fallback.last_sinkpad_time = running_time; - } - // Get the next one if this one is before the timeout - if state.last_output_time.zip(running_time).map_or( - false, - |(last_output_time, running_time)| { - last_output_time + settings.timeout > running_time - }, - ) { + // If the other pad never received a buffer, we want to start consuming + // buffers on this pad in order to provide an output at start up + // (for example with a slow primary) + let ignore_timeout = settings.immediate_fallback && { + if backup_pad == &self.primary_sinkpad { + state.primary.last_sinkpad_time = running_time; + state.fallback.last_sinkpad_time.is_none() + } else { + state.fallback.last_sinkpad_time = running_time; + state.primary.last_sinkpad_time.is_none() + } + }; + + if !ignore_timeout { + // Get the next one if this one is before the timeout + if state.last_output_time.zip(running_time).map_or( + false, + |(last_output_time, running_time)| { + last_output_time + settings.timeout > running_time + }, + ) { + gst_debug!( + CAT, + obj: backup_pad, + "Timeout not reached yet: {} + {} > {}", + state.last_output_time.display(), + settings.timeout, + running_time.display(), + ); + continue; + } gst_debug!( CAT, obj: backup_pad, - "Timeout not reached yet: {} + {} > {}", + "Timeout reached: {} + {} <= {}", state.last_output_time.display(), settings.timeout, running_time.display(), ); - - continue; + } else { + gst_debug!( + CAT, + obj: backup_pad, + "Consuming buffer as we haven't yet received a buffer on the other pad", + ); } - gst_debug!( - CAT, - obj: backup_pad, - "Timeout reached: {} + {} <= {}", - state.last_output_time.display(), - settings.timeout, - running_time.display(), - ); - let mut active_sinkpad = self.active_sinkpad.lock().unwrap(); let pad_change = settings.auto_switch && active_sinkpad.as_ref() != Some(backup_pad.upcast_ref::()); @@ -721,6 +738,13 @@ impl ObjectImpl for FallbackSwitch { DEFAULT_STREAM_HEALTH as i32, glib::ParamFlags::READABLE, ), + glib::ParamSpec::new_boolean( + "immediate-fallback", + "Immediate fallback", + "Forward the fallback stream immediately at startup, when the primary stream is slow to start up and immediate output is required", + DEFAULT_AUTO_SWITCH, + glib::ParamFlags::READWRITE| gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -779,6 +803,10 @@ impl ObjectImpl for FallbackSwitch { let mut settings = self.settings.lock().unwrap(); settings.auto_switch = value.get().expect("type checked upstream"); } + "immediate-fallback" => { + let mut settings = self.settings.lock().unwrap(); + settings.immediate_fallback = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -805,6 +833,10 @@ impl ObjectImpl for FallbackSwitch { let state = self.output_state.lock().unwrap(); state.fallback.stream_health.to_value() } + "immediate-fallback" => { + let settings = self.settings.lock().unwrap(); + settings.immediate_fallback.to_value() + } _ => unimplemented!(), } }