analytics: Add new analyticscombiner / analyticssplitter elements

These batch buffers from one or more streams into a single stream via
the GstAnalyticsBatchMeta and allow splitting that single stream into
the individual ones again later.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2317>
This commit is contained in:
Sebastian Dröge 2025-06-19 15:34:36 +03:00 committed by GStreamer Marge Bot
parent a6b22bc047
commit 67175f70d9
11 changed files with 3451 additions and 8 deletions

1
Cargo.lock generated
View file

@ -2678,6 +2678,7 @@ dependencies = [
"gstreamer",
"gstreamer-analytics",
"gstreamer-base",
"gstreamer-check",
"gstreamer-rtp",
"gstreamer-video",
"xmltree",

View file

@ -1,13 +1,18 @@
[package]
name = "gst-plugin-analytics"
version.workspace = true
authors = ["Benjamin Gaignard <benjamin.gaignard@collabora.com>"]
authors = ["Benjamin Gaignard <benjamin.gaignard@collabora.com>", "Sebastian Dröge <sebastian@centricular.com>"]
repository.workspace = true
license = "MPL-2.0"
description = "GStreamer Rust Analytics Plugin"
edition.workspace = true
rust-version.workspace = true
[lib]
name = "gstrsanalytics"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[dependencies]
gst = { workspace = true, features = ["v1_24"] }
gst-rtp = { workspace = true, features = ["v1_24"] }
@ -18,10 +23,8 @@ chrono = { version = "0.4.31", default-features = false }
xmltree = "0.11"
glib = { workspace = true, features = ["v2_62"] }
[lib]
name = "gstrsanalytics"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[dev-dependencies]
gst-check.workspace = true
[build-dependencies]
gst-plugin-version-helper.workspace = true
@ -29,7 +32,16 @@ gst-plugin-version-helper.workspace = true
[features]
static = []
capi = []
doc = ["gst/v1_18"]
doc = []
v1_28 = ["gst-analytics/v1_28"]
[[test]]
name = "analyticscombiner"
required-features = ["v1_28"]
[[test]]
name = "analyticssplitter"
required-features = ["v1_28"]
[package.metadata.capi]
min_version = "0.9.21"

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,97 @@
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-analyticscombiner
* @see_also: analyticssplitter
*
* `analyticscombiner` is a generic, media-agnostic stream combiner element for analytics purposes.
*
* Buffers and serialized events of one or more streams are combined into batches of a specific
* duration, which can be configured via the `batch-duration` property. The batches are pushed
* downstream as empty buffers with the `GstAnalyticsBatchMeta`, which contains the original
* data flow of each stream. The order of the streams inside the `GstAnalyticsBatchMeta` are
* defined by the `index` property on each of the sink pads, which defaults to being the index of
* all sink pads when sorted according to the pad name.
*
* The caps on the source pad are of type `multistream/x-analytics-batch` and contain the original
* caps of each stream in the `streams` field in the same order as they appear in the
* `GstAnalyticsBatchMeta`. Caps negotiation ensures that downstream can provide constraints for
* each of the input streams.
*
* By default all buffers that start inside a batch are included in the output. Via the
* `batch-strategy` property on the sink pads this behaviour can be modified.
*
* The intended usage is to follow `analyticscombiner` by one or more inference elements that
* process the combined stream, attach additional meta on the buffers, and then pass through
* `analyticssplitter` that then splits the combined stream into its original streams again.
*
* ## Usage
*
* ```shell
* gst-launch-1.0 filesrc location=file-1.mp4 ! decodebin3 ! combiner.sink_0 \
* filesrc location=file-2.mp4 ! decodebin3 ! combiner.sink_1 \
* analyticscombiner name=combiner batch-duration=100000000
* sink_0::batch-strategy=first-in-batch-with-overlay \
* sink_1::batch-strategy=first-in-batch-with-overlay ! \
* inference-elements ! \
* analyticssplitter name=splitter \
* splitter.src_0_0 ! objectdetectionoverlay ! videoconvert ! autovideosink \
* splitter.src_0_1 ! objectdetectionoverlay ! videoconvert ! autovideosink
* ```
*
* Since: plugins-rs-0.14.0
*/
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct AnalyticsCombiner(ObjectSubclass<imp::AnalyticsCombiner>) @extends gst_base::Aggregator, gst::Element, gst::Object, @implements gst::ChildProxy;
}
glib::wrapper! {
pub struct AnalyticsCombinerSinkPad(ObjectSubclass<imp::AnalyticsCombinerSinkPad>) @extends gst_base::AggregatorPad, gst::Pad, gst::Object;
}
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, glib::Enum)]
#[enum_type(name = "GstAnalyticsCombinerBatchStrategy")]
#[repr(C)]
pub enum BatchStrategy {
#[default]
#[enum_value(
name = "All: Include all buffers that start inside the batch.",
nick = "all"
)]
All,
#[enum_value(
name = "FirstInBatch: Include only the first buffer that starts inside the batch.",
nick = "first-in-batch"
)]
FirstInBatch,
#[enum_value(
name = "FirstInBatchWithOverlap: Include only the first buffer that starts inside the batch unless there was a previously unused buffer at most half a batch duration earlier. If no buffer is available, allow taking a buffer up to half a batch duration later. The buffer closest to the batch start is included.",
nick = "first-in-batch-with-overlap"
)]
FirstInBatchWithOverlap,
#[enum_value(
name = "LastInBatch: Include only the last buffer that starts inside the batch.",
nick = "last-in-batch"
)]
LastInBatch,
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
use gst::prelude::*;
BatchStrategy::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
AnalyticsCombinerSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
gst::Element::register(
Some(plugin),
"analyticscombiner",
gst::Rank::NONE,
AnalyticsCombiner::static_type(),
)
}

