mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-25 13:01:07 +00:00
livesync: Allow queueing up to latency buffers
This was already reported by the latency query, and not doing this would require to always put a queue before livesync. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1635>
This commit is contained in:
parent
a10577b42c
commit
505fab2e1c
1 changed files with 40 additions and 20 deletions
|
@ -55,7 +55,7 @@ enum BufferLateness {
|
|||
|
||||
#[derive(Debug)]
|
||||
enum Item {
|
||||
Buffer(gst::Buffer, BufferLateness),
|
||||
Buffer(gst::Buffer, Option<Timestamps>, BufferLateness),
|
||||
Event(gst::Event),
|
||||
// SAFETY: Item needs to wait until the query and the receiver has returned
|
||||
Query(std::ptr::NonNull<gst::QueryRef>, mpsc::SyncSender<bool>),
|
||||
|
@ -134,9 +134,6 @@ struct State {
|
|||
/// Queue between sinkpad and srcpad
|
||||
queue: VecDeque<Item>,
|
||||
|
||||
/// Whether our queue currently holds a buffer. We only allow one!
|
||||
buffer_queued: bool,
|
||||
|
||||
/// Current buffer of our srcpad
|
||||
out_buffer: Option<gst::Buffer>,
|
||||
|
||||
|
@ -199,7 +196,6 @@ impl Default for State {
|
|||
in_audio_info: None,
|
||||
out_audio_info: None,
|
||||
queue: VecDeque::with_capacity(32),
|
||||
buffer_queued: false,
|
||||
out_buffer: None,
|
||||
out_buffer_duplicate: false,
|
||||
in_timestamp: None,
|
||||
|
@ -506,6 +502,28 @@ impl State {
|
|||
fn pending_events(&self) -> bool {
|
||||
self.pending_caps.is_some() || self.pending_segment.is_some()
|
||||
}
|
||||
|
||||
fn queue_filled(&self) -> bool {
|
||||
let first_ts = self.queue.iter().find_map(|item| match item {
|
||||
Item::Buffer(_, Some(Timestamps { start, .. }), _) => Some(*start),
|
||||
_ => None,
|
||||
});
|
||||
let Some(first_ts) = first_ts else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let last_ts = self
|
||||
.queue
|
||||
.iter()
|
||||
.rev()
|
||||
.find_map(|item| match item {
|
||||
Item::Buffer(_, Some(Timestamps { start, .. }), _) => Some(*start),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
last_ts.saturating_sub(first_ts) > self.latency
|
||||
}
|
||||
}
|
||||
|
||||
impl LiveSync {
|
||||
|
@ -563,8 +581,6 @@ impl LiveSync {
|
|||
|
||||
// Ensure we drop any query response sender to unblock the sinkpad
|
||||
state.queue.clear();
|
||||
state.buffer_queued = false;
|
||||
|
||||
self.cond.notify_all();
|
||||
}
|
||||
|
||||
|
@ -837,7 +853,7 @@ impl LiveSync {
|
|||
}
|
||||
}
|
||||
|
||||
while state.srcresult.is_ok() && state.buffer_queued {
|
||||
while state.srcresult.is_ok() && state.queue_filled() {
|
||||
self.cond.wait(&mut state);
|
||||
}
|
||||
state.srcresult?;
|
||||
|
@ -931,8 +947,9 @@ impl LiveSync {
|
|||
}
|
||||
|
||||
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
|
||||
state.queue.push_back(Item::Buffer(buffer, lateness));
|
||||
state.buffer_queued = true;
|
||||
state
|
||||
.queue
|
||||
.push_back(Item::Buffer(buffer, timestamp, lateness));
|
||||
state.in_timestamp = timestamp;
|
||||
self.cond.notify_all();
|
||||
|
||||
|
@ -1038,15 +1055,16 @@ impl LiveSync {
|
|||
let in_buffer = match in_item {
|
||||
None => None,
|
||||
|
||||
Some(Item::Buffer(buffer, lateness)) => {
|
||||
if self.buffer_is_early(&state, state.in_timestamp) {
|
||||
Some(Item::Buffer(buffer, timestamp, lateness)) => {
|
||||
if self.buffer_is_early(&state, timestamp) {
|
||||
// Try this buffer again on the next iteration
|
||||
state.queue.push_front(Item::Buffer(buffer, lateness));
|
||||
state
|
||||
.queue
|
||||
.push_front(Item::Buffer(buffer, timestamp, lateness));
|
||||
None
|
||||
} else {
|
||||
state.buffer_queued = false;
|
||||
self.cond.notify_all();
|
||||
Some((buffer, lateness))
|
||||
Some((buffer, timestamp, lateness))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1102,7 +1120,7 @@ impl LiveSync {
|
|||
let mut segment = None;
|
||||
|
||||
match in_buffer {
|
||||
Some((mut buffer, BufferLateness::OnTime)) => {
|
||||
Some((mut buffer, timestamp, BufferLateness::OnTime)) => {
|
||||
state.num_in += 1;
|
||||
|
||||
if state.out_buffer.is_none() || state.out_buffer_duplicate {
|
||||
|
@ -1112,20 +1130,22 @@ impl LiveSync {
|
|||
|
||||
state.out_buffer = Some(buffer);
|
||||
state.out_buffer_duplicate = false;
|
||||
state.out_timestamp = state.in_timestamp;
|
||||
state.out_timestamp = timestamp;
|
||||
|
||||
caps = state.pending_caps.take();
|
||||
segment = state.pending_segment.take();
|
||||
}
|
||||
|
||||
Some((buffer, BufferLateness::LateOverThreshold)) if !state.pending_events() => {
|
||||
Some((buffer, _timestamp, BufferLateness::LateOverThreshold))
|
||||
if !state.pending_events() =>
|
||||
{
|
||||
gst::debug!(CAT, imp: self, "Accepting late {:?}", buffer);
|
||||
state.num_in += 1;
|
||||
|
||||
self.patch_output_buffer(&mut state, Some(buffer))?;
|
||||
}
|
||||
|
||||
Some((buffer, BufferLateness::LateOverThreshold)) => {
|
||||
Some((buffer, _timestamp, BufferLateness::LateOverThreshold)) => {
|
||||
// Cannot accept late-over-threshold buffers while we have pending events
|
||||
gst::debug!(CAT, imp: self, "Discarding late {:?}", buffer);
|
||||
state.num_drop += 1;
|
||||
|
@ -1137,7 +1157,7 @@ impl LiveSync {
|
|||
self.patch_output_buffer(&mut state, None)?;
|
||||
}
|
||||
|
||||
Some((_, BufferLateness::LateUnderThreshold)) => {
|
||||
Some((_, _, BufferLateness::LateUnderThreshold)) => {
|
||||
// Is discarded before queueing
|
||||
unreachable!();
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue