onvifaggregator: refactor to support duration-less media buffers

For instance when dealing with a variable framerate media stream,
input media buffers may not hold a duration, in which case we try
to calculate one by waiting for the following buffer.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/765>
This commit is contained in:
Mathieu Duponchelle 2022-05-24 22:59:29 +02:00 committed by GStreamer Marge Bot
parent 77260a8442
commit ab01fc6143

View file

@ -45,6 +45,9 @@ impl PartialEq for MetaFrame {
struct State {
// FIFO of MetaFrames
meta_frames: BTreeSet<MetaFrame>,
// We may store the next buffer we output here while waiting
// for a future buffer, when we need one to calculate its duration
current_media_buffer: Option<gst::Buffer>,
}
pub struct OnvifAggregator {
@ -277,7 +280,7 @@ impl OnvifAggregator {
Ok(())
}
fn lookup_reference_timestamp(&self, buffer: gst::Buffer) -> Option<gst::ClockTime> {
fn lookup_reference_timestamp(&self, buffer: &gst::Buffer) -> Option<gst::ClockTime> {
for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() {
if meta.reference().is_subset(&NTP_CAPS) {
return Some(meta.timestamp());
@ -287,7 +290,66 @@ impl OnvifAggregator {
None
}
// Called after consuming metadata buffers, we peek the current media buffer
fn media_buffer_duration(
&self,
element: &super::OnvifAggregator,
current_media_buffer: &gst::Buffer,
timeout: bool,
) -> Option<gst::ClockTime> {
match current_media_buffer.duration() {
Some(duration) => {
gst::log!(
CAT,
obj: element,
"Current media buffer has a duration, using it: {}",
duration
);
Some(duration)
}
None => {
if let Some(next_buffer) = self.media_sink_pad.peek_buffer() {
match next_buffer.pts().zip(current_media_buffer.pts()) {
Some((next_pts, current_pts)) => {
let duration = next_pts.saturating_sub(current_pts);
gst::log!(
CAT,
obj: element,
"calculated duration for current media buffer from next buffer: {}",
duration
);
Some(duration)
}
None => {
gst::log!(
CAT,
obj: element,
"could not calculate duration for current media buffer"
);
Some(gst::ClockTime::from_nseconds(0))
}
}
} else if timeout {
gst::log!(
CAT,
obj: element,
"could not calculate duration for current media buffer"
);
Some(gst::ClockTime::from_nseconds(0))
} else {
gst::trace!(
CAT,
obj: element,
"No next buffer to peek at yet to calculate duration"
);
None
}
}
}
}
// Called after consuming metadata buffers, we consume the current media buffer
// and output it when:
//
// * it does not have a reference timestamp meta
@ -299,51 +361,71 @@ impl OnvifAggregator {
element: &super::OnvifAggregator,
timeout: bool,
) -> Result<Option<(gst::Buffer, Option<gst::ClockTime>)>, gst::FlowError> {
if let Some(media_buffer) = self.media_sink_pad.peek_buffer() {
let duration = media_buffer.duration().ok_or_else(|| {
gst::error!(CAT, obj: element, "Require buffers with duration");
gst::FlowError::Error
})?;
if let Some(mut current_media_buffer) = state
.current_media_buffer
.take()
.or_else(|| self.media_sink_pad.pop_buffer())
{
if let Some(current_media_start) =
self.lookup_reference_timestamp(&current_media_buffer)
{
let duration =
match self.media_buffer_duration(element, &current_media_buffer, timeout) {
Some(duration) => {
// Update the buffer duration for good measure, in order to
// set a fully-accurate position later on in aggregate()
{
let buf_mut = current_media_buffer.make_mut();
buf_mut.set_duration(duration);
}
if let Some(start) = self.lookup_reference_timestamp(media_buffer) {
let end = start + duration;
duration
}
None => {
state.current_media_buffer = Some(current_media_buffer);
return Ok(None);
}
};
let end = current_media_start + duration;
if timeout {
gst::debug!(
CAT,
obj: element,
"Media buffer spanning {} -> {} is ready (timeout)",
start,
current_media_start,
end
);
Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end))))
Ok(Some((current_media_buffer, Some(end))))
} else if self.meta_sink_pad.is_eos() {
gst::debug!(
CAT,
obj: element,
"Media buffer spanning {} -> {} is ready (meta pad is EOS)",
start,
current_media_start,
end
);
Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end))))
Ok(Some((current_media_buffer, Some(end))))
} else if let Some(latest_frame) = state.meta_frames.iter().next_back() {
if latest_frame.timestamp > end {
gst::debug!(
CAT,
obj: element,
"Media buffer spanning {} -> {} is ready",
start,
current_media_start,
end
);
Ok(Some((self.media_sink_pad.pop_buffer().unwrap(), Some(end))))
Ok(Some((current_media_buffer, Some(end))))
} else {
gst::trace!(
CAT,
obj: element,
"Media buffer spanning {} -> {} isn't ready yet",
start,
current_media_start,
end
);
state.current_media_buffer = Some(current_media_buffer);
Ok(None)
}
} else {
@ -351,10 +433,10 @@ impl OnvifAggregator {
CAT,
obj: element,
"Media buffer spanning {} -> {} isn't ready yet",
start,
current_media_start,
end
);
state.current_media_buffer = Some(current_media_buffer);
Ok(None)
}
} else {
@ -364,10 +446,7 @@ impl OnvifAggregator {
"Consuming media buffer with no reference NTP timestamp"
);
Ok(Some((
self.media_sink_pad.pop_buffer().unwrap(),
gst::ClockTime::NONE,
)))
Ok(Some((current_media_buffer, gst::ClockTime::NONE)))
}
} else {
gst::trace!(CAT, obj: element, "No media buffer queued");
@ -383,6 +462,8 @@ impl AggregatorImpl for OnvifAggregator {
element: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(CAT, obj: element, "aggregate, timeout: {}", timeout);
let mut state = self.state.lock().unwrap();
self.consume_meta(&mut state, element)?;
@ -434,7 +515,15 @@ impl AggregatorImpl for OnvifAggregator {
s.set("frames", buflist);
}
element.set_position(buffer.pts().opt_add(buffer.duration()));
let position = buffer.pts().opt_add(
buffer
.duration()
.unwrap_or_else(|| gst::ClockTime::from_nseconds(0)),
);
gst::log!(CAT, obj: element, "Updating position: {:?}", position);
element.set_position(position);
self.finish_buffer(element, buffer)
} else if self.media_sink_pad.is_eos() {