View file

@ -1,4 +1,5 @@
// Copyright (C) 2024 Benjamin Gaignard <benjamin.gaignard@collabora.com>
// Copyright (C) 2025 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
@ -20,6 +21,11 @@ pub(crate) const ONVIF_METADATA_PREFIX: &str = "tt";
mod onvifmeta2relationmeta;
mod relationmeta2onvifmeta;
#[cfg(feature = "v1_28")]
mod combiner;
#[cfg(feature = "v1_28")]
mod splitter;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
relationmeta2onvifmeta::register(plugin)?;
onvifmeta2relationmeta::register(plugin)?;
@ -28,6 +34,12 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::meta::CustomMeta::register("OnvifXMLFrameMeta", &[]);
}
#[cfg(feature = "v1_28")]
{
combiner::register(plugin)?;
splitter::register(plugin)?;
}
Ok(())
}

View file

@ -0,0 +1,444 @@
// SPDX-License-Identifier: MPL-2.0
use gst::{prelude::*, subclass::prelude::*};
use std::{
mem,
sync::{LazyLock, Mutex},
};
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"analyticssplitter",
gst::DebugColorFlags::empty(),
Some("Analytics batch / splitter element"),
)
});
struct Stream {
pad: gst::Pad,
caps: gst::Caps,
}
struct State {
generation: usize,
streams: Vec<Stream>,
combiner: Option<gst_base::UniqueFlowCombiner>,
}
impl Default for State {
fn default() -> Self {
Self {
generation: 0,
streams: Vec::new(),
combiner: Some(gst_base::UniqueFlowCombiner::default()),
}
}
}
pub struct AnalyticsSplitter {
sinkpad: gst::Pad,
state: Mutex<State>,
}
#[glib::object_subclass]
impl ObjectSubclass for AnalyticsSplitter {
const NAME: &'static str = "GstAnalyticsSplitter";
type Type = super::AnalyticsSplitter;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_from_template(&templ)
.chain_function(|pad, parent, buffer| {
AnalyticsSplitter::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|self_| self_.sink_chain(pad, buffer),
)
})
.event_function(|pad, parent, event| {
AnalyticsSplitter::catch_panic_pad_function(
parent,
|| false,
|self_| self_.sink_event(pad, event),
)
})
.query_function(|pad, parent, query| {
AnalyticsSplitter::catch_panic_pad_function(
parent,
|| false,
|self_| self_.sink_query(pad, query),
)
})
.build();
Self {
sinkpad,
state: Mutex::new(State::default()),
}
}
}
impl ObjectImpl for AnalyticsSplitter {
fn constructed(&self) {
self.parent_constructed();
self.obj().add_pad(&self.sinkpad).unwrap();
}
}
impl GstObjectImpl for AnalyticsSplitter {}
impl ElementImpl for AnalyticsSplitter {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"Analytics batch splitter element",
"Demuxer/Analytics",
"Analytics batch splitter element",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder("multistream/x-analytics-batch")
.features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META])
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src_%u_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
vec![sink_pad_template, src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
#[allow(clippy::single_match)]
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let res = self.parent_change_state(transition)?;
match transition {
gst::StateChange::PausedToReady => {
let mut state_guard = self.state.lock().unwrap();
let streams = mem::take(&mut state_guard.streams);
*state_guard = State::default();
drop(state_guard);
for stream in streams {
let _ = self.obj().remove_pad(&stream.pad);
}
}
_ => (),
}
Ok(res)
}
}
impl AnalyticsSplitter {
fn sink_chain(
&self,
_pad: &gst::Pad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, imp = self, "Handling buffer {buffer:?}");
let Some(meta) = buffer.meta::<gst_analytics::AnalyticsBatchMeta>() else {
gst::error!(CAT, imp = self, "No batch meta");
gst::element_imp_error!(self, gst::StreamError::Demux, ["No batch meta"]);
return Err(gst::FlowError::Error);
};
let mut state_guard = self.state.lock().unwrap();
if meta.streams().len() != state_guard.streams.len() {
gst::error!(CAT, imp = self, "Wrong number of streams");
gst::element_imp_error!(self, gst::StreamError::Demux, ["Wrong number of streams"]);
return Err(gst::FlowError::NotNegotiated);
}
let pads = state_guard
.streams
.iter()
.map(|s| &s.pad)
.cloned()
.collect::<Vec<_>>();
// Temporarily take combiner out so we can release the lock, and later
// store it again in the state.
let mut combiner = state_guard.combiner.take().unwrap();
drop(state_guard);
let mut res = Ok(gst::FlowSuccess::Ok);
'next_stream: for (pad, stream) in Iterator::zip(pads.into_iter(), meta.streams().iter()) {
for buffer in stream.buffers() {
for event in buffer.sticky_events() {
gst::trace!(CAT, obj = pad, "Storing sticky event {event:?}");
let _ = pad.store_sticky_event(event);
}
for event in buffer.serialized_events() {
gst::trace!(CAT, obj = pad, "Pushing serialized event {event:?}");
let _ = pad.push_event(event.clone());
}
if let Some(buffer) = buffer.buffer_owned() {
gst::trace!(CAT, obj = pad, "Pushing buffer {buffer:?}");
let pad_res = pad.push(buffer);
res = combiner.update_pad_flow(&pad, pad_res);
}
if let Some(buffer_list) = buffer.buffer_list_owned() {
gst::trace!(CAT, obj = pad, "Pushing buffer list {buffer_list:?}");
let pad_res = pad.push_list(buffer_list);
res = combiner.update_pad_flow(&pad, pad_res);
}
if let Err(err) = res {
gst::debug!(CAT, obj = pad, "Got flow error {err:?}");
break 'next_stream;
}
}
}
let mut state_guard = self.state.lock().unwrap();
state_guard.combiner = Some(combiner);
gst::trace!(CAT, imp = self, "Returning {res:?}");
res
}
fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
use gst::EventView;
gst::log!(CAT, obj = pad, "Handling event {event:?}");
match event.view() {
EventView::Caps(ev) => {
let caps = ev.caps();
let s = caps.structure(0).unwrap();
let Some(streams) = s.get::<gst::ArrayRef>("streams").ok() else {
return false;
};
let streams = streams
.iter()
.map(|v| v.get::<gst::Caps>().unwrap())
.collect::<Vec<_>>();
let mut state_guard = self.state.lock().unwrap();
let mut to_remove = vec![];
let mut to_add = vec![];
if streams.len() != state_guard.streams.len()
|| !Iterator::zip(streams.iter(), state_guard.streams.iter().map(|s| &s.caps))
.all(|(a, b)| a == b)
{
let templ = self.obj().class().pad_template("src_%u_%u").unwrap();
to_remove = state_guard.streams.drain(..).map(|s| s.pad).collect();
for pad in &to_remove {
state_guard.combiner.as_mut().unwrap().remove_pad(pad);
}
state_guard.combiner.as_mut().unwrap().reset();
for (idx, stream) in streams.iter().enumerate() {
let pad = gst::Pad::builder_from_template(&templ)
.name(format!("src_{}_{idx}", state_guard.generation))
.event_function(|pad, parent, event| {
AnalyticsSplitter::catch_panic_pad_function(
parent,
|| false,
|self_| self_.src_event(pad, event),
)
})
.query_function(|pad, parent, query| {
AnalyticsSplitter::catch_panic_pad_function(
parent,
|| false,
|self_| self_.src_query(pad, query),
)
})
.build();
gst::debug!(
CAT,
imp = self,
"Creating pad {} with caps {stream:?}",
pad.name()
);
to_add.push(pad.clone());
state_guard.combiner.as_mut().unwrap().add_pad(&pad);
state_guard.streams.push(Stream {
pad,
caps: stream.clone(),
});
}
state_guard.generation += 1;
}
drop(state_guard);
for pad in to_add {
let _ = pad.set_active(true);
let _ = self.obj().add_pad(&pad);
}
for pad in to_remove {
let _ = pad.set_active(false);
let _ = self.obj().remove_pad(&pad);
}
self.obj().no_more_pads();
}
// Pass through
EventView::Eos(_) | EventView::FlushStart(_) | EventView::FlushStop(_) => (),
// Drop all other events, we take them from the individual streams
_ => return true,
}
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
#[allow(clippy::single_match)]
fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj = pad, "Handling query {query:?}");
match query.view_mut() {
QueryViewMut::Caps(q) => {
let state_guard = self.state.lock().unwrap();
let sinkpads = state_guard
.streams
.iter()
.map(|s| s.pad.clone())
.collect::<Vec<_>>();
drop(state_guard);
let streams = if sinkpads.is_empty() {
None
} else {
Some(
sinkpads
.iter()
.map(|pad| pad.peer_query_caps(None))
.map(|caps| caps.to_send_value())
.collect::<gst::Array>(),
)
};
let res = gst::Caps::builder("multistream/x-analytics-batch")
.features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META])
.field_if_some("streams", streams)
.build();
let filter = q.filter();
let res = &filter
.map(|filter| filter.intersect_with_mode(&res, gst::CapsIntersectMode::First))
.unwrap_or(res);
q.set_result(res);
gst::log!(CAT, imp = self, "Returning caps {res:?}");
return true;
}
_ => (),
}
gst::Pad::query_default(pad, Some(&*self.obj()), query)
}
fn src_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
gst::log!(CAT, obj = pad, "Handling event {event:?}");
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
#[allow(clippy::single_match)]
fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj = pad, "Handling query {query:?}");
match query.view_mut() {
QueryViewMut::Caps(q) => {
let filter = q.filter_owned();
let peer_caps = self.sinkpad.peer_query_caps(None);
if peer_caps.is_any() {
let res = filter.unwrap_or(peer_caps);
gst::log!(CAT, obj = pad, "Returning caps {res:?}");
q.set_result(&res);
} else if peer_caps.is_empty() {
gst::log!(CAT, obj = pad, "Returning caps {peer_caps:?}");
q.set_result(&peer_caps);
} else {
let state_guard = self.state.lock().unwrap();
let pad_index = state_guard
.streams
.iter()
.enumerate()
.find_map(|(idx, p)| (pad == &p.pad).then_some(idx));
drop(state_guard);
// Shouldn't happen, pad not found!
let Some(pad_index) = pad_index else {
gst::warning!(CAT, obj = pad, "Unknown pad");
return false;
};
// return expected caps for this pad from upstream, if any
let mut res = gst::Caps::new_empty();
for s in peer_caps.iter() {
let Some(streams) = s.get::<gst::ArrayRef>("streams").ok() else {
continue;
};
let Some(stream_caps) = streams
.get(pad_index)
.map(|v| v.get::<gst::Caps>().unwrap())
else {
continue;
};
res.merge(stream_caps);
}
// No stream specific caps found, return ANY
let res = if res.is_empty() {
filter.unwrap_or(gst::Caps::new_any())
} else {
filter
.as_ref()
.map(|filter| {
filter.intersect_with_mode(&res, gst::CapsIntersectMode::First)
})
.unwrap_or(res)
};
gst::log!(CAT, obj = pad, "Returning caps {res:?}");
q.set_result(&res);
}
return true;
}
_ => (),
}
gst::Pad::query_default(pad, Some(&*self.obj()), query)
}
}

