livesync: Wait for the end timestamp of the previous buffer before looking at queue

Previously livesync was waiting for the start timestamp of the current
buffer after looking at the queue and right before pushing it
downstream. This meant that it generally looked too early at the queue
and especially that upstream had to provide the next buffer already at
the start timestamp of the previous one.

Instead, now wait before looking at the queue and wait for the end
timestamp of the previous buffer. Once the previous buffer has expired,
a new buffer really needs to be available or otherwise a filler buffer
has to be pushed downstream.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1250>
This commit is contained in:
Sebastian Dröge 2023-06-20 10:36:51 +03:00 committed by GStreamer Marge Bot
parent 52ded6e8cc
commit 1119ed6620

View file

@ -1028,6 +1028,42 @@ impl LiveSync {
} }
state.srcresult?; state.srcresult?;
if let Some(out_timestamp) = state.out_timestamp {
let sync_ts = out_timestamp.end;
let element = self.obj();
let base_time = element.base_time().ok_or_else(|| {
gst::error!(CAT, imp: self, "Missing base time");
gst::FlowError::Flushing
})?;
let clock = element.clock().ok_or_else(|| {
gst::error!(CAT, imp: self, "Missing clock");
gst::FlowError::Flushing
})?;
let clock_id = clock.new_single_shot_id(base_time + sync_ts);
state.clock_id = Some(clock_id.clone());
gst::trace!(
CAT,
imp: self,
"Waiting for clock to reach {}",
clock_id.time(),
);
let (res, _) = MutexGuard::unlocked(&mut state, || clock_id.wait());
gst::trace!(CAT, imp: self, "Clock returned {res:?}",);
if res == Err(gst::ClockError::Unscheduled) {
return Err(gst::FlowError::Flushing);
}
state.srcresult?;
state.clock_id = None;
}
let in_item = state.queue.pop_front(); let in_item = state.queue.pop_front();
gst::trace!(CAT, imp: self, "Unqueueing {:?}", in_item); gst::trace!(CAT, imp: self, "Unqueueing {:?}", in_item);
@ -1184,40 +1220,6 @@ impl LiveSync {
state.out_segment = Some(segment); state.out_segment = Some(segment);
} }
{
let element = self.obj();
let base_time = element.base_time().ok_or_else(|| {
gst::error!(CAT, imp: self, "Missing base time");
gst::FlowError::Flushing
})?;
let clock = element.clock().ok_or_else(|| {
gst::error!(CAT, imp: self, "Missing clock");
gst::FlowError::Flushing
})?;
let clock_id = clock.new_single_shot_id(base_time + sync_ts);
state.clock_id = Some(clock_id.clone());
gst::trace!(
CAT,
imp: self,
"Waiting for clock to reach {}",
clock_id.time(),
);
let (res, _) = MutexGuard::unlocked(&mut state, || clock_id.wait());
gst::trace!(CAT, imp: self, "Clock returned {res:?}",);
if res == Err(gst::ClockError::Unscheduled) {
return Err(gst::FlowError::Flushing);
}
state.srcresult?;
state.clock_id = None;
}
state.num_out += 1; state.num_out += 1;
if duplicate { if duplicate {
state.num_duplicate += 1; state.num_duplicate += 1;