onvifaggregator: refactor, expect parsed metadata

The aggregator was consuming meta buffers too greedily, causing
potential interleaving starvation upstream. Refactor to consume
media and meta buffers synchronously.

Also expect parsed=true metadata caps (requiring an upstream
onvifmetadataparse element).
Authored by Mathieu Duponchelle on 2022-06-03 18:32:46 +02:00; committed by Sebastian Dröge
parent 21da753607
commit 3011764da1
2 changed files with 194 additions and 309 deletions
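
In outline: instead of draining the metadata pad unconditionally, the aggregator now pops metadata only up to the end timestamp of the media buffer it is currently assembling. Below is a minimal, self-contained sketch of that contract, with plain u64 nanosecond values standing in for gst::ClockTime and a VecDeque standing in for the metadata sink pad; all names are illustrative, not the element's actual API:

    use std::collections::VecDeque;

    // Stand-in for the metadata sink pad: a queue of reference timestamps.
    struct MetaPad {
        queue: VecDeque<u64>,
        eos: bool,
    }

    impl MetaPad {
        // Mirror of the new consume_meta contract: consume metadata up to
        // `end` only, and report whether the current media buffer is complete
        // (a meta frame beyond `end` is queued, or no more metadata will come).
        fn consume_meta(&mut self, frames: &mut Vec<u64>, end: u64) -> bool {
            while let Some(&ts) = self.queue.front() {
                if ts <= end {
                    frames.push(self.queue.pop_front().unwrap());
                } else {
                    // A future frame is queued: stop here, it belongs to the
                    // next media buffer.
                    return true;
                }
            }
            self.eos
        }
    }

    fn main() {
        let mut pad = MetaPad {
            queue: VecDeque::from([10, 30, 70]),
            eos: false,
        };
        let mut frames = Vec::new();

        // Media buffer spanning 0..40: only meta 10 and 30 are consumed;
        // 70 stays queued instead of being drained greedily.
        assert!(pad.consume_meta(&mut frames, 40));
        assert_eq!(frames, vec![10, 30]);
    }

Because the metadata queue is only popped while its timestamps fall within the current media buffer, the queues feeding the two pads drain at the same pace, which is what removes the starvation described above.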


@@ -5,41 +5,12 @@ use gst_base::prelude::*;
 use gst_base::subclass::prelude::*;
 use gst_base::AGGREGATOR_FLOW_NEED_DATA;
 use once_cell::sync::Lazy;
-use std::collections::BTreeSet;
-use std::io::Cursor;
 use std::sync::Mutex;
 
-// Incoming metadata is split up frame-wise, and stored in a FIFO.
-#[derive(Eq, Clone)]
-struct MetaFrame {
-    // From UtcTime attribute, in nanoseconds since prime epoch
-    timestamp: gst::ClockTime,
-    // The frame element, dumped to XML
-    buffer: gst::Buffer,
-}
-
-impl Ord for MetaFrame {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.timestamp.cmp(&other.timestamp)
-    }
-}
-
-impl PartialOrd for MetaFrame {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl PartialEq for MetaFrame {
-    fn eq(&self, other: &Self) -> bool {
-        self.timestamp == other.timestamp
-    }
-}
-
 #[derive(Default)]
 struct State {
     // FIFO of MetaFrames
-    meta_frames: BTreeSet<MetaFrame>,
+    meta_frames: Vec<gst::Buffer>,
     // We may store the next buffer we output here while waiting
    // for a future buffer, when we need one to calculate its duration
     current_media_buffer: Option<gst::Buffer>,
@@ -124,6 +95,7 @@ impl ElementImpl for OnvifAggregator {
             .unwrap();
 
         let meta_caps = gst::Caps::builder("application/x-onvif-metadata")
+            .field("parsed", true)
             .field("encoding", "utf8")
             .build();
@@ -180,51 +152,30 @@ impl ElementImpl for OnvifAggregator {
 }
 
 impl OnvifAggregator {
-    // We simply consume all the incoming meta buffers and store them in a FIFO
-    // as they arrive
     fn consume_meta(
         &self,
         state: &mut State,
         element: &super::OnvifAggregator,
-    ) -> Result<(), gst::FlowError> {
-        while let Some(buffer) = self.meta_sink_pad.pop_buffer() {
-            let root = crate::xml_from_buffer(&buffer).map_err(|err| {
-                element.post_error_message(err);
-                gst::FlowError::Error
-            })?;
-
-            for res in crate::iterate_video_analytics_frames(&root) {
-                let (dt, el) = res.map_err(|err| {
-                    element.post_error_message(err);
-                    gst::FlowError::Error
-                })?;
-
-                let prime_dt_ns = crate::PRIME_EPOCH_OFFSET
-                    + gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64);
-
-                let mut writer = Cursor::new(Vec::new());
-                el.write_to(&mut writer).map_err(|err| {
+        end: gst::ClockTime,
+    ) -> Result<bool, gst::FlowError> {
+        while let Some(buffer) = self.meta_sink_pad.peek_buffer() {
+            let meta_ts = crate::lookup_reference_timestamp(&buffer).ok_or_else(|| {
                 gst::element_error!(
                     element,
-                    gst::ResourceError::Write,
-                    ["Failed to write back frame as XML: {}", err]
+                    gst::ResourceError::Read,
+                    ["Parsed metadata buffer should hold reference timestamp"]
                 );
 
                 gst::FlowError::Error
             })?;
 
-            gst::trace!(CAT, "Consuming metadata buffer {}", prime_dt_ns);
-
-            state.meta_frames.insert(MetaFrame {
-                timestamp: prime_dt_ns,
-                buffer: gst::Buffer::from_slice(writer.into_inner()),
-            });
+            if meta_ts <= end {
+                let buffer = self.meta_sink_pad.pop_buffer().unwrap();
+                state.meta_frames.push(buffer);
+            } else {
+                return Ok(true);
+            }
         }
 
-        Ok(())
+        Ok(self.meta_sink_pad.is_eos())
     }
 
     fn media_buffer_duration(
@@ -286,19 +237,13 @@ impl OnvifAggregator {
         }
     }
 
-    // Called after consuming metadata buffers, we consume the current media buffer
-    // and output it when:
-    //
-    // * it does not have a reference timestamp meta
-    // * we have timed out
-    // * we have consumed a metadata buffer for a future frame
     fn consume_media(
         &self,
         state: &mut State,
         element: &super::OnvifAggregator,
         timeout: bool,
-    ) -> Result<Option<(gst::Buffer, Option<gst::ClockTime>)>, gst::FlowError> {
-        if let Some(mut current_media_buffer) = state
+    ) -> Result<Option<gst::Buffer>, gst::FlowError> {
+        if let Some(current_media_buffer) = state
             .current_media_buffer
             .take()
             .or_else(|| self.media_sink_pad.pop_buffer())
@@ -306,88 +251,26 @@ impl OnvifAggregator {
             if let Some(current_media_start) =
                 crate::lookup_reference_timestamp(&current_media_buffer)
             {
-                let duration =
                 match self.media_buffer_duration(element, &current_media_buffer, timeout) {
                     Some(duration) => {
-                        // Update the buffer duration for good measure, in order to
-                        // set a fully-accurate position later on in aggregate()
-                        {
-                            let buf_mut = current_media_buffer.make_mut();
-                            buf_mut.set_duration(duration);
-                        }
+                        let end = current_media_start + duration;
 
-                        duration
+                        if self.consume_meta(state, element, end)? {
+                            Ok(Some(current_media_buffer))
+                        } else {
+                            state.current_media_buffer = Some(current_media_buffer);
+                            Ok(None)
+                        }
                     }
                     None => {
                         state.current_media_buffer = Some(current_media_buffer);
                         return Ok(None);
                     }
-                };
-
-                let end = current_media_start + duration;
-
-                if timeout {
-                    gst::debug!(
-                        CAT,
-                        obj: element,
-                        "Media buffer spanning {} -> {} is ready (timeout)",
-                        current_media_start,
-                        end
-                    );
-
-                    Ok(Some((current_media_buffer, Some(end))))
-                } else if self.meta_sink_pad.is_eos() {
-                    gst::debug!(
-                        CAT,
-                        obj: element,
-                        "Media buffer spanning {} -> {} is ready (meta pad is EOS)",
-                        current_media_start,
-                        end
-                    );
-
-                    Ok(Some((current_media_buffer, Some(end))))
-                } else if let Some(latest_frame) = state.meta_frames.iter().next_back() {
-                    if latest_frame.timestamp > end {
-                        gst::debug!(
-                            CAT,
-                            obj: element,
-                            "Media buffer spanning {} -> {} is ready",
-                            current_media_start,
-                            end
-                        );
-
-                        Ok(Some((current_media_buffer, Some(end))))
-                    } else {
-                        gst::trace!(
-                            CAT,
-                            obj: element,
-                            "Media buffer spanning {} -> {} isn't ready yet",
-                            current_media_start,
-                            end
-                        );
-
-                        state.current_media_buffer = Some(current_media_buffer);
-                        Ok(None)
-                    }
-                } else {
-                    gst::trace!(
-                        CAT,
-                        obj: element,
-                        "Media buffer spanning {} -> {} isn't ready yet",
-                        current_media_start,
-                        end
-                    );
-
-                    state.current_media_buffer = Some(current_media_buffer);
-                    Ok(None)
-                }
                 }
             } else {
                 gst::debug!(
                     CAT,
                     obj: element,
                     "Consuming media buffer with no reference NTP timestamp"
                 );
 
-                Ok(Some((current_media_buffer, gst::ClockTime::NONE)))
+                Ok(Some(current_media_buffer))
             }
         } else {
             gst::trace!(CAT, obj: element, "No media buffer queued");
 
             Ok(None)
         }
     }
@@ -403,42 +286,14 @@ impl AggregatorImpl for OnvifAggregator {
         let mut state = self.state.lock().unwrap();
 
-        self.consume_meta(&mut state, element)?;
-
         // When the current media buffer is ready, we attach all matching metadata buffers
         // and push it out
-        if let Some((mut buffer, end)) = self.consume_media(&mut state, element, timeout)? {
+        if let Some(mut buffer) = self.consume_media(&mut state, element, timeout)? {
             let mut buflist = gst::BufferList::new();
 
-            if let Some(end) = end {
-                let mut split_at: Option<MetaFrame> = None;
-
             {
                 let buflist_mut = buflist.get_mut().unwrap();
 
-                for frame in state.meta_frames.iter() {
-                    if frame.timestamp > end {
-                        gst::trace!(
-                            CAT,
-                            obj: element,
-                            "keeping metadata buffer at {} for next media buffer",
-                            frame.timestamp
-                        );
-
-                        split_at = Some(frame.clone());
-                        break;
-                    } else {
-                        gst::debug!(
-                            CAT,
-                            obj: element,
-                            "Attaching meta buffer {}",
-                            frame.timestamp
-                        );
-
-                        buflist_mut.add(frame.buffer.clone());
-                    }
-                }
-
-                if let Some(split_at) = split_at {
-                    state.meta_frames = state.meta_frames.split_off(&split_at);
-                } else {
-                    state.meta_frames.clear();
+                for frame in state.meta_frames.drain(..) {
+                    buflist_mut.add(frame);
                 }
             }


@@ -456,26 +456,41 @@ impl OnvifOverlay {
                 gst::FlowError::Error
             })?;
 
-            for object in root.children() {
+            for object in root
+                .get_child("VideoAnalytics", "http://www.onvif.org/ver10/schema")
+                .map(|el| el.children().into_iter().collect())
+                .unwrap_or_else(Vec::new)
+            {
+                if object.is("Frame", "http://www.onvif.org/ver10/schema") {
+                    for object in object.children() {
                         if object.is("Object", "http://www.onvif.org/ver10/schema") {
                             gst::trace!(CAT, obj: element, "Handling object {:?}", object);
 
                             let object_id = match object.attr("ObjectId") {
                                 Some(id) => id.to_string(),
                                 None => {
-                                    gst::warning!(CAT, obj: element, "XML Object with no ObjectId");
+                                    gst::warning!(
+                                        CAT,
+                                        obj: element,
+                                        "XML Object with no ObjectId"
+                                    );
                                     continue;
                                 }
                             };
 
                             if !object_ids.insert(object_id.clone()) {
-                                gst::debug!(CAT, "Skipping older version of object {}", object_id);
+                                gst::debug!(
+                                    CAT,
+                                    "Skipping older version of object {}",
+                                    object_id
+                                );
                                 continue;
                             }
 
-                            let appearance = match object
-                                .get_child("Appearance", "http://www.onvif.org/ver10/schema")
-                            {
+                            let appearance = match object.get_child(
+                                "Appearance",
+                                "http://www.onvif.org/ver10/schema",
+                            ) {
                                 Some(appearance) => appearance,
                                 None => continue,
                             };
@@ -490,13 +505,17 @@ impl OnvifOverlay {
                             let tag = appearance
                                 .get_child("Class", "http://www.onvif.org/ver10/schema")
                                 .and_then(|class| {
-                                    class.get_child("Type", "http://www.onvif.org/ver10/schema")
+                                    class.get_child(
+                                        "Type",
+                                        "http://www.onvif.org/ver10/schema",
+                                    )
                                 })
                                 .map(|t| t.text());
 
-                            let bbox = match shape
-                                .get_child("BoundingBox", "http://www.onvif.org/ver10/schema")
-                            {
+                            let bbox = match shape.get_child(
+                                "BoundingBox",
+                                "http://www.onvif.org/ver10/schema",
+                            ) {
                                 Some(bbox) => bbox,
                                 None => {
                                     gst::warning!(
@@ -508,8 +527,8 @@ impl OnvifOverlay {
                                 }
                             };
 
-                            let left: f64 = match bbox.attr("left").and_then(|val| val.parse().ok())
-                            {
+                            let left: f64 =
+                                match bbox.attr("left").and_then(|val| val.parse().ok()) {
                                 Some(val) => val,
                                 None => {
                                     gst::warning!(
@@ -534,7 +553,8 @@ impl OnvifOverlay {
                                 }
                             };
 
-                            let top: f64 = match bbox.attr("top").and_then(|val| val.parse().ok()) {
+                            let top: f64 =
+                                match bbox.attr("top").and_then(|val| val.parse().ok()) {
                                 Some(val) => val,
                                 None => {
                                     gst::warning!(
@@ -546,8 +566,10 @@ impl OnvifOverlay {
                                 }
                             };
 
-                            let bottom: f64 =
-                                match bbox.attr("bottom").and_then(|val| val.parse().ok()) {
+                            let bottom: f64 = match bbox
+                                .attr("bottom")
+                                .and_then(|val| val.parse().ok())
+                            {
                                 Some(val) => val,
                                 None => {
                                     gst::warning!(
@@ -569,11 +591,13 @@ impl OnvifOverlay {
                             let mut points = vec![];
 
-                            if let Some(polygon) =
-                                shape.get_child("Polygon", "http://www.onvif.org/ver10/schema")
+                            if let Some(polygon) = shape
+                                .get_child("Polygon", "http://www.onvif.org/ver10/schema")
                             {
                                 for point in polygon.children() {
-                                    if point.is("Point", "http://www.onvif.org/ver10/schema") {
+                                    if point
+                                        .is("Point", "http://www.onvif.org/ver10/schema")
+                                    {
                                         let px: f64 = match point
                                             .attr("x")
                                             .and_then(|val| val.parse().ok())
@@ -604,11 +628,15 @@ impl OnvifOverlay {
                                             }
                                         };
 
-                                        let px = width / 2 + ((px * (width / 2) as f64) as i32);
-                                        let px = (px as u32).saturating_sub(x1 as u32).min(w);
+                                        let px =
+                                            width / 2 + ((px * (width / 2) as f64) as i32);
+                                        let px =
+                                            (px as u32).saturating_sub(x1 as u32).min(w);
 
-                                        let py = height / 2 - ((py * (height / 2) as f64) as i32);
-                                        let py = (py as u32).saturating_sub(y1 as u32).min(h);
+                                        let py = height / 2
+                                            - ((py * (height / 2) as f64) as i32);
+                                        let py =
+                                            (py as u32).saturating_sub(y1 as u32).min(h);
 
                                         points.push(Point { x: px, y: py });
                                     }
@@ -626,6 +654,8 @@ impl OnvifOverlay {
                         }
                     }
                 }
+            }
+        }
 
         if !frames.is_empty() {
             self.overlay_shapes(&mut state, element, shapes);
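
For reference, the document shape the overlay now walks (VideoAnalytics → Frame → Object) can be exercised standalone. A sketch under two assumptions: that the get_child/is/attr calls above are the minidom crate's Element API, and that the XML sample below is hand-written purely for illustration:

    use minidom::Element;

    const ONVIF_NS: &str = "http://www.onvif.org/ver10/schema";

    fn main() {
        // Made-up minimal sample; real streams carry many frames and objects.
        let xml = r#"<MetadataStream xmlns="http://www.onvif.org/ver10/schema">
            <VideoAnalytics>
                <Frame UtcTime="2022-06-03T18:32:46.000Z">
                    <Object ObjectId="42">
                        <Appearance>
                            <Shape>
                                <BoundingBox left="-0.5" top="0.5" right="0.5" bottom="-0.5"/>
                            </Shape>
                            <Class><Type>Human</Type></Class>
                        </Appearance>
                    </Object>
                </Frame>
            </VideoAnalytics>
        </MetadataStream>"#;

        let root: Element = xml.parse().unwrap();

        // Same traversal as the diff above: only Frame children of the
        // VideoAnalytics element are considered, and only their Object
        // children are handled.
        for frame in root
            .get_child("VideoAnalytics", ONVIF_NS)
            .map(|el| el.children().collect::<Vec<_>>())
            .unwrap_or_default()
        {
            if frame.is("Frame", ONVIF_NS) {
                for object in frame.children() {
                    if object.is("Object", ONVIF_NS) {
                        println!("object {:?}", object.attr("ObjectId"));
                    }
                }
            }
        }
    }

Anything outside VideoAnalytics (for example PTZ or event metadata in the same stream) is now skipped instead of being misread as analytics objects.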