View file

@ -0,0 +1,28 @@
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-analyticssplitter
* @see_also: analyticscombiner
*
* `analyticssplitter` is a generic, media-agnostic stream splitter for streams generated by
* `analyticscombiner` and other streams that make use of `GstAnalyticsBatchMeta`.
*
* See the `analyticscombiner` and `GstAnalyticsBatchMeta` documentation for more details.
*
* Since: plugins-rs-0.14.0
*/
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct AnalyticsSplitter(ObjectSubclass<imp::AnalyticsSplitter>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"analyticssplitter",
gst::Rank::PRIMARY,
AnalyticsSplitter::static_type(),
)
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,376 @@
// Copyright (C) 2025 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
//
use gst::prelude::*;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstrsanalytics::plugin_register_static().unwrap();
});
}
#[test]
fn test_combine_split_single() {
init();
let bin = gst::parse::bin_from_description("videotestsrc name=src num-buffers=10 ! video/x-raw,framerate=25/1 ! analyticscombiner batch-duration=100000000 ! analyticssplitter ! identity name=identity", false).unwrap();
let identity = bin.by_name("identity").unwrap();
bin.add_pad(&gst::GhostPad::with_target(&identity.static_pad("src").unwrap()).unwrap())
.unwrap();
let mut h = gst_check::Harness::with_element(&bin, None, Some("src"));
h.play();
let ptss = std::array::from_fn::<_, 10, _>(|idx| idx as u64 * 40.mseconds());
for pts in ptss {
let buffer = h.pull().unwrap();
assert_eq!(buffer.pts(), Some(pts));
assert_eq!(buffer.duration(), Some(40.mseconds()));
}
// Now finally check all the events
let element = h.element().unwrap().downcast::<gst::Bin>().unwrap();
let src = element.by_name("src").unwrap();
let srcpad = src.static_pad("src").unwrap();
let stream_id = srcpad.stream_id().unwrap();
let caps = srcpad.current_caps().unwrap();
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let gst::EventView::StreamStart(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::StreamStart);
unreachable!();
};
assert_eq!(ev.stream_id(), &stream_id);
let ev = h.pull_event().unwrap();
let gst::EventView::Caps(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::Caps);
unreachable!();
};
assert_eq!(ev.caps(), &caps);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
#[test]
fn test_combine_split_multi() {
init();
let bin = gst::parse::bin_from_description(
"videotestsrc name=src_0 num-buffers=10 ! video/x-raw,framerate=25/1 ! combiner.sink_0 \
videotestsrc name=src_1 num-buffers=20 ! video/x-raw,framerate=50/1 ! combiner.sink_1 \
analyticscombiner name=combiner batch-duration=100000000 ! analyticssplitter name=splitter \
splitter.src_0_0 ! queue name=queue_0 \
splitter.src_0_1 ! queue name=queue_1",
false,
)
.unwrap();
let queue = bin.by_name("queue_0").unwrap();
bin.add_pad(
&gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap())
.unwrap()
.name("src_0")
.build(),
)
.unwrap();
let queue = bin.by_name("queue_1").unwrap();
bin.add_pad(
&gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap())
.unwrap()
.name("src_1")
.build(),
)
.unwrap();
let mut h0 = gst_check::Harness::with_element(&bin, None, Some("src_0"));
let mut h1 = gst_check::Harness::with_element(&bin, None, Some("src_1"));
h0.play();
h1.play();
let ptss = std::array::from_fn::<_, 20, _>(|idx| idx as u64 * 20.mseconds());
for (idx, pts) in ptss.into_iter().enumerate() {
if idx % 2 == 0 {
let buffer = h0.pull().unwrap();
assert_eq!(buffer.pts(), Some(pts));
assert_eq!(buffer.duration(), Some(40.mseconds()));
}
let buffer = h1.pull().unwrap();
assert_eq!(buffer.pts(), Some(pts));
assert_eq!(buffer.duration(), Some(20.mseconds()));
}
// Now finally check all the events
let src = bin.by_name("src_0").unwrap();
let srcpad = src.static_pad("src").unwrap();
let stream_id_0 = srcpad.stream_id().unwrap();
let caps_0 = srcpad.current_caps().unwrap();
let src = bin.by_name("src_1").unwrap();
let srcpad = src.static_pad("src").unwrap();
let stream_id_1 = srcpad.stream_id().unwrap();
let caps_1 = srcpad.current_caps().unwrap();
let ev = h0.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let gst::EventView::StreamStart(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::StreamStart);
unreachable!();
};
assert_eq!(ev.stream_id(), &stream_id_0);
let ev = h0.pull_event().unwrap();
let gst::EventView::Caps(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::Caps);
unreachable!();
};
assert_eq!(ev.caps(), &caps_0);
let ev = h0.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h0.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let gst::EventView::StreamStart(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::StreamStart);
unreachable!();
};
assert_eq!(ev.stream_id(), &stream_id_1);
let ev = h1.pull_event().unwrap();
let gst::EventView::Caps(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::Caps);
unreachable!();
};
assert_eq!(ev.caps(), &caps_1);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}
#[test]
fn test_combine_split_multi_with_initial_gap() {
init();
let h0_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240)
.fps(gst::Fraction::new(25, 1))
.build()
.unwrap()
.to_caps()
.unwrap();
let h1_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Gray8, 320, 240)
.fps(gst::Fraction::new(50, 1))
.build()
.unwrap()
.to_caps()
.unwrap();
let bin = gst::Bin::new();
let combiner = gst::ElementFactory::make("analyticscombiner")
.property("batch-duration", 200.mseconds())
.build()
.unwrap();
let splitter = gst::ElementFactory::make("analyticssplitter")
.build()
.unwrap();
bin.add_many([&combiner, &splitter]).unwrap();
combiner.link(&splitter).unwrap();
let sink_0 = combiner.request_pad_simple("sink_0").unwrap();
bin.add_pad(
&gst::GhostPad::builder_with_target(&sink_0)
.unwrap()
.name("sink_0")
.build(),
)
.unwrap();
let sink_1 = combiner.request_pad_simple("sink_1").unwrap();
bin.add_pad(
&gst::GhostPad::builder_with_target(&sink_1)
.unwrap()
.name("sink_1")
.build(),
)
.unwrap();
let queue = gst::ElementFactory::make("queue")
.name("queue_0")
.build()
.unwrap();
bin.add(&queue).unwrap();
bin.add_pad(
&gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap())
.unwrap()
.name("src_0")
.build(),
)
.unwrap();
let queue = gst::ElementFactory::make("queue")
.name("queue_1")
.build()
.unwrap();
bin.add(&queue).unwrap();
bin.add_pad(
&gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap())
.unwrap()
.name("src_1")
.build(),
)
.unwrap();
splitter.connect_closure(
"pad-added",
false,
glib::closure!(
#[weak]
bin,
move |_splitter: &gst::Element, pad: &gst::Pad| {
let pad_name = pad.name();
let [_, _count, stream_id] =
pad_name.split("_").collect::<Vec<_>>().try_into().unwrap();
let Some(queue) = bin.by_name(&format!("queue_{stream_id}")) else {
return;
};
let sinkpad = queue.static_pad("sink").unwrap();
if let Some(peer) = sinkpad.peer() {
peer.unlink(&sinkpad).unwrap();
}
pad.link(&sinkpad).unwrap();
}
),
);
let mut h0 = gst_check::Harness::with_element(&bin, Some("sink_0"), Some("src_0"));
h0.set_src_caps(h0_caps.clone());
h0.play();
// Push first 6 buffers on the first stream only
for i in 0..6 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i * 40.mseconds());
buffer.set_duration(40.mseconds());
}
assert_eq!(h0.push(buffer), Ok(gst::FlowSuccess::Ok));
}
// Time out so the first batch is output
h0.crank_single_clock_wait().unwrap();
// Now consume buffers for the first stream for the first batch
let ptss = std::array::from_fn::<_, 5, _>(|idx| idx as u64 * 40.mseconds());
for pts in ptss {
let buffer = h0.pull().unwrap();
assert_eq!(buffer.pts(), Some(pts));
assert_eq!(buffer.duration(), Some(40.mseconds()));
}
// Start the second stream
let mut h1 = gst_check::Harness::with_element(&bin, Some("sink_1"), Some("src_1"));
h1.set_src_caps(h1_caps.clone());
h1.play();
// Push another batch of buffers for both streams this time.
// Skip the first buffer for the first stream as it was pushed above already.
for i in 0..10 {
if i > 0 && i % 2 == 0 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(200.mseconds() + (i / 2) * 40.mseconds());
buffer.set_duration(40.mseconds());
}
assert_eq!(h0.push(buffer), Ok(gst::FlowSuccess::Ok));
}
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(200.mseconds() + i * 20.mseconds());
buffer.set_duration(20.mseconds());
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
}
// Now EOS so that the second batch is output
h0.push_event(gst::event::Eos::new());
h1.push_event(gst::event::Eos::new());
let ptss = std::array::from_fn::<_, 10, _>(|idx| 200.mseconds() + idx as u64 * 20.mseconds());
for (idx, pts) in ptss.into_iter().enumerate() {
if idx % 2 == 0 {
let buffer = h0.pull().unwrap();
assert_eq!(buffer.pts(), Some(pts));
assert_eq!(buffer.duration(), Some(40.mseconds()));
}
let buffer = h1.pull().unwrap();
assert_eq!(buffer.pts(), Some(pts));
assert_eq!(buffer.duration(), Some(20.mseconds()));
}
// Now finally check all the events
let stream_id_0 = sink_0.stream_id().unwrap();
let stream_id_1 = sink_1.stream_id().unwrap();
let ev = h0.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let gst::EventView::StreamStart(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::StreamStart);
unreachable!();
};
assert_eq!(ev.stream_id(), &stream_id_0);
let ev = h0.pull_event().unwrap();
let gst::EventView::Caps(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::Caps);
unreachable!();
};
assert_eq!(ev.caps(), &h0_caps);
let ev = h0.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h0.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::StreamStart);
let gst::EventView::StreamStart(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::StreamStart);
unreachable!();
};
assert_eq!(ev.stream_id(), &stream_id_1);
let ev = h1.pull_event().unwrap();
let gst::EventView::Caps(ev) = ev.view() else {
assert_eq!(ev.type_(), gst::EventType::Caps);
unreachable!();
};
assert_eq!(ev.caps(), &h1_caps);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Segment);
let ev = h1.pull_event().unwrap();
assert_eq!(ev.type_(), gst::EventType::Eos);
}

View file

@ -8074,6 +8074,90 @@
"rsanalytics": {
"description": "GStreamer Rust Analytics Plugin",
"elements": {
"analyticscombiner": {
"author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Analytics combiner / batcher element",
"hierarchy": [
"GstAnalyticsCombiner",
"GstAggregator",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy"
],
"klass": "Combiner/Analytics",
"pad-templates": {
"sink_%%u": {
"caps": "ANY",
"direction": "sink",
"presence": "request",
"type": "GstAnalyticsCombinerSinkPad"
},
"src": {
"caps": "multistream/x-analytics-batch(meta:GstAnalyticsBatchMeta):\n",
"direction": "src",
"presence": "always",
"type": "GstAggregatorPad"
}
},
"properties": {
"batch-duration": {
"blurb": "Batch Duration",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "100000000",
"max": "18446744073709551615",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint64",
"writable": true
},
"force-live": {
"blurb": "Always operate in live mode and aggregate on timeout regardless of whether any live sources are linked upstream",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
}
},
"rank": "none"
},
"analyticssplitter": {
"author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Analytics batch splitter element",
"hierarchy": [
"GstAnalyticsSplitter",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Demuxer/Analytics",
"pad-templates": {
"sink": {
"caps": "multistream/x-analytics-batch(meta:GstAnalyticsBatchMeta):\n",
"direction": "sink",
"presence": "always"
},
"src_%%u_%%u": {
"caps": "ANY",
"direction": "src",
"presence": "sometimes"
}
},
"rank": "primary"
},
"onvifmeta2relationmeta": {
"author": "Benjamin Gaignard <benjamin.gaignard@collabora.com>",
"description": "Convert ONVIF metadata to relation metadata",
@ -8142,6 +8226,70 @@
"filename": "gstrsanalytics",
"license": "MPL-2.0",
"other-types": {
"GstAnalyticsCombinerBatchStrategy": {
"kind": "enum",
"values": [
{
"desc": "All: Include all buffers that start inside the batch.",
"name": "all",
"value": "0"
},
{
"desc": "FirstInBatch: Include only the first buffer that starts inside the batch.",
"name": "first-in-batch",
"value": "1"
},
{
"desc": "FirstInBatchWithOverlap: Include only the first buffer that starts inside the batch unless there was a previously unused buffer at most half a batch duration earlier. If no buffer is available, allow taking a buffer up to half a batch duration later. The buffer closest to the batch start is included.",
"name": "first-in-batch-with-overlap",
"value": "2"
},
{
"desc": "LastInBatch: Include only the last buffer that starts inside the batch.",
"name": "last-in-batch",
"value": "3"
}
]
},
"GstAnalyticsCombinerSinkPad": {
"hierarchy": [
"GstAnalyticsCombinerSinkPad",
"GstAggregatorPad",
"GstPad",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"kind": "object",
"properties": {
"batch-strategy": {
"blurb": "Batching strategy to use for this stream",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "all (0)",
"mutable": "ready",
"readable": true,
"type": "GstAnalyticsCombinerBatchStrategy",
"writable": true
},
"index": {
"blurb": "Index, must be consecutive and starting at 0 and is fixed up otherwise",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
}
}
},
"GstRsOnvifNtpTimeSource": {
"kind": "enum",
"values": [

View file

@ -112,7 +112,10 @@ endforeach
# kept in the same order as the `members` list in Cargo.toml
plugins = {
'analytics': {'library': 'libgstrsanalytics'},
'analytics': {
'library': 'libgstrsanalytics',
'features': ['gst-rtp/v1_24', 'gst-base/v1_24', 'gst-video/v1_24'],
},
'audiofx': {
'library': 'libgstrsaudiofx',
'examples': ['hrtfrender'],
@ -227,9 +230,23 @@ plugins = {
'vvdec': {
'library': 'libgstvvdec',
'extra-deps': {'libvvdec': ['>= 3.0']}
}
},
}
# The splitter/combiner requires 1.28+
if get_option('analytics').allowed()
gst_analytics_dep = deps_cache['gstreamer-analytics-1.0']
if gst_analytics_dep.found() and gst_analytics_dep.version().version_compare('>= 1.28')
plugins_analytics = plugins['analytics']
plugins_analytics += {
'features': plugins_analytics['features'] + ['v1_28'],
}
plugins += {
'analytics': plugins_analytics
}
endif
endif
# Won't build on platforms where it bundles the sources because of:
# https://github.com/qnighy/libwebp-sys2-rs/issues/12
# the fix is: