From 638ffc3c7c7d8ff05f6ddba79969ac50a21470cb Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Mon, 12 Jun 2023 17:48:54 +0200 Subject: [PATCH] fallbackswitch: add 'stop-on-eos' property Fix the following use case: - main input of fallbackswitch is finite (a media file) - fallback input is infinite (videotestsrc) - main input is done and send eos, which is propagated downstream - fallbackswitch switches to fallback, sending STREAM_START which reset EOS downstream (aggregator does that) - fallback input keeps pushing buffers forever. Solve it by adding a 'stop-on-eos' property so fallbackswitch stops pushing property once the main input is eos. Part-of: --- docs/plugins/gst_plugins_cache.json | 12 +++++ .../fallbackswitch/src/fallbackswitch/imp.rs | 49 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index b408c643..8acb8bc6 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -1475,6 +1475,18 @@ "type": "guint64", "writable": true }, + "stop-on-eos": { + "blurb": "Stop forwarding buffers as soon as one input pad is eos", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, "timeout": { "blurb": "Timeout on an input before switching to a lower priority input.", "conditionally-available": false, diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs index a7f05b3e..ee80125c 100644 --- a/utils/fallbackswitch/src/fallbackswitch/imp.rs +++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs @@ -26,6 +26,7 @@ const PROP_IMMEDIATE_FALLBACK: &str = "immediate-fallback"; const PROP_LATENCY: &str = "latency"; const PROP_MIN_UPSTREAM_LATENCY: &str = "min-upstream-latency"; const PROP_TIMEOUT: &str = "timeout"; +const PROP_STOP_ON_EOS: &str = "stop-on-eos"; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -50,6 +51,7 @@ struct Settings { min_upstream_latency: gst::ClockTime, immediate_fallback: bool, auto_switch: bool, + stop_on_eos: bool, } impl Default for Settings { @@ -60,6 +62,7 @@ impl Default for Settings { min_upstream_latency: gst::ClockTime::ZERO, immediate_fallback: false, auto_switch: true, + stop_on_eos: false, } } } @@ -200,6 +203,8 @@ struct SinkState { current_running_time: Option, flushing: bool, clock_id: Option, + /// true if the sink pad has received eos + eos: bool, } impl Default for SinkState { @@ -213,6 +218,7 @@ impl Default for SinkState { current_running_time: gst::ClockTime::NONE, flushing: false, clock_id: None, + eos: false, } } } @@ -232,6 +238,7 @@ impl SinkState { fn reset(&mut self) { self.flushing = false; self.caps_info = CapsInfo::None; + self.eos = false; } fn clip_buffer(&self, mut buffer: gst::Buffer) -> Option { @@ -551,6 +558,11 @@ impl FallbackSwitch { let pad = pad.downcast_ref::().unwrap(); let pad_imp = pad.imp(); + if settings.stop_on_eos && self.has_sink_pad_eos() { + debug!(CAT, obj: pad, "return eos as stop-on-eos is enabled"); + return Err(gst::FlowError::Eos); + } + let mut buffer = { let pad_state = pad_imp.state.lock(); trace!( @@ -895,6 +907,12 @@ impl FallbackSwitch { pad_state.reset(); state.first = true; } + gst::EventView::Eos(_) => { + pad_state.eos = true; + } + gst::EventView::StreamStart(_) => { + pad_state.eos = false; + } _ => {} } @@ -1033,6 +1051,22 @@ impl FallbackSwitch { } } } + + /// check if at least one sink pad has received eos + fn has_sink_pad_eos(&self) -> bool { + let pads = self.obj().sink_pads(); + + for pad in pads { + let pad = pad.downcast_ref::().unwrap(); + let pad_imp = pad.imp(); + let pad_state = pad_imp.state.lock(); + if pad_state.eos { + return true; + } + } + + false + } } #[glib::object_subclass] @@ -1106,6 +1140,12 @@ impl ObjectImpl for FallbackSwitch { .default_value(Settings::default().auto_switch) .mutable_ready() .build(), + glib::ParamSpecBoolean::builder(PROP_STOP_ON_EOS) + .nick("stop on EOS") + .blurb("Stop forwarding buffers as soon as one input pad is eos") + .default_value(Settings::default().stop_on_eos) + .mutable_ready() + .build(), ] }); @@ -1179,6 +1219,11 @@ impl ObjectImpl for FallbackSwitch { let new_value = value.get().expect("type checked upstream"); settings.auto_switch = new_value; } + PROP_STOP_ON_EOS => { + let mut settings = self.settings.lock(); + let new_value = value.get().expect("type checked upstream"); + settings.stop_on_eos = new_value; + } _ => unimplemented!(), } } @@ -1209,6 +1254,10 @@ impl ObjectImpl for FallbackSwitch { let settings = self.settings.lock(); settings.auto_switch.to_value() } + PROP_STOP_ON_EOS => { + let settings = self.settings.lock(); + settings.stop_on_eos.to_value() + } _ => unimplemented!(), } }