mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 05:21:00 +00:00
onvif: Add onvifmetadataparse element
This splits XML metadata into separate frames and ensures properly timestamped metadata. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/781>
This commit is contained in:
parent
ef7ed2d953
commit
35b42b88d9
4 changed files with 772 additions and 91 deletions
|
@ -8,17 +8,104 @@
|
|||
#![allow(clippy::non_send_fields_in_send_ty)]
|
||||
|
||||
use gst::glib;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
mod onvifaggregator;
|
||||
mod onvifdepay;
|
||||
mod onvifmetadataparse;
|
||||
mod onvifoverlay;
|
||||
mod onvifpay;
|
||||
|
||||
// Offset in nanoseconds from midnight 01-01-1900 (prime epoch) to
|
||||
// midnight 01-01-1970 (UNIX epoch)
|
||||
pub(crate) const PRIME_EPOCH_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(2_208_988_800);
|
||||
|
||||
pub(crate) static NTP_CAPS: Lazy<gst::Caps> =
|
||||
Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
|
||||
pub(crate) static UNIX_CAPS: Lazy<gst::Caps> =
|
||||
Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
|
||||
|
||||
pub(crate) fn lookup_reference_timestamp(buffer: &gst::Buffer) -> Option<gst::ClockTime> {
|
||||
for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() {
|
||||
if meta.reference().is_subset(&NTP_CAPS) {
|
||||
return Some(meta.timestamp());
|
||||
}
|
||||
if meta.reference().is_subset(&UNIX_CAPS) {
|
||||
return Some(meta.timestamp() + PRIME_EPOCH_OFFSET);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn xml_from_buffer(buffer: &gst::Buffer) -> Result<minidom::Element, gst::ErrorMessage> {
|
||||
let map = buffer.map_readable().map_err(|_| {
|
||||
gst::error_msg!(gst::ResourceError::Read, ["Failed to map buffer readable"])
|
||||
})?;
|
||||
|
||||
let utf8 = std::str::from_utf8(&map).map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::StreamError::Format,
|
||||
["Failed to decode buffer as UTF-8: {}", err]
|
||||
)
|
||||
})?;
|
||||
|
||||
let root = utf8.parse::<minidom::Element>().map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::ResourceError::Read,
|
||||
["Failed to parse buffer as XML: {}", err]
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(root)
|
||||
}
|
||||
|
||||
pub(crate) fn iterate_video_analytics_frames(
|
||||
root: &minidom::Element,
|
||||
) -> impl Iterator<
|
||||
Item = Result<(chrono::DateTime<chrono::FixedOffset>, &minidom::Element), gst::ErrorMessage>,
|
||||
> {
|
||||
root.get_child("VideoAnalytics", "http://www.onvif.org/ver10/schema")
|
||||
.map(|analytics| {
|
||||
analytics.children().filter_map(|el| {
|
||||
// We are only interested in associating Frame metadata with video frames
|
||||
if el.is("Frame", "http://www.onvif.org/ver10/schema") {
|
||||
let timestamp = match el.attr("UtcTime") {
|
||||
Some(timestamp) => timestamp,
|
||||
None => {
|
||||
return Some(Err(gst::error_msg!(
|
||||
gst::ResourceError::Read,
|
||||
["Frame element has no UtcTime attribute"]
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let dt = match chrono::DateTime::parse_from_rfc3339(timestamp) {
|
||||
Ok(dt) => dt,
|
||||
Err(err) => {
|
||||
return Some(Err(gst::error_msg!(
|
||||
gst::ResourceError::Read,
|
||||
["Failed to parse UtcTime {}: {}", timestamp, err]
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
Some(Ok((dt, el)))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
})
|
||||
.into_iter()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||
onvifpay::register(plugin)?;
|
||||
onvifdepay::register(plugin)?;
|
||||
onvifaggregator::register(plugin)?;
|
||||
onvifoverlay::register(plugin)?;
|
||||
onvifmetadataparse::register(plugin)?;
|
||||
|
||||
gst::meta::CustomMeta::register("OnvifXMLFrameMeta", &[], |_, _, _, _| true);
|
||||
|
||||
|
|
|
@ -4,16 +4,11 @@ use gst::subclass::prelude::*;
|
|||
use gst_base::prelude::*;
|
||||
use gst_base::subclass::prelude::*;
|
||||
use gst_base::AGGREGATOR_FLOW_NEED_DATA;
|
||||
use minidom::Element;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::collections::BTreeSet;
|
||||
use std::io::Cursor;
|
||||
use std::sync::Mutex;
|
||||
|
||||
// Offset in nanoseconds from midnight 01-01-1900 (prime epoch) to
|
||||
// midnight 01-01-1970 (UNIX epoch)
|
||||
const PRIME_EPOCH_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(2_208_988_800);
|
||||
|
||||
// Incoming metadata is split up frame-wise, and stored in a FIFO.
|
||||
#[derive(Eq, Clone)]
|
||||
struct MetaFrame {
|
||||
|
@ -67,9 +62,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|||
)
|
||||
});
|
||||
|
||||
static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
|
||||
static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for OnvifAggregator {
|
||||
const NAME: &'static str = "GstOnvifAggregator";
|
||||
|
@ -196,104 +188,45 @@ impl OnvifAggregator {
|
|||
element: &super::OnvifAggregator,
|
||||
) -> Result<(), gst::FlowError> {
|
||||
while let Some(buffer) = self.meta_sink_pad.pop_buffer() {
|
||||
let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::ResourceError::Read,
|
||||
["Failed to map buffer readable"]
|
||||
);
|
||||
let root = crate::xml_from_buffer(&buffer).map_err(|err| {
|
||||
element.post_error_message(err);
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
let utf8 = std::str::from_utf8(buffer.as_ref()).map_err(|err| {
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::StreamError::Format,
|
||||
["Failed to decode buffer as UTF-8: {}", err]
|
||||
);
|
||||
for res in crate::iterate_video_analytics_frames(&root) {
|
||||
let (dt, el) = res.map_err(|err| {
|
||||
element.post_error_message(err);
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
let root = utf8.parse::<Element>().map_err(|err| {
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::ResourceError::Read,
|
||||
["Failed to parse buffer as XML: {}", err]
|
||||
);
|
||||
let prime_dt_ns = crate::PRIME_EPOCH_OFFSET
|
||||
+ gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64);
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
let mut writer = Cursor::new(Vec::new());
|
||||
el.write_to(&mut writer).map_err(|err| {
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::ResourceError::Write,
|
||||
["Failed to write back frame as XML: {}", err]
|
||||
);
|
||||
|
||||
if let Some(analytics) =
|
||||
root.get_child("VideoAnalytics", "http://www.onvif.org/ver10/schema")
|
||||
{
|
||||
for el in analytics.children() {
|
||||
// We are only interested in associating Frame metadata with video frames
|
||||
if el.is("Frame", "http://www.onvif.org/ver10/schema") {
|
||||
let timestamp = el.attr("UtcTime").ok_or_else(|| {
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::ResourceError::Read,
|
||||
["Frame element has no UtcTime attribute"]
|
||||
);
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
gst::trace!(CAT, "Consuming metadata buffer {}", prime_dt_ns);
|
||||
|
||||
let dt =
|
||||
chrono::DateTime::parse_from_rfc3339(timestamp).map_err(|err| {
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::ResourceError::Read,
|
||||
["Failed to parse UtcTime {}: {}", timestamp, err]
|
||||
);
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
let prime_dt_ns = PRIME_EPOCH_OFFSET
|
||||
+ gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64);
|
||||
|
||||
let mut writer = Cursor::new(Vec::new());
|
||||
el.write_to(&mut writer).map_err(|err| {
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::ResourceError::Write,
|
||||
["Failed to write back frame as XML: {}", err]
|
||||
);
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
gst::trace!(CAT, "Consuming metadata buffer {}", prime_dt_ns);
|
||||
|
||||
state.meta_frames.insert(MetaFrame {
|
||||
timestamp: prime_dt_ns,
|
||||
buffer: gst::Buffer::from_slice(writer.into_inner()),
|
||||
});
|
||||
}
|
||||
}
|
||||
state.meta_frames.insert(MetaFrame {
|
||||
timestamp: prime_dt_ns,
|
||||
buffer: gst::Buffer::from_slice(writer.into_inner()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn lookup_reference_timestamp(&self, buffer: &gst::Buffer) -> Option<gst::ClockTime> {
|
||||
for meta in buffer.iter_meta::<gst::ReferenceTimestampMeta>() {
|
||||
if meta.reference().is_subset(&NTP_CAPS) {
|
||||
return Some(meta.timestamp());
|
||||
}
|
||||
if meta.reference().is_subset(&UNIX_CAPS) {
|
||||
return Some(meta.timestamp() + PRIME_EPOCH_OFFSET);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn media_buffer_duration(
|
||||
&self,
|
||||
element: &super::OnvifAggregator,
|
||||
|
@ -371,7 +304,7 @@ impl OnvifAggregator {
|
|||
.or_else(|| self.media_sink_pad.pop_buffer())
|
||||
{
|
||||
if let Some(current_media_start) =
|
||||
self.lookup_reference_timestamp(¤t_media_buffer)
|
||||
crate::lookup_reference_timestamp(¤t_media_buffer)
|
||||
{
|
||||
let duration =
|
||||
match self.media_buffer_duration(element, ¤t_media_buffer, timeout) {
|
||||
|
|
636
net/onvif/src/onvifmetadataparse/imp.rs
Normal file
636
net/onvif/src/onvifmetadataparse/imp.rs
Normal file
|
@ -0,0 +1,636 @@
|
|||
// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||
// <https://mozilla.org/MPL/2.0/>.
|
||||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use minidom::Element;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Mutex;
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"onvifmetadataparse",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("ONVIF Metadata Parser Element"),
|
||||
)
|
||||
});
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
struct Settings {
|
||||
latency: Option<gst::ClockTime>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct State {
|
||||
// Initially queued buffers until we have a UTC time / PTS mapping
|
||||
pre_queued_buffers: Vec<gst::Buffer>,
|
||||
// Mapping of UTC time to PTS
|
||||
utc_time_pts_mapping: Option<(gst::ClockTime, gst::ClockTime)>,
|
||||
// UTC time -> XML
|
||||
queued_frames: BTreeMap<gst::ClockTime, Element>,
|
||||
// Configured latency
|
||||
configured_latency: gst::ClockTime,
|
||||
}
|
||||
|
||||
pub struct OnvifMetadataParse {
|
||||
srcpad: gst::Pad,
|
||||
sinkpad: gst::Pad,
|
||||
settings: Mutex<Settings>,
|
||||
state: Mutex<State>,
|
||||
}
|
||||
|
||||
impl OnvifMetadataParse {
|
||||
fn sink_chain(
|
||||
&self,
|
||||
_pad: &gst::Pad,
|
||||
element: &super::OnvifMetadataParse,
|
||||
buffer: gst::Buffer,
|
||||
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
gst::log!(CAT, obj: element, "Handling buffer {:?}", buffer);
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
let pts = match buffer.pts() {
|
||||
Some(pts) => pts,
|
||||
None => {
|
||||
gst::error!(CAT, obj: element, "Need buffers with PTS");
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
};
|
||||
// First we need to get an UTC/PTS mapping. We wait up to the latency
|
||||
// for that and otherwise error out.
|
||||
if state.utc_time_pts_mapping.is_none() {
|
||||
let utc_time = crate::lookup_reference_timestamp(&buffer);
|
||||
if let Some(utc_time) = utc_time {
|
||||
let initial_pts = state
|
||||
.pre_queued_buffers
|
||||
.first()
|
||||
.map(|b| b.pts().unwrap())
|
||||
.unwrap_or(pts);
|
||||
let diff = pts.saturating_sub(initial_pts);
|
||||
let initial_utc_time = match utc_time.checked_sub(diff) {
|
||||
Some(initial_utc_time) => initial_utc_time,
|
||||
None => {
|
||||
gst::error!(CAT, obj: element, "Can't calculate initial UTC time");
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
};
|
||||
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"Calculated initial UTC/PTS mapping: {}/{}",
|
||||
initial_utc_time,
|
||||
initial_pts
|
||||
);
|
||||
state.utc_time_pts_mapping = Some((initial_utc_time, initial_pts));
|
||||
} else {
|
||||
state.pre_queued_buffers.push(buffer);
|
||||
|
||||
if let Some(front_pts) = state.pre_queued_buffers.first().map(|b| b.pts().unwrap())
|
||||
{
|
||||
if pts.saturating_sub(front_pts) >= state.configured_latency {
|
||||
gst::error!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"Received no UTC time in the first {}",
|
||||
state.configured_latency
|
||||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(gst::FlowSuccess::Ok);
|
||||
}
|
||||
}
|
||||
|
||||
self.queue(element, &mut state, buffer)?;
|
||||
let buffers = self.drain(element, &mut state, Some(pts))?;
|
||||
|
||||
if let Some(buffers) = buffers {
|
||||
drop(state);
|
||||
self.srcpad.push_list(buffers)
|
||||
} else {
|
||||
Ok(gst::FlowSuccess::Ok)
|
||||
}
|
||||
}
|
||||
|
||||
fn queue(
|
||||
&self,
|
||||
element: &super::OnvifMetadataParse,
|
||||
state: &mut State,
|
||||
buffer: gst::Buffer,
|
||||
) -> Result<(), gst::FlowError> {
|
||||
let State {
|
||||
ref mut pre_queued_buffers,
|
||||
ref mut queued_frames,
|
||||
..
|
||||
} = &mut *state;
|
||||
|
||||
for buffer in pre_queued_buffers.drain(..).chain(std::iter::once(buffer)) {
|
||||
let root = crate::xml_from_buffer(&buffer).map_err(|err| {
|
||||
element.post_error_message(err);
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
for res in crate::iterate_video_analytics_frames(&root) {
|
||||
let (dt, el) = res.map_err(|err| {
|
||||
element.post_error_message(err);
|
||||
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
let dt_unix_ns = gst::ClockTime::from_nseconds(dt.timestamp_nanos() as u64);
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"Queueing frame with UTC time {}",
|
||||
dt_unix_ns
|
||||
);
|
||||
|
||||
let xml = queued_frames.entry(dt_unix_ns).or_insert_with(|| {
|
||||
Element::builder("VideoAnalytics", "http://www.onvif.org/ver10/schema")
|
||||
.prefix(Some("tt".into()), "http://www.onvif.org/ver10/schema")
|
||||
.unwrap()
|
||||
.build()
|
||||
});
|
||||
|
||||
xml.append_child(el.clone());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn drain(
|
||||
&self,
|
||||
element: &super::OnvifMetadataParse,
|
||||
state: &mut State,
|
||||
pts: Option<gst::ClockTime>,
|
||||
) -> Result<Option<gst::BufferList>, gst::FlowError> {
|
||||
let State {
|
||||
ref mut queued_frames,
|
||||
utc_time_pts_mapping,
|
||||
configured_latency,
|
||||
..
|
||||
} = &mut *state;
|
||||
|
||||
let utc_time_pts_mapping = match utc_time_pts_mapping {
|
||||
Some(utc_time_pts_mapping) => utc_time_pts_mapping,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let utc_time_to_pts = |utc_time: gst::ClockTime| {
|
||||
if utc_time < utc_time_pts_mapping.0 {
|
||||
let diff = utc_time_pts_mapping.0 - utc_time;
|
||||
utc_time_pts_mapping.1.checked_sub(diff)
|
||||
} else {
|
||||
let diff = utc_time - utc_time_pts_mapping.0;
|
||||
Some(utc_time_pts_mapping.1 + diff)
|
||||
}
|
||||
};
|
||||
|
||||
let mut buffers = Vec::new();
|
||||
|
||||
while !queued_frames.is_empty() {
|
||||
let utc_time = *queued_frames.iter().next().unwrap().0;
|
||||
let frame_pts = match utc_time_to_pts(utc_time) {
|
||||
Some(frame_pts) => frame_pts,
|
||||
None => {
|
||||
gst::warning!(CAT, obj: element, "UTC time {} outside segment", utc_time);
|
||||
gst::ClockTime::ZERO
|
||||
}
|
||||
};
|
||||
|
||||
// Not at EOS and not above the latency yet
|
||||
if pts.map_or(false, |pts| {
|
||||
pts.saturating_sub(frame_pts) < *configured_latency
|
||||
}) {
|
||||
break;
|
||||
}
|
||||
|
||||
let frame = queued_frames.remove(&utc_time).unwrap();
|
||||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: element,
|
||||
"Dequeueing frame with UTC time {} / PTS {}",
|
||||
utc_time,
|
||||
frame_pts
|
||||
);
|
||||
|
||||
let xml = Element::builder("MetadataStream", "http://www.onvif.org/ver10/schema")
|
||||
.prefix(Some("tt".into()), "http://www.onvif.org/ver10/schema")
|
||||
.unwrap()
|
||||
.append(frame)
|
||||
.build();
|
||||
|
||||
let mut vec = Vec::new();
|
||||
if let Err(err) = xml.write_to_decl(&mut vec) {
|
||||
gst::error!(CAT, obj: element, "Can't serialize XML element: {}", err);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut buffer = gst::Buffer::from_mut_slice(vec);
|
||||
let buffer_ref = buffer.get_mut().unwrap();
|
||||
buffer_ref.set_pts(frame_pts);
|
||||
|
||||
gst::ReferenceTimestampMeta::add(
|
||||
buffer_ref,
|
||||
&crate::UNIX_CAPS,
|
||||
utc_time,
|
||||
gst::ClockTime::NONE,
|
||||
);
|
||||
|
||||
buffers.push(buffer);
|
||||
}
|
||||
|
||||
buffers.sort_by_key(|b| b.pts());
|
||||
|
||||
if !buffers.is_empty() {
|
||||
let mut buffer_list = gst::BufferList::new_sized(buffers.len());
|
||||
let buffer_list_ref = buffer_list.get_mut().unwrap();
|
||||
buffer_list_ref.extend(buffers);
|
||||
|
||||
Ok(Some(buffer_list))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn sink_event(
|
||||
&self,
|
||||
pad: &gst::Pad,
|
||||
element: &super::OnvifMetadataParse,
|
||||
event: gst::Event,
|
||||
) -> bool {
|
||||
gst::log!(CAT, obj: pad, "Handling event {:?}", event);
|
||||
|
||||
match event.view() {
|
||||
gst::EventView::Segment(_) | gst::EventView::Eos(_) => {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let buffers = self.drain(element, &mut state, None).ok().flatten();
|
||||
state.pre_queued_buffers.clear();
|
||||
state.utc_time_pts_mapping = None;
|
||||
state.queued_frames.clear();
|
||||
drop(state);
|
||||
|
||||
if let Some(buffers) = buffers {
|
||||
if let Err(err) = self.srcpad.push_list(buffers) {
|
||||
gst::error!(CAT, obj: element, "Failed to drain frames: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
pad.event_default(Some(element), event)
|
||||
}
|
||||
gst::EventView::FlushStop(_) => {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.pre_queued_buffers.clear();
|
||||
state.queued_frames.clear();
|
||||
state.utc_time_pts_mapping = None;
|
||||
drop(state);
|
||||
pad.event_default(Some(element), event)
|
||||
}
|
||||
gst::EventView::Caps(ev) => {
|
||||
let settings = self.settings.lock().unwrap().clone();
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let previous_latency = state.configured_latency;
|
||||
let latency = if let Some(latency) = settings.latency {
|
||||
latency
|
||||
} else {
|
||||
let caps = ev.caps();
|
||||
let s = caps.structure(0).unwrap();
|
||||
let parsed = Some(true) == s.get("parsed").ok();
|
||||
|
||||
if parsed {
|
||||
gst::ClockTime::ZERO
|
||||
} else {
|
||||
gst::ClockTime::from_seconds(6)
|
||||
}
|
||||
};
|
||||
state.configured_latency = latency;
|
||||
drop(state);
|
||||
|
||||
gst::debug!(CAT, obj: element, "Configuring latency of {}", latency);
|
||||
if previous_latency != latency {
|
||||
let _ =
|
||||
element.post_message(gst::message::Latency::builder().src(element).build());
|
||||
}
|
||||
|
||||
let caps = self.srcpad.pad_template_caps();
|
||||
self.srcpad
|
||||
.push_event(gst::event::Caps::builder(&caps).build())
|
||||
}
|
||||
_ => pad.event_default(Some(element), event),
|
||||
}
|
||||
}
|
||||
|
||||
fn sink_query(
|
||||
&self,
|
||||
pad: &gst::Pad,
|
||||
element: &super::OnvifMetadataParse,
|
||||
query: &mut gst::QueryRef,
|
||||
) -> bool {
|
||||
gst::log!(CAT, obj: pad, "Handling query {:?}", query);
|
||||
|
||||
match query.view_mut() {
|
||||
gst::QueryViewMut::Caps(q) => {
|
||||
let caps = pad.pad_template_caps();
|
||||
let res = if let Some(filter) = q.filter() {
|
||||
filter.intersect_with_mode(&caps, gst::CapsIntersectMode::First)
|
||||
} else {
|
||||
caps
|
||||
};
|
||||
|
||||
q.set_result(&res);
|
||||
|
||||
true
|
||||
}
|
||||
gst::QueryViewMut::AcceptCaps(q) => {
|
||||
let caps = q.caps();
|
||||
let res = caps.can_intersect(&pad.pad_template_caps());
|
||||
q.set_result(res);
|
||||
|
||||
true
|
||||
}
|
||||
_ => pad.query_default(Some(element), query),
|
||||
}
|
||||
}
|
||||
|
||||
fn src_event(
|
||||
&self,
|
||||
pad: &gst::Pad,
|
||||
element: &super::OnvifMetadataParse,
|
||||
event: gst::Event,
|
||||
) -> bool {
|
||||
gst::log!(CAT, obj: pad, "Handling event {:?}", event);
|
||||
|
||||
match event.view() {
|
||||
gst::EventView::FlushStop(_) => {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.pre_queued_buffers.clear();
|
||||
state.queued_frames.clear();
|
||||
state.utc_time_pts_mapping = None;
|
||||
drop(state);
|
||||
pad.event_default(Some(element), event)
|
||||
}
|
||||
_ => pad.event_default(Some(element), event),
|
||||
}
|
||||
}
|
||||
|
||||
fn src_query(
|
||||
&self,
|
||||
pad: &gst::Pad,
|
||||
element: &super::OnvifMetadataParse,
|
||||
query: &mut gst::QueryRef,
|
||||
) -> bool {
|
||||
gst::log!(CAT, obj: pad, "Handling query {:?}", query);
|
||||
|
||||
match query.view_mut() {
|
||||
gst::QueryViewMut::Caps(q) => {
|
||||
let caps = pad.pad_template_caps();
|
||||
let res = if let Some(filter) = q.filter() {
|
||||
filter.intersect_with_mode(&caps, gst::CapsIntersectMode::First)
|
||||
} else {
|
||||
caps
|
||||
};
|
||||
|
||||
q.set_result(&res);
|
||||
|
||||
true
|
||||
}
|
||||
gst::QueryViewMut::AcceptCaps(q) => {
|
||||
let caps = q.caps();
|
||||
let res = caps.can_intersect(&pad.pad_template_caps());
|
||||
q.set_result(res);
|
||||
|
||||
true
|
||||
}
|
||||
gst::QueryViewMut::Latency(q) => {
|
||||
let mut upstream_query = gst::query::Latency::new();
|
||||
|
||||
let ret = self.sinkpad.peer_query(&mut upstream_query);
|
||||
|
||||
if ret {
|
||||
let (live, mut min, mut max) = upstream_query.result();
|
||||
|
||||
let state = self.state.lock().unwrap();
|
||||
min += state.configured_latency;
|
||||
max = max.map(|max| max + state.configured_latency);
|
||||
|
||||
q.set(live, min, max);
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: pad,
|
||||
"Latency query response: live {} min {} max {}",
|
||||
live,
|
||||
min,
|
||||
max.display()
|
||||
);
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
_ => pad.query_default(Some(element), query),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for OnvifMetadataParse {
|
||||
const NAME: &'static str = "OnvifMetadataParse";
|
||||
type Type = super::OnvifMetadataParse;
|
||||
type ParentType = gst::Element;
|
||||
|
||||
fn with_class(klass: &Self::Class) -> Self {
|
||||
let templ = klass.pad_template("sink").unwrap();
|
||||
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
|
||||
.chain_function(|pad, parent, buffer| {
|
||||
OnvifMetadataParse::catch_panic_pad_function(
|
||||
parent,
|
||||
|| Err(gst::FlowError::Error),
|
||||
|parse, element| parse.sink_chain(pad, element, buffer),
|
||||
)
|
||||
})
|
||||
.event_function(|pad, parent, event| {
|
||||
OnvifMetadataParse::catch_panic_pad_function(
|
||||
parent,
|
||||
|| false,
|
||||
|parse, element| parse.sink_event(pad, element, event),
|
||||
)
|
||||
})
|
||||
.query_function(|pad, parent, query| {
|
||||
OnvifMetadataParse::catch_panic_pad_function(
|
||||
parent,
|
||||
|| false,
|
||||
|parse, element| parse.sink_query(pad, element, query),
|
||||
)
|
||||
})
|
||||
.flags(gst::PadFlags::PROXY_ALLOCATION)
|
||||
.build();
|
||||
|
||||
let templ = klass.pad_template("src").unwrap();
|
||||
let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
|
||||
.event_function(|pad, parent, event| {
|
||||
OnvifMetadataParse::catch_panic_pad_function(
|
||||
parent,
|
||||
|| false,
|
||||
|parse, element| parse.src_event(pad, element, event),
|
||||
)
|
||||
})
|
||||
.query_function(|pad, parent, query| {
|
||||
OnvifMetadataParse::catch_panic_pad_function(
|
||||
parent,
|
||||
|| false,
|
||||
|parse, element| parse.src_query(pad, element, query),
|
||||
)
|
||||
})
|
||||
.flags(gst::PadFlags::PROXY_ALLOCATION)
|
||||
.flags(gst::PadFlags::FIXED_CAPS)
|
||||
.build();
|
||||
|
||||
Self {
|
||||
srcpad,
|
||||
sinkpad,
|
||||
settings: Mutex::default(),
|
||||
state: Mutex::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectImpl for OnvifMetadataParse {
|
||||
fn properties() -> &'static [glib::ParamSpec] {
|
||||
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||
vec![glib::ParamSpecUInt64::new(
|
||||
"latency",
|
||||
"Latency",
|
||||
"Maximum latency to introduce for reordering metadata \
|
||||
(max=auto: 6s if unparsed input, 0s if parsed input)",
|
||||
0,
|
||||
u64::MAX,
|
||||
Settings::default()
|
||||
.latency
|
||||
.map(|l| l.nseconds())
|
||||
.unwrap_or(u64::MAX),
|
||||
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
||||
)]
|
||||
});
|
||||
|
||||
PROPERTIES.as_ref()
|
||||
}
|
||||
|
||||
fn set_property(
|
||||
&self,
|
||||
obj: &Self::Type,
|
||||
_id: usize,
|
||||
value: &glib::Value,
|
||||
pspec: &glib::ParamSpec,
|
||||
) {
|
||||
match pspec.name() {
|
||||
"latency" => {
|
||||
self.settings.lock().unwrap().latency = value.get().expect("type checked upstream");
|
||||
|
||||
let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
}
|
||||
|
||||
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||
match pspec.name() {
|
||||
"latency" => self.settings.lock().unwrap().latency.to_value(),
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn constructed(&self, obj: &Self::Type) {
|
||||
self.parent_constructed(obj);
|
||||
|
||||
obj.add_pad(&self.sinkpad).unwrap();
|
||||
obj.add_pad(&self.srcpad).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl GstObjectImpl for OnvifMetadataParse {}
|
||||
|
||||
impl ElementImpl for OnvifMetadataParse {
|
||||
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||
gst::subclass::ElementMetadata::new(
|
||||
"ONVIF Metadata Parser",
|
||||
"Metadata/Parser",
|
||||
"Parses ONVIF Timed XML Metadata",
|
||||
"Sebastian Dröge <sebastian@centricular.com>",
|
||||
)
|
||||
});
|
||||
|
||||
Some(&*ELEMENT_METADATA)
|
||||
}
|
||||
|
||||
fn pad_templates() -> &'static [gst::PadTemplate] {
|
||||
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
|
||||
let caps = gst::Caps::builder("application/x-onvif-metadata")
|
||||
.field("encoding", "utf8")
|
||||
.field("parsed", true)
|
||||
.build();
|
||||
let src_pad_template = gst::PadTemplate::new(
|
||||
"src",
|
||||
gst::PadDirection::Src,
|
||||
gst::PadPresence::Always,
|
||||
&caps,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let caps = gst::Caps::builder("application/x-onvif-metadata")
|
||||
.field("encoding", "utf8")
|
||||
.build();
|
||||
let sink_pad_template = gst::PadTemplate::new(
|
||||
"sink",
|
||||
gst::PadDirection::Sink,
|
||||
gst::PadPresence::Always,
|
||||
&caps,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
vec![src_pad_template, sink_pad_template]
|
||||
});
|
||||
|
||||
PAD_TEMPLATES.as_ref()
|
||||
}
|
||||
|
||||
fn change_state(
|
||||
&self,
|
||||
element: &Self::Type,
|
||||
transition: gst::StateChange,
|
||||
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
||||
gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
|
||||
|
||||
if transition == gst::StateChange::PausedToReady {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = State::default();
|
||||
}
|
||||
|
||||
let ret = self.parent_change_state(element, transition)?;
|
||||
|
||||
if transition == gst::StateChange::ReadyToPaused {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = State::default();
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
25
net/onvif/src/onvifmetadataparse/mod.rs
Normal file
25
net/onvif/src/onvifmetadataparse/mod.rs
Normal file
|
@ -0,0 +1,25 @@
|
|||
// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||
// <https://mozilla.org/MPL/2.0/>.
|
||||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
|
||||
mod imp;
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct OnvifMetadataParse(ObjectSubclass<imp::OnvifMetadataParse>) @extends gst::Element, gst::Object;
|
||||
}
|
||||
|
||||
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||
gst::Element::register(
|
||||
Some(plugin),
|
||||
"onvifmetadataparse",
|
||||
gst::Rank::None,
|
||||
OnvifMetadataParse::static_type(),
|
||||
)
|
||||
}
|
Loading…
Reference in a new issue