livesync: Handle flags and late buffer patching after queueing

This makes the chain function almost independent of the output state. We
still do the early discard check with `buffer_is_backwards` so we don't
try to queue buffers we can't use, allowing us to fast-forward upstream
without blocking on the src task.

Don't accept `LateOverThreshold` buffers when we have `pending_caps` or
a `pending_segment`. We need to apply these first before we can sensibly
patch buffers from the new stream.

Deduplicate most of the output buffer patching code into a new
`patch_output_buffer` method.

For: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/450
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1387>
This commit is contained in:
Jan Alexander Steffens (heftig) 2023-10-25 00:38:28 +02:00 committed by Sebastian Dröge
parent f565875b6c
commit 98d839a920

View file

@ -507,6 +507,10 @@ impl State {
// Safe default // Safe default
.unwrap_or(DEFAULT_DURATION); .unwrap_or(DEFAULT_DURATION);
} }
fn pending_events(&self) -> bool {
self.pending_caps.is_some() || self.pending_segment.is_some()
}
} }
impl LiveSync { impl LiveSync {
@ -916,52 +920,13 @@ impl LiveSync {
buf_mut.set_pts(pts.map(|t| t + state.latency)); buf_mut.set_pts(pts.map(|t| t + state.latency));
} }
if state let timestamp = state.ts_range(buf_mut, segment);
.out_buffer
.as_ref()
.map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP))
{
// We are done bridging a gap, so mark it as DISCONT instead
buf_mut.unset_flags(gst::BufferFlags::GAP);
buf_mut.set_flags(gst::BufferFlags::DISCONT);
}
let mut timestamp = state.ts_range(buf_mut, segment);
let lateness = self.buffer_is_backwards(&state, timestamp); let lateness = self.buffer_is_backwards(&state, timestamp);
match lateness {
BufferLateness::OnTime => {}
BufferLateness::LateUnderThreshold => { if lateness == BufferLateness::LateUnderThreshold {
gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut); gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut);
state.num_drop += 1; state.num_drop += 1;
return Ok(gst::FlowSuccess::Ok); return Ok(gst::FlowSuccess::Ok);
}
BufferLateness::LateOverThreshold => {
gst::debug!(CAT, imp: self, "Accepting late {:?}", buf_mut);
let prev = state.out_buffer.as_ref().unwrap();
let prev_duration = prev.duration().unwrap();
if let Some(audio_info) = &state.in_audio_info {
let mut map_info = buf_mut.map_writable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
gst::FlowError::Error
})?;
audio_info
.format_info()
.fill_silence(map_info.as_mut_slice());
} else {
buf_mut.set_duration(Some(state.fallback_duration));
}
buf_mut.set_dts(prev.dts().map(|t| t + prev_duration));
buf_mut.set_pts(prev.pts().map(|t| t + prev_duration));
buf_mut.set_flags(gst::BufferFlags::GAP);
timestamp = state.ts_range(buf_mut, state.out_segment.as_ref().unwrap());
}
} }
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness); gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
@ -1135,57 +1100,49 @@ impl LiveSync {
let mut segment = None; let mut segment = None;
match in_buffer { match in_buffer {
Some((buffer, lateness)) => { Some((mut buffer, BufferLateness::OnTime)) => {
state.num_in += 1; state.num_in += 1;
if state
.out_buffer
.as_ref()
.map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP))
{
// We are done bridging a gap, so mark it as DISCONT instead
let buf_mut = buffer.make_mut();
buf_mut.unset_flags(gst::BufferFlags::GAP);
buf_mut.set_flags(gst::BufferFlags::DISCONT);
}
state.out_buffer = Some(buffer); state.out_buffer = Some(buffer);
state.out_timestamp = state.in_timestamp; state.out_timestamp = state.in_timestamp;
caps = state.pending_caps.take(); caps = state.pending_caps.take();
segment = state.pending_segment.take(); segment = state.pending_segment.take();
if lateness != BufferLateness::OnTime {
state.num_duplicate += 1;
}
} }
Some((buffer, BufferLateness::LateOverThreshold)) if !state.pending_events() => {
gst::debug!(CAT, imp: self, "Accepting late {:?}", buffer);
state.num_in += 1;
self.patch_output_buffer(&mut state, Some(buffer))?;
}
Some((buffer, BufferLateness::LateOverThreshold)) => {
// Cannot accept late-over-threshold buffers while we have pending events
gst::debug!(CAT, imp: self, "Discarding late {:?}", buffer);
state.num_drop += 1;
self.patch_output_buffer(&mut state, None)?;
}
None => { None => {
// Work around borrow checker self.patch_output_buffer(&mut state, None)?;
let State { }
fallback_duration,
out_buffer: ref mut buffer,
out_audio_info: ref audio_info,
..
} = *state;
gst::debug!(CAT, imp: self, "Repeating {:?}", buffer);
let buffer = buffer.as_mut().unwrap().make_mut(); Some((_, BufferLateness::LateUnderThreshold)) => {
let prev_duration = buffer.duration().unwrap(); // Is discarded before queueing
unreachable!();
if let Some(audio_info) = audio_info {
if !buffer.flags().contains(gst::BufferFlags::GAP) {
let mut map_info = buffer.map_writable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
gst::FlowError::Error
})?;
audio_info
.format_info()
.fill_silence(map_info.as_mut_slice());
}
} else {
buffer.set_duration(Some(fallback_duration));
}
buffer.set_dts(buffer.dts().map(|t| t + prev_duration));
buffer.set_pts(buffer.pts().map(|t| t + prev_duration));
buffer.set_flags(gst::BufferFlags::GAP);
buffer.unset_flags(gst::BufferFlags::DISCONT);
state.out_timestamp = state.ts_range(
state.out_buffer.as_ref().unwrap(),
state.out_segment.as_ref().unwrap(),
);
state.num_duplicate += 1;
} }
} }
@ -1323,4 +1280,58 @@ impl LiveSync {
details: details details: details
); );
} }
/// Patches the output buffer for repeating, setting out_buffer and out_timestamp
fn patch_output_buffer(
&self,
state: &mut State,
source: Option<gst::Buffer>,
) -> Result<(), gst::FlowError> {
let out_buffer = state.out_buffer.as_mut().unwrap();
let duration = out_buffer.duration().unwrap();
let dts = out_buffer.dts().map(|t| t + duration);
let pts = out_buffer.pts().map(|t| t + duration);
if let Some(source) = source {
gst::debug!(
CAT,
imp: self,
"Repeating {:?} using {:?}",
out_buffer,
source
);
*out_buffer = source;
} else {
gst::debug!(CAT, imp: self, "Repeating {:?}", out_buffer);
}
let buffer = out_buffer.make_mut();
if let Some(audio_info) = &state.out_audio_info {
if !buffer.flags().contains(gst::BufferFlags::GAP) {
let mut map_info = buffer.map_writable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
gst::FlowError::Error
})?;
audio_info
.format_info()
.fill_silence(map_info.as_mut_slice());
}
} else {
buffer.set_duration(state.fallback_duration);
}
buffer.set_dts(dts);
buffer.set_pts(pts);
buffer.set_flags(gst::BufferFlags::GAP);
buffer.unset_flags(gst::BufferFlags::DISCONT);
state.out_timestamp = state.ts_range(
state.out_buffer.as_ref().unwrap(),
state.out_segment.as_ref().unwrap(),
);
state.num_duplicate += 1;
Ok(())
}
} }