fallbackswitch: add 'stop-on-eos' property

Fix the following use case:
- main input of fallbackswitch is finite (a media file)
- fallback input is infinite (videotestsrc)
- main input is done and send eos, which is propagated downstream
- fallbackswitch switches to fallback, sending STREAM_START which reset
  EOS downstream (aggregator does that)
- fallback input keeps pushing buffers forever.

Solve it by adding a 'stop-on-eos' property so fallbackswitch stops
pushing property once the main input is eos.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1248>
This commit is contained in:
Guillaume Desmottes 2023-06-12 17:48:54 +02:00 committed by Sebastian Dröge
parent 0998f9a303
commit 638ffc3c7c
2 changed files with 61 additions and 0 deletions

View file

@ -1475,6 +1475,18 @@
"type": "guint64",
"writable": true
},
"stop-on-eos": {
"blurb": "Stop forwarding buffers as soon as one input pad is eos",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
},
"timeout": {
"blurb": "Timeout on an input before switching to a lower priority input.",
"conditionally-available": false,

View file

@ -26,6 +26,7 @@ const PROP_IMMEDIATE_FALLBACK: &str = "immediate-fallback";
const PROP_LATENCY: &str = "latency";
const PROP_MIN_UPSTREAM_LATENCY: &str = "min-upstream-latency";
const PROP_TIMEOUT: &str = "timeout";
const PROP_STOP_ON_EOS: &str = "stop-on-eos";
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@ -50,6 +51,7 @@ struct Settings {
min_upstream_latency: gst::ClockTime,
immediate_fallback: bool,
auto_switch: bool,
stop_on_eos: bool,
}
impl Default for Settings {
@ -60,6 +62,7 @@ impl Default for Settings {
min_upstream_latency: gst::ClockTime::ZERO,
immediate_fallback: false,
auto_switch: true,
stop_on_eos: false,
}
}
}
@ -200,6 +203,8 @@ struct SinkState {
current_running_time: Option<gst::ClockTime>,
flushing: bool,
clock_id: Option<gst::SingleShotClockId>,
/// true if the sink pad has received eos
eos: bool,
}
impl Default for SinkState {
@ -213,6 +218,7 @@ impl Default for SinkState {
current_running_time: gst::ClockTime::NONE,
flushing: false,
clock_id: None,
eos: false,
}
}
}
@ -232,6 +238,7 @@ impl SinkState {
fn reset(&mut self) {
self.flushing = false;
self.caps_info = CapsInfo::None;
self.eos = false;
}
fn clip_buffer(&self, mut buffer: gst::Buffer) -> Option<gst::Buffer> {
@ -551,6 +558,11 @@ impl FallbackSwitch {
let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
let pad_imp = pad.imp();
if settings.stop_on_eos && self.has_sink_pad_eos() {
debug!(CAT, obj: pad, "return eos as stop-on-eos is enabled");
return Err(gst::FlowError::Eos);
}
let mut buffer = {
let pad_state = pad_imp.state.lock();
trace!(
@ -895,6 +907,12 @@ impl FallbackSwitch {
pad_state.reset();
state.first = true;
}
gst::EventView::Eos(_) => {
pad_state.eos = true;
}
gst::EventView::StreamStart(_) => {
pad_state.eos = false;
}
_ => {}
}
@ -1033,6 +1051,22 @@ impl FallbackSwitch {
}
}
}
/// check if at least one sink pad has received eos
fn has_sink_pad_eos(&self) -> bool {
let pads = self.obj().sink_pads();
for pad in pads {
let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
let pad_imp = pad.imp();
let pad_state = pad_imp.state.lock();
if pad_state.eos {
return true;
}
}
false
}
}
#[glib::object_subclass]
@ -1106,6 +1140,12 @@ impl ObjectImpl for FallbackSwitch {
.default_value(Settings::default().auto_switch)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder(PROP_STOP_ON_EOS)
.nick("stop on EOS")
.blurb("Stop forwarding buffers as soon as one input pad is eos")
.default_value(Settings::default().stop_on_eos)
.mutable_ready()
.build(),
]
});
@ -1179,6 +1219,11 @@ impl ObjectImpl for FallbackSwitch {
let new_value = value.get().expect("type checked upstream");
settings.auto_switch = new_value;
}
PROP_STOP_ON_EOS => {
let mut settings = self.settings.lock();
let new_value = value.get().expect("type checked upstream");
settings.stop_on_eos = new_value;
}
_ => unimplemented!(),
}
}
@ -1209,6 +1254,10 @@ impl ObjectImpl for FallbackSwitch {
let settings = self.settings.lock();
settings.auto_switch.to_value()
}
PROP_STOP_ON_EOS => {
let settings = self.settings.lock();
settings.stop_on_eos.to_value()
}
_ => unimplemented!(),
}
}