mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 05:21:00 +00:00
livesync: Handle flags and late buffer patching after queueing
This makes the chain function almost independent of the output state. We still do the early discard check with `buffer_is_backwards` so we don't try to queue buffers we can't use, allowing us to fast-forward upstream without blocking on the src task. Don't accept `LateOverThreshold` buffers when we have `pending_caps` or a `pending_segment`. We need to apply these first before we can sensibly patch buffers from the new stream. Deduplicate most of the output buffer patching code into a new `patch_output_buffer` method. For: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/450 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1369>
This commit is contained in:
parent
7c48a299c3
commit
2f36bd5d77
1 changed files with 90 additions and 85 deletions
|
@ -505,6 +505,10 @@ impl State {
|
||||||
// Safe default
|
// Safe default
|
||||||
.unwrap_or(DEFAULT_DURATION);
|
.unwrap_or(DEFAULT_DURATION);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pending_events(&self) -> bool {
|
||||||
|
self.pending_caps.is_some() || self.pending_segment.is_some()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LiveSync {
|
impl LiveSync {
|
||||||
|
@ -912,54 +916,15 @@ impl LiveSync {
|
||||||
buf_mut.set_pts(pts.map(|t| t + state.latency));
|
buf_mut.set_pts(pts.map(|t| t + state.latency));
|
||||||
}
|
}
|
||||||
|
|
||||||
if state
|
let timestamp = state.ts_range(buf_mut, segment);
|
||||||
.out_buffer
|
|
||||||
.as_ref()
|
|
||||||
.map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP))
|
|
||||||
{
|
|
||||||
// We are done bridging a gap, so mark it as DISCONT instead
|
|
||||||
buf_mut.unset_flags(gst::BufferFlags::GAP);
|
|
||||||
buf_mut.set_flags(gst::BufferFlags::DISCONT);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut timestamp = state.ts_range(buf_mut, segment);
|
|
||||||
let lateness = self.buffer_is_backwards(&state, timestamp);
|
let lateness = self.buffer_is_backwards(&state, timestamp);
|
||||||
match lateness {
|
|
||||||
BufferLateness::OnTime => {}
|
|
||||||
|
|
||||||
BufferLateness::LateUnderThreshold => {
|
if lateness == BufferLateness::LateUnderThreshold {
|
||||||
gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut);
|
gst::debug!(CAT, imp: self, "Discarding late {:?}", buf_mut);
|
||||||
state.num_drop += 1;
|
state.num_drop += 1;
|
||||||
return Ok(gst::FlowSuccess::Ok);
|
return Ok(gst::FlowSuccess::Ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
BufferLateness::LateOverThreshold => {
|
|
||||||
gst::debug!(CAT, imp: self, "Accepting late {:?}", buf_mut);
|
|
||||||
|
|
||||||
let prev = state.out_buffer.as_ref().unwrap();
|
|
||||||
let prev_duration = prev.duration().unwrap();
|
|
||||||
|
|
||||||
if let Some(audio_info) = &state.in_audio_info {
|
|
||||||
let mut map_info = buf_mut.map_writable().map_err(|e| {
|
|
||||||
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
|
|
||||||
gst::FlowError::Error
|
|
||||||
})?;
|
|
||||||
|
|
||||||
audio_info
|
|
||||||
.format_info()
|
|
||||||
.fill_silence(map_info.as_mut_slice());
|
|
||||||
} else {
|
|
||||||
buf_mut.set_duration(Some(state.fallback_duration));
|
|
||||||
}
|
|
||||||
|
|
||||||
buf_mut.set_dts(prev.dts().map(|t| t + prev_duration));
|
|
||||||
buf_mut.set_pts(prev.pts().map(|t| t + prev_duration));
|
|
||||||
buf_mut.set_flags(gst::BufferFlags::GAP);
|
|
||||||
|
|
||||||
timestamp = state.ts_range(buf_mut, state.out_segment.as_ref().unwrap());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
|
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
|
||||||
state.queue.push_back(Item::Buffer(buffer, lateness));
|
state.queue.push_back(Item::Buffer(buffer, lateness));
|
||||||
state.buffer_queued = true;
|
state.buffer_queued = true;
|
||||||
|
@ -1129,57 +1094,49 @@ impl LiveSync {
|
||||||
let mut segment = None;
|
let mut segment = None;
|
||||||
|
|
||||||
match in_buffer {
|
match in_buffer {
|
||||||
Some((buffer, lateness)) => {
|
Some((mut buffer, BufferLateness::OnTime)) => {
|
||||||
state.num_in += 1;
|
state.num_in += 1;
|
||||||
|
|
||||||
|
if state
|
||||||
|
.out_buffer
|
||||||
|
.as_ref()
|
||||||
|
.map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP))
|
||||||
|
{
|
||||||
|
// We are done bridging a gap, so mark it as DISCONT instead
|
||||||
|
let buf_mut = buffer.make_mut();
|
||||||
|
buf_mut.unset_flags(gst::BufferFlags::GAP);
|
||||||
|
buf_mut.set_flags(gst::BufferFlags::DISCONT);
|
||||||
|
}
|
||||||
|
|
||||||
state.out_buffer = Some(buffer);
|
state.out_buffer = Some(buffer);
|
||||||
state.out_timestamp = state.in_timestamp;
|
state.out_timestamp = state.in_timestamp;
|
||||||
|
|
||||||
caps = state.pending_caps.take();
|
caps = state.pending_caps.take();
|
||||||
segment = state.pending_segment.take();
|
segment = state.pending_segment.take();
|
||||||
|
}
|
||||||
|
|
||||||
if lateness != BufferLateness::OnTime {
|
Some((buffer, BufferLateness::LateOverThreshold)) if !state.pending_events() => {
|
||||||
state.num_duplicate += 1;
|
gst::debug!(CAT, imp: self, "Accepting late {:?}", buffer);
|
||||||
|
state.num_in += 1;
|
||||||
|
|
||||||
|
self.patch_output_buffer(&mut state, Some(buffer))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Some((buffer, BufferLateness::LateOverThreshold)) => {
|
||||||
|
// Cannot accept late-over-threshold buffers while we have pending events
|
||||||
|
gst::debug!(CAT, imp: self, "Discarding late {:?}", buffer);
|
||||||
|
state.num_drop += 1;
|
||||||
|
|
||||||
|
self.patch_output_buffer(&mut state, None)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
None => {
|
None => {
|
||||||
// Work around borrow checker
|
self.patch_output_buffer(&mut state, None)?;
|
||||||
let State {
|
|
||||||
fallback_duration,
|
|
||||||
out_buffer: ref mut buffer,
|
|
||||||
out_audio_info: ref audio_info,
|
|
||||||
..
|
|
||||||
} = *state;
|
|
||||||
gst::debug!(CAT, imp: self, "Repeating {:?}", buffer);
|
|
||||||
|
|
||||||
let buffer = buffer.as_mut().unwrap().make_mut();
|
|
||||||
let prev_duration = buffer.duration().unwrap();
|
|
||||||
|
|
||||||
if let Some(audio_info) = audio_info {
|
|
||||||
if !buffer.flags().contains(gst::BufferFlags::GAP) {
|
|
||||||
let mut map_info = buffer.map_writable().map_err(|e| {
|
|
||||||
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
|
|
||||||
gst::FlowError::Error
|
|
||||||
})?;
|
|
||||||
|
|
||||||
audio_info
|
|
||||||
.format_info()
|
|
||||||
.fill_silence(map_info.as_mut_slice());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
buffer.set_duration(Some(fallback_duration));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
buffer.set_dts(buffer.dts().map(|t| t + prev_duration));
|
Some((_, BufferLateness::LateUnderThreshold)) => {
|
||||||
buffer.set_pts(buffer.pts().map(|t| t + prev_duration));
|
// Is discarded before queueing
|
||||||
buffer.set_flags(gst::BufferFlags::GAP);
|
unreachable!();
|
||||||
buffer.unset_flags(gst::BufferFlags::DISCONT);
|
|
||||||
|
|
||||||
state.out_timestamp = state.ts_range(
|
|
||||||
state.out_buffer.as_ref().unwrap(),
|
|
||||||
state.out_segment.as_ref().unwrap(),
|
|
||||||
);
|
|
||||||
state.num_duplicate += 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1317,4 +1274,52 @@ impl LiveSync {
|
||||||
details: details
|
details: details
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Patches the output buffer for repeating, setting out_buffer and out_timestamp
|
||||||
|
fn patch_output_buffer(
|
||||||
|
&self,
|
||||||
|
state: &mut State,
|
||||||
|
source: Option<gst::Buffer>,
|
||||||
|
) -> Result<(), gst::FlowError> {
|
||||||
|
let out_buffer = state.out_buffer.as_mut().unwrap();
|
||||||
|
|
||||||
|
let duration = out_buffer.duration().unwrap();
|
||||||
|
let dts = out_buffer.dts().map(|t| t + duration);
|
||||||
|
let pts = out_buffer.pts().map(|t| t + duration);
|
||||||
|
|
||||||
|
if let Some(source) = source {
|
||||||
|
gst::debug!(CAT, imp: self, "Repeating {:?} using {:?}", out_buffer, source);
|
||||||
|
*out_buffer = source;
|
||||||
|
} else {
|
||||||
|
gst::debug!(CAT, imp: self, "Repeating {:?}", out_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
let buffer = out_buffer.make_mut();
|
||||||
|
|
||||||
|
if let Some(audio_info) = &state.out_audio_info {
|
||||||
|
if !buffer.flags().contains(gst::BufferFlags::GAP) {
|
||||||
|
let mut map_info = buffer.map_writable().map_err(|e| {
|
||||||
|
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
|
||||||
|
gst::FlowError::Error
|
||||||
|
})?;
|
||||||
|
audio_info
|
||||||
|
.format_info()
|
||||||
|
.fill_silence(map_info.as_mut_slice());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
buffer.set_duration(state.fallback_duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer.set_dts(dts);
|
||||||
|
buffer.set_pts(pts);
|
||||||
|
buffer.set_flags(gst::BufferFlags::GAP);
|
||||||
|
buffer.unset_flags(gst::BufferFlags::DISCONT);
|
||||||
|
|
||||||
|
state.out_timestamp = state.ts_range(
|
||||||
|
state.out_buffer.as_ref().unwrap(),
|
||||||
|
state.out_segment.as_ref().unwrap(),
|
||||||
|
);
|
||||||
|
state.num_duplicate += 1;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue