From 98d839a9205b64af3c3e9618b9c9727527be9441 Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 25 Oct 2023 00:38:28 +0200 Subject: [PATCH] livesync: Handle flags and late buffer patching after queueing This makes the chain function almost independent of the output state. We still do the early discard check with `buffer_is_backwards` so we don't try to queue buffers we can't use, allowing us to fast-forward upstream without blocking on the src task. Don't accept `LateOverThreshold` buffers when we have `pending_caps` or a `pending_segment`. We need to apply these first before we can sensibly patch buffers from the new stream. Deduplicate most of the output buffer patching code into a new `patch_output_buffer` method. For: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/450 Part-of: --- utils/livesync/src/livesync/imp.rs | 181 +++++++++++++++-------------- 1 file changed, 96 insertions(+), 85 deletions(-) diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs index b107fcf3..4dbbee5c 100644 --- a/utils/livesync/src/livesync/imp.rs +++ b/utils/livesync/src/livesync/imp.rs @@ -507,6 +507,10 @@ impl State { // Safe default .unwrap_or(DEFAULT_DURATION); } + + fn pending_events(&self) -> bool { + self.pending_caps.is_some() || self.pending_segment.is_some() + } } impl LiveSync { @@ -916,52 +920,13 @@ impl LiveSync { buf_mut.set_pts(pts.map(|t| t + state.latency)); } - if state - .out_buffer - .as_ref() - .map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP)) - { - // We are done bridging a gap, so mark it as DISCONT instead - buf_mut.unset_flags(gst::BufferFlags::GAP); - buf_mut.set_flags(gst::BufferFlags::DISCONT); - } - - let mut timestamp = state.ts_range(buf_mut, segment); + let timestamp = state.ts_range(buf_mut, segment); let lateness = self.buffer_is_backwards(&state, timestamp); - match lateness { - BufferLateness::OnTime => {} - BufferLateness::LateUnderThreshold => { - gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut); - state.num_drop += 1; - return Ok(gst::FlowSuccess::Ok); - } - - BufferLateness::LateOverThreshold => { - gst::debug!(CAT, imp: self, "Accepting late {:?}", buf_mut); - - let prev = state.out_buffer.as_ref().unwrap(); - let prev_duration = prev.duration().unwrap(); - - if let Some(audio_info) = &state.in_audio_info { - let mut map_info = buf_mut.map_writable().map_err(|e| { - gst::error!(CAT, imp: self, "Failed to map buffer: {}", e); - gst::FlowError::Error - })?; - - audio_info - .format_info() - .fill_silence(map_info.as_mut_slice()); - } else { - buf_mut.set_duration(Some(state.fallback_duration)); - } - - buf_mut.set_dts(prev.dts().map(|t| t + prev_duration)); - buf_mut.set_pts(prev.pts().map(|t| t + prev_duration)); - buf_mut.set_flags(gst::BufferFlags::GAP); - - timestamp = state.ts_range(buf_mut, state.out_segment.as_ref().unwrap()); - } + if lateness == BufferLateness::LateUnderThreshold { + gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut); + state.num_drop += 1; + return Ok(gst::FlowSuccess::Ok); } gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness); @@ -1135,57 +1100,49 @@ impl LiveSync { let mut segment = None; match in_buffer { - Some((buffer, lateness)) => { + Some((mut buffer, BufferLateness::OnTime)) => { state.num_in += 1; + if state + .out_buffer + .as_ref() + .map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP)) + { + // We are done bridging a gap, so mark it as DISCONT instead + let buf_mut = buffer.make_mut(); + buf_mut.unset_flags(gst::BufferFlags::GAP); + buf_mut.set_flags(gst::BufferFlags::DISCONT); + } + state.out_buffer = Some(buffer); state.out_timestamp = state.in_timestamp; caps = state.pending_caps.take(); segment = state.pending_segment.take(); - - if lateness != BufferLateness::OnTime { - state.num_duplicate += 1; - } } + + Some((buffer, BufferLateness::LateOverThreshold)) if !state.pending_events() => { + gst::debug!(CAT, imp: self, "Accepting late {:?}", buffer); + state.num_in += 1; + + self.patch_output_buffer(&mut state, Some(buffer))?; + } + + Some((buffer, BufferLateness::LateOverThreshold)) => { + // Cannot accept late-over-threshold buffers while we have pending events + gst::debug!(CAT, imp: self, "Discarding late {:?}", buffer); + state.num_drop += 1; + + self.patch_output_buffer(&mut state, None)?; + } + None => { - // Work around borrow checker - let State { - fallback_duration, - out_buffer: ref mut buffer, - out_audio_info: ref audio_info, - .. - } = *state; - gst::debug!(CAT, imp: self, "Repeating {:?}", buffer); + self.patch_output_buffer(&mut state, None)?; + } - let buffer = buffer.as_mut().unwrap().make_mut(); - let prev_duration = buffer.duration().unwrap(); - - if let Some(audio_info) = audio_info { - if !buffer.flags().contains(gst::BufferFlags::GAP) { - let mut map_info = buffer.map_writable().map_err(|e| { - gst::error!(CAT, imp: self, "Failed to map buffer: {}", e); - gst::FlowError::Error - })?; - - audio_info - .format_info() - .fill_silence(map_info.as_mut_slice()); - } - } else { - buffer.set_duration(Some(fallback_duration)); - } - - buffer.set_dts(buffer.dts().map(|t| t + prev_duration)); - buffer.set_pts(buffer.pts().map(|t| t + prev_duration)); - buffer.set_flags(gst::BufferFlags::GAP); - buffer.unset_flags(gst::BufferFlags::DISCONT); - - state.out_timestamp = state.ts_range( - state.out_buffer.as_ref().unwrap(), - state.out_segment.as_ref().unwrap(), - ); - state.num_duplicate += 1; + Some((_, BufferLateness::LateUnderThreshold)) => { + // Is discarded before queueing + unreachable!(); } } @@ -1323,4 +1280,58 @@ impl LiveSync { details: details ); } + + /// Patches the output buffer for repeating, setting out_buffer and out_timestamp + fn patch_output_buffer( + &self, + state: &mut State, + source: Option, + ) -> Result<(), gst::FlowError> { + let out_buffer = state.out_buffer.as_mut().unwrap(); + + let duration = out_buffer.duration().unwrap(); + let dts = out_buffer.dts().map(|t| t + duration); + let pts = out_buffer.pts().map(|t| t + duration); + + if let Some(source) = source { + gst::debug!( + CAT, + imp: self, + "Repeating {:?} using {:?}", + out_buffer, + source + ); + *out_buffer = source; + } else { + gst::debug!(CAT, imp: self, "Repeating {:?}", out_buffer); + } + + let buffer = out_buffer.make_mut(); + + if let Some(audio_info) = &state.out_audio_info { + if !buffer.flags().contains(gst::BufferFlags::GAP) { + let mut map_info = buffer.map_writable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map buffer: {}", e); + gst::FlowError::Error + })?; + audio_info + .format_info() + .fill_silence(map_info.as_mut_slice()); + } + } else { + buffer.set_duration(state.fallback_duration); + } + + buffer.set_dts(dts); + buffer.set_pts(pts); + buffer.set_flags(gst::BufferFlags::GAP); + buffer.unset_flags(gst::BufferFlags::DISCONT); + + state.out_timestamp = state.ts_range( + state.out_buffer.as_ref().unwrap(), + state.out_segment.as_ref().unwrap(), + ); + state.num_duplicate += 1; + Ok(()) + } }