From 505fab2e1ce662a752a0ea835f4a59851afd2236 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 21 Jun 2024 13:46:03 +0300 Subject: [PATCH] livesync: Allow queueing up to latency buffers This was already reported by the latency query, and not doing this would require to always put a queue before livesync. Part-of: --- utils/livesync/src/livesync/imp.rs | 60 ++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/utils/livesync/src/livesync/imp.rs b/utils/livesync/src/livesync/imp.rs index 583441b9..f3e965a6 100644 --- a/utils/livesync/src/livesync/imp.rs +++ b/utils/livesync/src/livesync/imp.rs @@ -55,7 +55,7 @@ enum BufferLateness { #[derive(Debug)] enum Item { - Buffer(gst::Buffer, BufferLateness), + Buffer(gst::Buffer, Option, BufferLateness), Event(gst::Event), // SAFETY: Item needs to wait until the query and the receiver has returned Query(std::ptr::NonNull, mpsc::SyncSender), @@ -134,9 +134,6 @@ struct State { /// Queue between sinkpad and srcpad queue: VecDeque, - /// Whether our queue currently holds a buffer. We only allow one! - buffer_queued: bool, - /// Current buffer of our srcpad out_buffer: Option, @@ -199,7 +196,6 @@ impl Default for State { in_audio_info: None, out_audio_info: None, queue: VecDeque::with_capacity(32), - buffer_queued: false, out_buffer: None, out_buffer_duplicate: false, in_timestamp: None, @@ -506,6 +502,28 @@ impl State { fn pending_events(&self) -> bool { self.pending_caps.is_some() || self.pending_segment.is_some() } + + fn queue_filled(&self) -> bool { + let first_ts = self.queue.iter().find_map(|item| match item { + Item::Buffer(_, Some(Timestamps { start, .. }), _) => Some(*start), + _ => None, + }); + let Some(first_ts) = first_ts else { + return false; + }; + + let last_ts = self + .queue + .iter() + .rev() + .find_map(|item| match item { + Item::Buffer(_, Some(Timestamps { start, .. }), _) => Some(*start), + _ => None, + }) + .unwrap(); + + last_ts.saturating_sub(first_ts) > self.latency + } } impl LiveSync { @@ -563,8 +581,6 @@ impl LiveSync { // Ensure we drop any query response sender to unblock the sinkpad state.queue.clear(); - state.buffer_queued = false; - self.cond.notify_all(); } @@ -837,7 +853,7 @@ impl LiveSync { } } - while state.srcresult.is_ok() && state.buffer_queued { + while state.srcresult.is_ok() && state.queue_filled() { self.cond.wait(&mut state); } state.srcresult?; @@ -931,8 +947,9 @@ impl LiveSync { } gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness); - state.queue.push_back(Item::Buffer(buffer, lateness)); - state.buffer_queued = true; + state + .queue + .push_back(Item::Buffer(buffer, timestamp, lateness)); state.in_timestamp = timestamp; self.cond.notify_all(); @@ -1038,15 +1055,16 @@ impl LiveSync { let in_buffer = match in_item { None => None, - Some(Item::Buffer(buffer, lateness)) => { - if self.buffer_is_early(&state, state.in_timestamp) { + Some(Item::Buffer(buffer, timestamp, lateness)) => { + if self.buffer_is_early(&state, timestamp) { // Try this buffer again on the next iteration - state.queue.push_front(Item::Buffer(buffer, lateness)); + state + .queue + .push_front(Item::Buffer(buffer, timestamp, lateness)); None } else { - state.buffer_queued = false; self.cond.notify_all(); - Some((buffer, lateness)) + Some((buffer, timestamp, lateness)) } } @@ -1102,7 +1120,7 @@ impl LiveSync { let mut segment = None; match in_buffer { - Some((mut buffer, BufferLateness::OnTime)) => { + Some((mut buffer, timestamp, BufferLateness::OnTime)) => { state.num_in += 1; if state.out_buffer.is_none() || state.out_buffer_duplicate { @@ -1112,20 +1130,22 @@ impl LiveSync { state.out_buffer = Some(buffer); state.out_buffer_duplicate = false; - state.out_timestamp = state.in_timestamp; + state.out_timestamp = timestamp; caps = state.pending_caps.take(); segment = state.pending_segment.take(); } - Some((buffer, BufferLateness::LateOverThreshold)) if !state.pending_events() => { + Some((buffer, _timestamp, BufferLateness::LateOverThreshold)) + if !state.pending_events() => + { gst::debug!(CAT, imp: self, "Accepting late {:?}", buffer); state.num_in += 1; self.patch_output_buffer(&mut state, Some(buffer))?; } - Some((buffer, BufferLateness::LateOverThreshold)) => { + Some((buffer, _timestamp, BufferLateness::LateOverThreshold)) => { // Cannot accept late-over-threshold buffers while we have pending events gst::debug!(CAT, imp: self, "Discarding late {:?}", buffer); state.num_drop += 1; @@ -1137,7 +1157,7 @@ impl LiveSync { self.patch_output_buffer(&mut state, None)?; } - Some((_, BufferLateness::LateUnderThreshold)) => { + Some((_, _, BufferLateness::LateUnderThreshold)) => { // Is discarded before queueing unreachable!(); }