utils/fallbackswitch: Timeout the main stream if buffers were too late for too long

Buffers are still forwarded until the timeout is reached even if they're
too late, but if they were continuously too late for more than the
duration of the timeout setting then switch to the fallback stream
instead.
This commit is contained in:
Sebastian Dröge 2020-07-03 16:52:21 +03:00 committed by Sebastian Dröge
parent daa6cfbb6a
commit 55f3349b39

View file

@ -121,6 +121,7 @@ impl FallbackSwitch {
&self, &self,
agg: &gst_base::Aggregator, agg: &gst_base::Aggregator,
state: &mut OutputState, state: &mut OutputState,
settings: &Settings,
mut buffer: gst::Buffer, mut buffer: gst::Buffer,
fallback_sinkpad: Option<&gst_base::AggregatorPad>, fallback_sinkpad: Option<&gst_base::AggregatorPad>,
) -> Result<Option<(gst::Buffer, gst::Caps, bool)>, gst::FlowError> { ) -> Result<Option<(gst::Buffer, gst::Caps, bool)>, gst::FlowError> {
@ -141,6 +142,7 @@ impl FallbackSwitch {
gst::FlowError::Error gst::FlowError::Error
})?; })?;
let running_time = segment.to_running_time(buffer.get_dts_or_pts());
{ {
// FIXME: This will not work correctly for negative DTS // FIXME: This will not work correctly for negative DTS
let buffer = buffer.make_mut(); let buffer = buffer.make_mut();
@ -148,6 +150,47 @@ impl FallbackSwitch {
buffer.set_dts(segment.to_running_time(buffer.get_dts())); buffer.set_dts(segment.to_running_time(buffer.get_dts()));
} }
let is_late = {
let clock = agg.get_clock();
let base_time = agg.get_base_time();
if let Some(clock) = clock {
let now = clock.get_time();
let latency = agg.get_latency();
if latency.is_some() {
let deadline = base_time + running_time + latency;
if now > deadline {
gst_debug!(CAT, obj: agg, "Buffer is too late: {} > {}", now, deadline);
true
} else {
false
}
} else {
false
}
} else {
false
}
};
if state.last_sinkpad_time.is_some()
&& is_late
&& state.last_sinkpad_time + settings.timeout <= running_time
{
gst_debug!(
CAT,
obj: agg,
"Buffer is too late and timeout reached: {} + {} <= {}",
state.last_sinkpad_time,
settings.timeout,
running_time,
);
return Ok(None);
}
let mut active_sinkpad = self.active_sinkpad.lock().unwrap(); let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
let pad_change = active_sinkpad.as_ref() != Some(self.sinkpad.upcast_ref::<gst::Pad>()); let pad_change = active_sinkpad.as_ref() != Some(self.sinkpad.upcast_ref::<gst::Pad>());
if pad_change { if pad_change {
@ -170,7 +213,9 @@ impl FallbackSwitch {
} }
drop(active_sinkpad); drop(active_sinkpad);
state.last_sinkpad_time = buffer.get_dts_or_pts(); if !is_late || state.last_sinkpad_time.is_none() {
state.last_sinkpad_time = buffer.get_dts_or_pts();
}
// Drop all older buffers from the fallback sinkpad // Drop all older buffers from the fallback sinkpad
if let Some(fallback_sinkpad) = fallback_sinkpad { if let Some(fallback_sinkpad) = fallback_sinkpad {
@ -324,9 +369,13 @@ impl FallbackSwitch {
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout); gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
if let Some(buffer) = self.sinkpad.pop_buffer() { if let Some(buffer) = self.sinkpad.pop_buffer() {
if let Some(res) = if let Some(res) = self.handle_main_buffer(
self.handle_main_buffer(agg, &mut *state, buffer, fallback_sinkpad.as_ref())? agg,
{ &mut *state,
&settings,
buffer,
fallback_sinkpad.as_ref(),
)? {
return Ok(res); return Ok(res);
} }
} else if self.sinkpad.is_eos() { } else if self.sinkpad.is_eos() {