fallbackswitch: add 'stop-on-eos' property

Fix the following use case:
- main input of fallbackswitch is finite (a media file)
- fallback input is infinite (videotestsrc)
- main input is done and send eos, which is propagated downstream
- fallbackswitch switches to fallback, sending STREAM_START which reset
  EOS downstream (aggregator does that)
- fallback input keeps pushing buffers forever.

Solve it by adding a 'stop-on-eos' property so fallbackswitch stops
pushing property once the main input is eos.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1249>
This commit is contained in:
Guillaume Desmottes 2023-06-12 17:48:54 +02:00 committed by Sebastian Dröge
parent 68faccdacf
commit 6f75243c8f
2 changed files with 61 additions and 0 deletions

View file

@ -1475,6 +1475,18 @@
"type": "guint64", "type": "guint64",
"writable": true "writable": true
}, },
"stop-on-eos": {
"blurb": "Stop forwarding buffers as soon as one input pad is eos",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
},
"timeout": { "timeout": {
"blurb": "Timeout on an input before switching to a lower priority input.", "blurb": "Timeout on an input before switching to a lower priority input.",
"conditionally-available": false, "conditionally-available": false,

View file

@ -26,6 +26,7 @@ const PROP_IMMEDIATE_FALLBACK: &str = "immediate-fallback";
const PROP_LATENCY: &str = "latency"; const PROP_LATENCY: &str = "latency";
const PROP_MIN_UPSTREAM_LATENCY: &str = "min-upstream-latency"; const PROP_MIN_UPSTREAM_LATENCY: &str = "min-upstream-latency";
const PROP_TIMEOUT: &str = "timeout"; const PROP_TIMEOUT: &str = "timeout";
const PROP_STOP_ON_EOS: &str = "stop-on-eos";
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new( gst::DebugCategory::new(
@ -50,6 +51,7 @@ struct Settings {
min_upstream_latency: gst::ClockTime, min_upstream_latency: gst::ClockTime,
immediate_fallback: bool, immediate_fallback: bool,
auto_switch: bool, auto_switch: bool,
stop_on_eos: bool,
} }
impl Default for Settings { impl Default for Settings {
@ -60,6 +62,7 @@ impl Default for Settings {
min_upstream_latency: gst::ClockTime::ZERO, min_upstream_latency: gst::ClockTime::ZERO,
immediate_fallback: false, immediate_fallback: false,
auto_switch: true, auto_switch: true,
stop_on_eos: false,
} }
} }
} }
@ -200,6 +203,8 @@ struct SinkState {
current_running_time: Option<gst::ClockTime>, current_running_time: Option<gst::ClockTime>,
flushing: bool, flushing: bool,
clock_id: Option<gst::SingleShotClockId>, clock_id: Option<gst::SingleShotClockId>,
/// true if the sink pad has received eos
eos: bool,
} }
impl Default for SinkState { impl Default for SinkState {
@ -213,6 +218,7 @@ impl Default for SinkState {
current_running_time: gst::ClockTime::NONE, current_running_time: gst::ClockTime::NONE,
flushing: false, flushing: false,
clock_id: None, clock_id: None,
eos: false,
} }
} }
} }
@ -232,6 +238,7 @@ impl SinkState {
fn reset(&mut self) { fn reset(&mut self) {
self.flushing = false; self.flushing = false;
self.caps_info = CapsInfo::None; self.caps_info = CapsInfo::None;
self.eos = false;
} }
fn clip_buffer(&self, mut buffer: gst::Buffer) -> Option<gst::Buffer> { fn clip_buffer(&self, mut buffer: gst::Buffer) -> Option<gst::Buffer> {
@ -551,6 +558,11 @@ impl FallbackSwitch {
let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap(); let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
let pad_imp = pad.imp(); let pad_imp = pad.imp();
if settings.stop_on_eos && self.has_sink_pad_eos() {
debug!(CAT, obj: pad, "return eos as stop-on-eos is enabled");
return Err(gst::FlowError::Eos);
}
let mut buffer = { let mut buffer = {
let pad_state = pad_imp.state.lock(); let pad_state = pad_imp.state.lock();
trace!( trace!(
@ -895,6 +907,12 @@ impl FallbackSwitch {
pad_state.reset(); pad_state.reset();
state.first = true; state.first = true;
} }
gst::EventView::Eos(_) => {
pad_state.eos = true;
}
gst::EventView::StreamStart(_) => {
pad_state.eos = false;
}
_ => {} _ => {}
} }
@ -1033,6 +1051,22 @@ impl FallbackSwitch {
} }
} }
} }
/// check if at least one sink pad has received eos
fn has_sink_pad_eos(&self) -> bool {
let pads = self.obj().sink_pads();
for pad in pads {
let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
let pad_imp = pad.imp();
let pad_state = pad_imp.state.lock();
if pad_state.eos {
return true;
}
}
false
}
} }
#[glib::object_subclass] #[glib::object_subclass]
@ -1106,6 +1140,12 @@ impl ObjectImpl for FallbackSwitch {
.default_value(Settings::default().auto_switch) .default_value(Settings::default().auto_switch)
.mutable_ready() .mutable_ready()
.build(), .build(),
glib::ParamSpecBoolean::builder(PROP_STOP_ON_EOS)
.nick("stop on EOS")
.blurb("Stop forwarding buffers as soon as one input pad is eos")
.default_value(Settings::default().stop_on_eos)
.mutable_ready()
.build(),
] ]
}); });
@ -1179,6 +1219,11 @@ impl ObjectImpl for FallbackSwitch {
let new_value = value.get().expect("type checked upstream"); let new_value = value.get().expect("type checked upstream");
settings.auto_switch = new_value; settings.auto_switch = new_value;
} }
PROP_STOP_ON_EOS => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
settings.stop_on_eos = new_value;
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -1209,6 +1254,10 @@ impl ObjectImpl for FallbackSwitch {
let settings = self.settings.lock(); let settings = self.settings.lock();
settings.auto_switch.to_value() settings.auto_switch.to_value()
} }
PROP_STOP_ON_EOS => {
let settings = self.settings.lock();
settings.stop_on_eos.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }