From 67175f70d903f0a37bfbebb34d92bea3ca5e73e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 19 Jun 2025 15:34:36 +0300 Subject: [PATCH] analytics: Add new analyticscombiner / analyticssplitter elements These batch buffers from one or more streams into a single stream via the GstAnalyticsBatchMeta and allow splitting that single stream into the individual ones again later. Part-of: --- Cargo.lock | 1 + analytics/analytics/Cargo.toml | 24 +- analytics/analytics/src/combiner/imp.rs | 1123 ++++++++++++++++ analytics/analytics/src/combiner/mod.rs | 97 ++ analytics/analytics/src/lib.rs | 12 + analytics/analytics/src/splitter/imp.rs | 444 ++++++ analytics/analytics/src/splitter/mod.rs | 28 + .../analytics/tests/analyticscombiner.rs | 1185 +++++++++++++++++ .../analytics/tests/analyticssplitter.rs | 376 ++++++ docs/plugins/gst_plugins_cache.json | 148 ++ meson.build | 21 +- 11 files changed, 3451 insertions(+), 8 deletions(-) create mode 100644 analytics/analytics/src/combiner/imp.rs create mode 100644 analytics/analytics/src/combiner/mod.rs create mode 100644 analytics/analytics/src/splitter/imp.rs create mode 100644 analytics/analytics/src/splitter/mod.rs create mode 100644 analytics/analytics/tests/analyticscombiner.rs create mode 100644 analytics/analytics/tests/analyticssplitter.rs diff --git a/Cargo.lock b/Cargo.lock index 057e82ce8..1fe0c2c34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2678,6 +2678,7 @@ dependencies = [ "gstreamer", "gstreamer-analytics", "gstreamer-base", + "gstreamer-check", "gstreamer-rtp", "gstreamer-video", "xmltree", diff --git a/analytics/analytics/Cargo.toml b/analytics/analytics/Cargo.toml index 3ae028d46..7ff4b6357 100644 --- a/analytics/analytics/Cargo.toml +++ b/analytics/analytics/Cargo.toml @@ -1,13 +1,18 @@ [package] name = "gst-plugin-analytics" version.workspace = true -authors = ["Benjamin Gaignard "] +authors = ["Benjamin Gaignard ", "Sebastian Dröge "] repository.workspace = true license = 
"MPL-2.0" description = "GStreamer Rust Analytics Plugin" edition.workspace = true rust-version.workspace = true +[lib] +name = "gstrsanalytics" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + [dependencies] gst = { workspace = true, features = ["v1_24"] } gst-rtp = { workspace = true, features = ["v1_24"] } @@ -18,10 +23,8 @@ chrono = { version = "0.4.31", default-features = false } xmltree = "0.11" glib = { workspace = true, features = ["v2_62"] } -[lib] -name = "gstrsanalytics" -crate-type = ["cdylib", "rlib"] -path = "src/lib.rs" +[dev-dependencies] +gst-check.workspace = true [build-dependencies] gst-plugin-version-helper.workspace = true @@ -29,7 +32,16 @@ gst-plugin-version-helper.workspace = true [features] static = [] capi = [] -doc = ["gst/v1_18"] +doc = [] +v1_28 = ["gst-analytics/v1_28"] + +[[test]] +name = "analyticscombiner" +required-features = ["v1_28"] + +[[test]] +name = "analyticssplitter" +required-features = ["v1_28"] [package.metadata.capi] min_version = "0.9.21" diff --git a/analytics/analytics/src/combiner/imp.rs b/analytics/analytics/src/combiner/imp.rs new file mode 100644 index 000000000..38ab6bf24 --- /dev/null +++ b/analytics/analytics/src/combiner/imp.rs @@ -0,0 +1,1123 @@ +// SPDX-License-Identifier: MPL-2.0 + +use glib::{prelude::*, subclass::prelude::*}; +use gst::{prelude::*, subclass::prelude::*}; +use gst_base::{prelude::*, subclass::prelude::*}; + +use std::{ + cmp, + collections::{BTreeMap, VecDeque}, + mem, ptr, + sync::{LazyLock, Mutex}, +}; + +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "analyticscombiner", + gst::DebugColorFlags::empty(), + Some("Analytics combiner / batcher element"), + ) +}); + +#[derive(Default)] +struct State { + // Sorted by index + sinkpads: Vec, +} + +#[derive(Clone)] +struct Settings { + batch_duration: gst::ClockTime, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + batch_duration: gst::ClockTime::from_mseconds(100), + } + } +} + 
+#[derive(Default)] +pub struct AnalyticsCombiner { + state: Mutex, + settings: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for AnalyticsCombiner { + const NAME: &'static str = "GstAnalyticsCombiner"; + type Type = super::AnalyticsCombiner; + type ParentType = gst_base::Aggregator; + type Interfaces = (gst::ChildProxy,); +} + +impl ObjectImpl for AnalyticsCombiner { + fn constructed(&self) { + self.parent_constructed(); + + let settings = self.settings.lock().unwrap().clone(); + self.obj().set_latency(settings.batch_duration, None); + } + + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock> = LazyLock::new(|| { + vec![ + glib::ParamSpecUInt64::builder("batch-duration") + .nick("Batch Duration") + .blurb("Batch Duration") + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("force-live") + .nick("Force Live") + .blurb( + "Always operate in live mode and aggregate on timeout regardless of \ + whether any live sources are linked upstream", + ) + .construct_only() + .build(), + ] + }); + + &PROPERTIES + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "batch-duration" => { + let mut settings = self.settings.lock().unwrap(); + let batch_duration = value.get().unwrap(); + + if batch_duration != settings.batch_duration { + settings.batch_duration = batch_duration; + drop(settings); + self.update_latency(); + } + } + "force-live" => { + self.obj().set_force_live(value.get().unwrap()); + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "batch-duration" => self.settings.lock().unwrap().batch_duration.to_value(), + "force-live" => self.obj().is_force_live().to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for AnalyticsCombiner {} + +impl ElementImpl for AnalyticsCombiner { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static 
ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "Analytics Combiner / Batcher", + "Combiner/Analytics", + "Analytics combiner / batcher element", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let src_pad_template = gst::PadTemplate::with_gtype( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("multistream/x-analytics-batch") + .features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META]) + .build(), + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + + let sink_pad_template = gst::PadTemplate::with_gtype( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &gst::Caps::new_any(), + super::AnalyticsCombinerSinkPad::static_type(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + caps: Option<&gst::Caps>, + ) -> Option { + let pad = self.parent_request_new_pad(templ, name, caps)?; + self.obj().child_added(&pad, &pad.name()); + Some(pad) + } + + fn release_pad(&self, pad: &gst::Pad) { + self.obj().child_removed(pad, &pad.name()); + self.parent_release_pad(pad); + } +} + +impl AggregatorImpl for AnalyticsCombiner { + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state_guard = self.state.lock().unwrap(); + *state_guard = State::default(); + + let mut sinkpads = self + .obj() + .sink_pads() + .into_iter() + .map(|pad| pad.downcast::().unwrap()) + .collect::>(); + sinkpads.sort_by(|a, b| { + let index_a = a.property::("index"); + let index_b = b.property::("index"); + index_a.cmp(&index_b).then_with(|| a.name().cmp(&b.name())) + }); + + if sinkpads.is_empty() { + gst::error!(CAT, imp = self, "Can't start without sink pads"); + return Err(gst::error_msg!( + gst::CoreError::StateChange, + 
("Can't start without sink pads") + )); + } + + for (idx, pad) in sinkpads.iter_mut().enumerate() { + let index_pad = pad.property::("index"); + if idx as u32 != index_pad { + gst::warning!( + CAT, + obj = pad, + "Updating pad from index {index_pad} to {idx}" + ); + pad.set_property("index", idx as u32); + } else { + gst::debug!(CAT, obj = pad, "Using index {idx}"); + } + } + state_guard.sinkpads = sinkpads; + + gst::debug!(CAT, imp = self, "Started"); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state_guard = self.state.lock().unwrap(); + + for pad in &state_guard.sinkpads { + let pad_imp = pad.imp(); + let mut pad_state = pad_imp.state.lock().unwrap(); + pad_state.sticky_events.clear(); + pad_state.pending_serialized_events.clear(); + pad_state.pending_buffers.clear(); + pad_state.previous_buffer = None; + } + + *state_guard = State::default(); + drop(state_guard); + + gst::debug!(CAT, imp = self, "Stopped"); + + Ok(()) + } + + fn create_new_pad( + &self, + templ: &gst::PadTemplate, + req_name: Option<&str>, + caps: Option<&gst::Caps>, + ) -> Option { + let state_guard = self.state.lock().unwrap(); + if !state_guard.sinkpads.is_empty() { + gst::warning!(CAT, imp = self, "Can't add new pads after started"); + return None; + } + drop(state_guard); + + self.parent_create_new_pad(templ, req_name, caps) + } + + fn next_time(&self) -> Option { + self.obj().simple_get_next_time() + } + + fn aggregate(&self, timeout: bool) -> Result { + gst::trace!(CAT, imp = self, "Aggregate (timeout: {timeout})"); + + let settings = self.settings.lock().unwrap().clone(); + + let Some(segment) = self + .obj() + .src_pad() + .segment() + .downcast::() + .ok() + else { + gst::error!(CAT, imp = self, "Non-TIME segment"); + gst::element_imp_error!(self, gst::CoreError::Clock, ["Received non-time segment"]); + return Err(gst::FlowError::Error); + }; + + let start_position = segment.position().unwrap_or(segment.start().unwrap()); + let end_position = 
start_position + settings.batch_duration; + gst::trace!( + CAT, + imp = self, + "Collecting buffers for batch {start_position}-{end_position}" + ); + + let state_guard = self.state.lock().unwrap(); + self.fill_queues( + &state_guard, + &settings, + timeout, + start_position, + end_position, + )?; + let buffer = self.drain(&state_guard, &settings, start_position, end_position); + drop(state_guard); + + gst::trace!(CAT, imp = self, "Finishing buffer {buffer:?}",); + + let res = self.obj().finish_buffer(buffer)?; + + self.obj().set_position(end_position); + + Ok(res) + } + + fn sink_event(&self, pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { + use gst::EventView; + + gst::log!(CAT, obj = pad, "Handling event {event:?}"); + + let pad = pad + .downcast_ref::() + .unwrap(); + let pad_imp = pad.imp(); + + match event.view() { + EventView::Caps(caps) => { + let caps = caps.caps_owned(); + + gst::debug!(CAT, obj = pad, "Received new caps {caps:?}"); + + let mut pad_state = pad_imp.state.lock().unwrap(); + pad_state.current_caps = Some(caps); + + // Renegotiate before the next aggregate call + self.obj().src_pad().mark_reconfigure(); + } + + EventView::StreamStart(ev) => { + let mut pad_state = pad_imp.state.lock().unwrap(); + + let stream_id_changed = pad_state + .sticky_events + .get(&(OrderedEventType(gst::EventType::StreamStart), None)) + .is_none_or(|old_ev| { + let new_stream_id = ev.stream_id(); + let old_stream_id = { + let gst::EventView::StreamStart(old_ev) = old_ev.view() else { + unreachable!(); + }; + old_ev.stream_id() + }; + + new_stream_id != old_stream_id + }); + + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::Eos), None)); + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None)); + + if stream_id_changed { + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::Tag), None)); + } + } + EventView::FlushStop(_) => { + let mut pad_state = 
pad_imp.state.lock().unwrap(); + + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::Eos), None)); + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None)); + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::Segment), None)); + + pad_state.pending_serialized_events.clear(); + pad_state.pending_buffers.clear(); + pad_state.previous_buffer = None; + } + _ => (), + } + + // Collect all events to be sent as part of the next buffer + if event.is_sticky() { + let mut pad_state = pad_imp.state.lock().unwrap(); + let key = if event.is_sticky_multi() { + ( + OrderedEventType(event.type_()), + event.structure().map(|s| s.name().to_string()), + ) + } else { + (OrderedEventType(event.type_()), None) + }; + pad_state.sticky_events.insert(key, event.clone()); + } else if event.is_serialized() { + let mut pad_state = pad_imp.state.lock().unwrap(); + pad_state.pending_serialized_events.push_back(event.clone()); + } + + self.parent_sink_event(pad.upcast_ref(), event) + } + + #[allow(clippy::single_match)] + fn sink_query(&self, pad: &gst_base::AggregatorPad, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + let pad = pad + .downcast_ref::() + .unwrap(); + + match query.view_mut() { + QueryViewMut::Caps(q) => { + let filter = q.filter_owned(); + let peer_caps = self.obj().src_pad().peer_query_caps(None); + + if peer_caps.is_any() { + let res = filter.unwrap_or(peer_caps); + gst::log!(CAT, obj = pad, "Returning caps {res:?}"); + q.set_result(&res); + } else if peer_caps.is_empty() { + gst::log!(CAT, obj = pad, "Returning caps {peer_caps:?}"); + + q.set_result(&peer_caps); + } else { + let state_guard = self.state.lock().unwrap(); + let pad_index = state_guard + .sinkpads + .iter() + .enumerate() + .find_map(|(idx, p)| (pad == p).then_some(idx)); + drop(state_guard); + + // Shouldn't happen, pad not found! 
+ let Some(pad_index) = pad_index else { + gst::warning!(CAT, obj = pad, "Unknown pad"); + return false; + }; + + // return expected caps for this pad from downstream, if any + let mut res = gst::Caps::new_empty(); + for s in peer_caps.iter() { + let Some(streams) = s.get::("streams").ok() else { + continue; + }; + + let Some(stream_caps) = streams + .get(pad_index) + .map(|v| v.get::().unwrap()) + else { + continue; + }; + + res.merge(stream_caps); + } + + // No stream specific caps found, return ANY + let res = if res.is_empty() { + filter.unwrap_or(gst::Caps::new_any()) + } else { + filter + .as_ref() + .map(|filter| { + filter.intersect_with_mode(&res, gst::CapsIntersectMode::First) + }) + .unwrap_or(res) + }; + + gst::log!(CAT, obj = pad, "Returning caps {res:?}"); + + q.set_result(&res); + } + + return true; + } + _ => (), + } + + self.parent_sink_query(pad.upcast_ref(), query) + } + + #[allow(clippy::single_match)] + fn src_query(&self, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + match query.view_mut() { + QueryViewMut::Caps(q) => { + let state_guard = self.state.lock().unwrap(); + let sinkpads = state_guard.sinkpads.clone(); + drop(state_guard); + + let streams = if sinkpads.is_empty() { + None + } else { + Some( + sinkpads + .iter() + .map(|pad| pad.peer_query_caps(None)) + .map(|caps| caps.to_send_value()) + .collect::(), + ) + }; + + let res = gst::Caps::builder("multistream/x-analytics-batch") + .features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META]) + .field_if_some("streams", streams) + .build(); + + let filter = q.filter(); + let res = &filter + .map(|filter| filter.intersect_with_mode(&res, gst::CapsIntersectMode::First)) + .unwrap_or(res); + q.set_result(res); + + gst::log!(CAT, imp = self, "Returning caps {res:?}"); + + return true; + } + _ => (), + } + + self.parent_src_query(query) + } + + fn negotiate(&self) -> bool { + let state_guard = self.state.lock().unwrap(); + + let mut streams = gst::Array::default(); + 
for pad in &state_guard.sinkpads { + let pad_imp = pad.imp(); + let pad_state = pad_imp.state.lock().unwrap(); + + let caps = pad_state + .sticky_events + .values() + .find_map(|ev| { + let gst::EventView::Caps(caps) = ev.view() else { + return None; + }; + let caps = caps.caps_owned(); + Some(caps) + }) + .unwrap_or_else(|| { + gst::warning!(CAT, obj = pad, "No caps for pad, using empty caps for now"); + gst::Caps::new_empty() + }); + + streams.append(caps); + } + + let caps = gst::Caps::builder("multistream/x-analytics-batch") + .features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META]) + .field("streams", streams) + .build(); + + gst::debug!(CAT, imp = self, "Configuring caps {caps:?}"); + + drop(state_guard); + + self.obj().set_src_caps(&caps); + + true + } +} + +impl ChildProxyImpl for AnalyticsCombiner { + fn child_by_index(&self, index: u32) -> Option { + self.obj() + .sink_pads() + .get(index as usize) + .cloned() + .map(|pad| pad.upcast()) + } + + fn child_by_name(&self, name: &str) -> Option { + self.obj() + .sink_pads() + .into_iter() + .find(|pad| pad.name() == name) + .map(|pad| pad.upcast()) + } + + fn children_count(&self) -> u32 { + self.obj().num_sink_pads() as u32 + } +} + +impl AnalyticsCombiner { + fn update_latency(&self) { + let mut latency = self.settings.lock().unwrap().batch_duration; + let with_overlap = self.obj().sink_pads().into_iter().any(|pad| { + pad.property::("batch-strategy") + == super::BatchStrategy::FirstInBatchWithOverlap + }); + + if with_overlap { + latency += latency / 2; + } + + self.obj().set_latency(latency, None); + } + + fn fill_queues( + &self, + state: &State, + settings: &Settings, + timeout: bool, + start_position: gst::ClockTime, + end_position: gst::ClockTime, + ) -> Result<(), gst::FlowError> { + let mut all_eos = true; + let mut all_done = true; + + 'next_pad: for pad in &state.sinkpads { + let pad_imp = pad.imp(); + let mut pad_state = pad_imp.state.lock().unwrap(); + let pad_settings = 
pad_imp.settings.lock().unwrap().clone(); + + // Take any leftover previous buffer now if it's not too far in the past + if pad_settings.batch_strategy == super::BatchStrategy::FirstInBatchWithOverlap { + if let Some(buffer) = pad_state.previous_buffer.take() { + if buffer.running_time.is_none_or(|running_time| { + running_time >= start_position.saturating_sub(settings.batch_duration / 2) + }) { + gst::trace!( + CAT, + obj = pad, + "Taking previous buffer {:?} with running time {}", + buffer.buffer, + buffer.running_time.display(), + ); + pad_state.pending_buffers.push_back(buffer); + } else { + gst::trace!( + CAT, + obj = pad, + "Dropping previous buffer {:?} with running time {}", + buffer.buffer, + buffer.running_time.display(), + ); + } + } + } + + loop { + all_eos = all_eos && pad.is_eos() && pad_state.pending_buffers.is_empty(); + if pad.is_eos() { + // This pad is done for this batch, no need to update all_done + gst::trace!(CAT, obj = pad, "Pad is EOS"); + continue 'next_pad; + } + + let Some(buffer) = pad.peek_buffer() else { + // Not EOS and no buffer, so we don't know if this pad still has buffers + // for this batch + all_done = false; + gst::trace!(CAT, obj = pad, "Pad has no pending buffer"); + continue 'next_pad; + }; + + let Some(segment) = pad.segment().downcast::().ok() else { + gst::error!(CAT, obj = pad, "Non-TIME segment"); + gst::element_imp_error!( + self, + gst::CoreError::Clock, + ["Received non-time segment"] + ); + return Err(gst::FlowError::Error); + }; + + let Some(ts) = buffer.dts_or_pts() else { + // Buffers without PTS are immediately taken + gst::trace!( + CAT, + obj = pad, + "Taking buffer {buffer:?} without DTS or PTS" + ); + let buffer = BatchBuffer { + running_time: None, + sticky_events: pad_state.sticky_events.values().cloned().collect(), + serialized_events: pad_state.pending_serialized_events.drain(..).collect(), + buffer: Some(buffer), + }; + pad_state.pending_buffers.push_back(buffer); + let _ = pad.drop_buffer(); + // 
Check next buffer + continue; + }; + + let running_time = segment.to_running_time_full(ts).unwrap(); + if running_time < end_position { + // Buffer is for this batch + gst::trace!( + CAT, + obj = pad, + "Taking buffer {buffer:?} with running time {running_time}" + ); + } else if pad_state.pending_buffers.is_empty() + && pad_settings.batch_strategy != super::BatchStrategy::FirstInBatchWithOverlap + && running_time < end_position + settings.batch_duration / 2 + { + // Buffer is still for this batch because of the batch strategy + gst::trace!( + CAT, + obj = pad, + "Taking future buffer {buffer:?} with running time {running_time} because of batch strategy" + ); + } else { + // Buffer is for the next batch + gst::trace!( + CAT, + obj = pad, + "Keeping buffer {buffer:?} with running time {running_time} for next batch" + ); + continue 'next_pad; + } + + let buffer = BatchBuffer { + running_time: Some(running_time), + sticky_events: pad_state.sticky_events.values().cloned().collect(), + serialized_events: pad_state.pending_serialized_events.drain(..).collect(), + buffer: Some(buffer), + }; + pad_state.pending_buffers.push_back(buffer); + let _ = pad.drop_buffer(); + // Check next buffer + } + } + + if all_eos && !self.obj().is_force_live() { + gst::debug!(CAT, imp = self, "All pads EOS"); + return Err(gst::FlowError::Eos); + } + + if !all_done && !timeout { + gst::trace!(CAT, imp = self, "Waiting for more data"); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + + Ok(()) + } + + fn drain( + &self, + state: &State, + settings: &Settings, + start_position: gst::ClockTime, + _end_position: gst::ClockTime, + ) -> gst::Buffer { + let mut buffer = gst::Buffer::new(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(start_position); + buffer.set_duration(settings.batch_duration); + + let mut meta = gst_analytics::AnalyticsBatchMeta::add(buffer); + + unsafe { + let meta = &mut *(meta.as_mut_ptr()); + + meta.streams = glib::ffi::g_malloc0_n( + 
state.sinkpads.len(), + mem::size_of::(), + ) + as *mut gst_analytics::ffi::GstAnalyticsBatchStream; + meta.n_streams = state.sinkpads.len(); + } + + for (idx, pad) in state.sinkpads.iter().enumerate() { + let pad_imp = pad.imp(); + let mut pad_state = pad_imp.state.lock().unwrap(); + let pad_settings = pad_imp.settings.lock().unwrap().clone(); + + // Post-filter the pending buffers based on the batch strategy + match pad_settings.batch_strategy { + crate::combiner::BatchStrategy::All => { + // Nothing to do here, take all buffers + } + crate::combiner::BatchStrategy::FirstInBatch => { + if pad_state.pending_buffers.len() > 1 { + // Drop all but the first buffer and store the serialized events + // for the next batch + let mut serialized_events = vec![]; + for buffer in pad_state.pending_buffers.drain(1..) { + gst::trace!( + CAT, obj = pad, + "Dropping buffer {:?} with running time {} because of batch strategy", + buffer.buffer, + buffer.running_time.display(), + ); + for event in buffer.serialized_events { + serialized_events.push(event); + } + } + + for event in serialized_events.into_iter().rev() { + pad_state.pending_serialized_events.push_front(event); + } + } + } + crate::combiner::BatchStrategy::LastInBatch => { + if pad_state.pending_buffers.len() > 1 { + // Drop all but the last buffer and store the serialized events + // for the first buffer + let mut selected_buffer = pad_state.pending_buffers.pop_back().unwrap(); + + let mut serialized_events = vec![]; + for buffer in pad_state.pending_buffers.drain(..) { + gst::trace!( + CAT, obj = pad, + "Dropping buffer {:?} with running time {} because of batch strategy", + buffer.buffer, + buffer.running_time.display(), + ); + for event in buffer.serialized_events { + serialized_events.push(event); + } + } + for event in selected_buffer.serialized_events.drain(..) 
{ + serialized_events.push(event); + } + selected_buffer.serialized_events = serialized_events; + + // And store the selected buffer in the pending buffers for the meta + pad_state.pending_buffers.push_back(selected_buffer); + } + } + crate::combiner::BatchStrategy::FirstInBatchWithOverlap => { + if pad_state.pending_buffers.len() > 1 { + // Take the buffer closest to the batch start, and drop all others. + // Keep the last buffer around if there is any and store all serialized + // events of the dropped buffers. + let selected_buffer = { + let first_buffer = pad_state.pending_buffers.pop_front().unwrap(); + // If the first buffer has no running time, select it directly + if first_buffer.running_time.is_none() { + first_buffer + } else { + let second_buffer = pad_state.pending_buffers.front().unwrap(); + + if second_buffer.running_time.is_none() { + // If the second buffer has no running time, select the first + // buffer + first_buffer + } else { + // If both have a running time, select the one closest to the + // batch start + let first_buffer_distance = first_buffer + .running_time + .map(|running_time| { + (running_time - start_position).abs() + }) + .unwrap(); + let second_buffer_distance = second_buffer + .running_time + .map(|running_time| { + (running_time - start_position).abs() + }) + .unwrap(); + + if first_buffer_distance <= second_buffer_distance { + first_buffer + } else { + pad_state.pending_buffers.pop_front().unwrap() + } + } + } + }; + + // Keep the last buffer around, if any + let last_buffer = pad_state.pending_buffers.pop_back(); + + // Drain all others and keep serialized events + let mut serialized_events = vec![]; + for buffer in pad_state.pending_buffers.drain(..) 
{ + gst::trace!( + CAT, obj = pad, + "Dropping buffer {:?} with running time {} because of batch strategy", + buffer.buffer, + buffer.running_time.display(), + ); + for event in buffer.serialized_events { + serialized_events.push(event); + } + } + + if let Some(mut last_buffer) = last_buffer { + gst::trace!( + CAT, obj = pad, + "Keeping last buffer {:?} with running time {} for next batch because of batch strategy", + last_buffer.buffer, + last_buffer.running_time.display(), + ); + for event in last_buffer.serialized_events.drain(..) { + serialized_events.push(event); + } + last_buffer.serialized_events = serialized_events; + pad_state.previous_buffer = Some(last_buffer); + } else { + pad_state.previous_buffer = None; + for event in serialized_events.into_iter().rev() { + pad_state.pending_serialized_events.push_front(event); + } + } + + // And store the selected buffer in the pending buffers for the meta + pad_state.pending_buffers.push_back(selected_buffer); + } + } + } + + // Include all pending events and all the serialized events if + // there is no buffer for this pad at this time. 
+ if pad_state.pending_buffers.is_empty() { + let buffer = BatchBuffer { + running_time: None, + sticky_events: pad_state.sticky_events.values().cloned().collect(), + serialized_events: pad_state.pending_serialized_events.drain(..).collect(), + buffer: None, + }; + pad_state.pending_buffers.push_back(buffer); + } + + // And finally fill the meta + unsafe { + use glib::translate::IntoGlibPtr; + + let meta = &mut *(meta.as_mut_ptr()); + let stream = &mut *meta.streams.add(idx); + stream.index = idx as u32; + + stream.buffers = glib::ffi::g_malloc0_n( + pad_state.pending_buffers.len(), + mem::size_of::(), + ) + as *mut gst_analytics::ffi::GstAnalyticsBatchBuffer; + stream.n_buffers = pad_state.pending_buffers.len(); + + for (buffer_idx, mut buffer) in pad_state.pending_buffers.drain(..).enumerate() + { + let buffer_storage = &mut *stream.buffers.add(buffer_idx); + + // Replace GAP buffers with a GAP event + if let Some(ref b) = buffer.buffer { + if b.flags().contains(gst::BufferFlags::GAP) + && b.flags().contains(gst::BufferFlags::DROPPABLE) + && b.size() == 0 + { + let ev = gst::event::Gap::builder(b.pts().unwrap()) + .duration(b.duration()) + .build(); + buffer.buffer = None; + buffer.serialized_events.push(ev); + } + } + + buffer_storage.sticky_events = glib::ffi::g_malloc0_n( + buffer.sticky_events.len(), + mem::size_of::<*mut gst::ffi::GstEvent>(), + ) + as *mut *mut gst::ffi::GstEvent; + buffer_storage.n_sticky_events = buffer.sticky_events.len(); + for (event_idx, event) in buffer.sticky_events.into_iter().enumerate() { + *buffer_storage.sticky_events.add(event_idx) = event.into_glib_ptr(); + } + + if !buffer.serialized_events.is_empty() { + buffer_storage.serialized_events = glib::ffi::g_malloc0_n( + buffer.serialized_events.len(), + mem::size_of::<*mut gst::ffi::GstEvent>(), + ) + as *mut *mut gst::ffi::GstEvent; + buffer_storage.n_serialized_events = buffer.serialized_events.len(); + for (event_idx, event) in + 
buffer.serialized_events.into_iter().enumerate() + { + *buffer_storage.serialized_events.add(event_idx) = + event.into_glib_ptr(); + } + } else { + buffer_storage.serialized_events = ptr::null_mut(); + buffer_storage.n_serialized_events = 0; + } + + buffer_storage.buffer = buffer.buffer.into_glib_ptr(); + } + } + } + } + + buffer + } +} + +#[derive(Debug, Clone, Copy)] +struct OrderedEventType(gst::EventType); + +impl cmp::PartialEq for OrderedEventType { + fn eq(&self, other: &Self) -> bool { + self.partial_cmp(other) == Some(cmp::Ordering::Equal) + } +} + +impl cmp::Eq for OrderedEventType {} + +impl cmp::PartialOrd for OrderedEventType { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl cmp::Ord for OrderedEventType { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.0.partial_cmp(&other.0).unwrap_or_else(|| { + use glib::translate::IntoGlib; + + self.0.into_glib().cmp(&other.0.into_glib()) + }) + } +} + +struct BatchBuffer { + running_time: Option>, + sticky_events: Vec, + serialized_events: Vec, + buffer: Option, +} + +#[derive(Default)] +struct PadState { + pending_serialized_events: VecDeque, + sticky_events: BTreeMap<(OrderedEventType, Option), gst::Event>, + pending_buffers: VecDeque, + current_caps: Option, + previous_buffer: Option, +} + +#[derive(Default, Clone)] +struct PadSettings { + index: u32, + batch_strategy: super::BatchStrategy, +} + +#[derive(Default)] +pub struct AnalyticsCombinerSinkPad { + state: Mutex, + settings: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for AnalyticsCombinerSinkPad { + const NAME: &'static str = "GstAnalyticsCombinerSinkPad"; + type Type = super::AnalyticsCombinerSinkPad; + type ParentType = gst_base::AggregatorPad; +} + +impl ObjectImpl for AnalyticsCombinerSinkPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock> = LazyLock::new(|| { + vec![ + glib::ParamSpecUInt::builder("index") + .nick("Index") + .blurb("Index, must be 
consecutive and starting at 0 and is fixed up otherwise") + .mutable_ready() + .build(), + glib::ParamSpecEnum::builder::("batch-strategy") + .nick("Batch Strategy") + .blurb("Batching strategy to use for this stream") + .mutable_ready() + .build(), + ] + }); + + &PROPERTIES + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "index" => { + self.settings.lock().unwrap().index = value.get().unwrap(); + } + "batch-strategy" => { + self.settings.lock().unwrap().batch_strategy = value.get().unwrap(); + if let Some(parent) = self + .obj() + .parent() + .map(|parent| parent.downcast::().unwrap()) + { + parent.imp().update_latency(); + } + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "index" => self.settings.lock().unwrap().index.to_value(), + "batch-strategy" => self.settings.lock().unwrap().batch_strategy.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for AnalyticsCombinerSinkPad {} + +impl PadImpl for AnalyticsCombinerSinkPad {} + +impl AggregatorPadImpl for AnalyticsCombinerSinkPad { + fn flush(&self, aggregator: &gst_base::Aggregator) -> Result { + let mut pad_state = self.state.lock().unwrap(); + + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::Eos), None)); + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::StreamGroupDone), None)); + pad_state + .sticky_events + .remove(&(OrderedEventType(gst::EventType::Segment), None)); + + pad_state.pending_serialized_events.clear(); + pad_state.pending_buffers.clear(); + pad_state.previous_buffer = None; + + self.parent_flush(aggregator) + } +} diff --git a/analytics/analytics/src/combiner/mod.rs b/analytics/analytics/src/combiner/mod.rs new file mode 100644 index 000000000..d9b26168c --- /dev/null +++ b/analytics/analytics/src/combiner/mod.rs @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: MPL-2.0 +/** + 
* SECTION:element-analyticscombiner
+ * @see_also: analyticssplitter
+ *
+ * `analyticscombiner` is a generic, media-agnostic stream combiner element for analytics purposes.
+ *
+ * Buffers and serialized events of one or more streams are combined into batches of a specific
+ * duration, which can be configured via the `batch-duration` property. The batches are pushed
+ * downstream as empty buffers with the `GstAnalyticsBatchMeta`, which contains the original
+ * data flow of each stream. The order of the streams inside the `GstAnalyticsBatchMeta` is
+ * defined by the `index` property on each of the sink pads, which defaults to being the index of
+ * all sink pads when sorted according to the pad name.
+ *
+ * The caps on the source pad are of type `multistream/x-analytics-batch` and contain the original
+ * caps of each stream in the `streams` field in the same order as they appear in the
+ * `GstAnalyticsBatchMeta`. Caps negotiation ensures that downstream can provide constraints for
+ * each of the input streams.
+ *
+ * By default all buffers that start inside a batch are included in the output. Via the
+ * `batch-strategy` property on the sink pads this behaviour can be modified.
+ *
+ * The intended usage is to follow `analyticscombiner` by one or more inference elements that
+ * process the combined stream, attach additional meta on the buffers, and then pass through
+ * `analyticssplitter` that then splits the combined stream into its original streams again.
+ *
+ * ## Usage
+ *
+ * ```shell
+ * gst-launch-1.0 filesrc location=file-1.mp4 ! decodebin3 ! combiner.sink_0 \
+ * filesrc location=file-2.mp4 ! decodebin3 ! combiner.sink_1 \
+ * analyticscombiner name=combiner batch-duration=100000000 \
+ * sink_0::batch-strategy=first-in-batch-with-overlay \
+ * sink_1::batch-strategy=first-in-batch-with-overlay ! \
+ * inference-elements ! \
+ * analyticssplitter name=splitter \
+ * splitter.src_0_0 ! objectdetectionoverlay ! videoconvert !
autovideosink \ + * splitter.src_0_1 ! objectdetectionoverlay ! videoconvert ! autovideosink + * ``` + * + * Since: plugins-rs-0.14.0 + */ +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct AnalyticsCombiner(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object, @implements gst::ChildProxy; +} + +glib::wrapper! { + pub struct AnalyticsCombinerSinkPad(ObjectSubclass) @extends gst_base::AggregatorPad, gst::Pad, gst::Object; +} + +#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, glib::Enum)] +#[enum_type(name = "GstAnalyticsCombinerBatchStrategy")] +#[repr(C)] +pub enum BatchStrategy { + #[default] + #[enum_value( + name = "All: Include all buffers that start inside the batch.", + nick = "all" + )] + All, + #[enum_value( + name = "FirstInBatch: Include only the first buffer that starts inside the batch.", + nick = "first-in-batch" + )] + FirstInBatch, + #[enum_value( + name = "FirstInBatchWithOverlap: Include only the first buffer that starts inside the batch unless there was a previously unused buffer at most half a batch duration earlier. If no buffer is available, allow taking a buffer up to half a batch duration later. 
The buffer closest to the batch start is included.", + nick = "first-in-batch-with-overlap" + )] + FirstInBatchWithOverlap, + #[enum_value( + name = "LastInBatch: Include only the last buffer that starts inside the batch.", + nick = "last-in-batch" + )] + LastInBatch, +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + use gst::prelude::*; + + BatchStrategy::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + AnalyticsCombinerSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + + gst::Element::register( + Some(plugin), + "analyticscombiner", + gst::Rank::NONE, + AnalyticsCombiner::static_type(), + ) +} diff --git a/analytics/analytics/src/lib.rs b/analytics/analytics/src/lib.rs index 31830c360..07afc9f5a 100644 --- a/analytics/analytics/src/lib.rs +++ b/analytics/analytics/src/lib.rs @@ -1,4 +1,5 @@ // Copyright (C) 2024 Benjamin Gaignard +// Copyright (C) 2025 Sebastian Dröge // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. 
// If a copy of the MPL was not distributed with this file, You can obtain one at @@ -20,6 +21,11 @@ pub(crate) const ONVIF_METADATA_PREFIX: &str = "tt"; mod onvifmeta2relationmeta; mod relationmeta2onvifmeta; +#[cfg(feature = "v1_28")] +mod combiner; +#[cfg(feature = "v1_28")] +mod splitter; + fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { relationmeta2onvifmeta::register(plugin)?; onvifmeta2relationmeta::register(plugin)?; @@ -28,6 +34,12 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::meta::CustomMeta::register("OnvifXMLFrameMeta", &[]); } + #[cfg(feature = "v1_28")] + { + combiner::register(plugin)?; + splitter::register(plugin)?; + } + Ok(()) } diff --git a/analytics/analytics/src/splitter/imp.rs b/analytics/analytics/src/splitter/imp.rs new file mode 100644 index 000000000..37df29b7c --- /dev/null +++ b/analytics/analytics/src/splitter/imp.rs @@ -0,0 +1,444 @@ +// SPDX-License-Identifier: MPL-2.0 + +use gst::{prelude::*, subclass::prelude::*}; +use std::{ + mem, + sync::{LazyLock, Mutex}, +}; + +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "analyticssplitter", + gst::DebugColorFlags::empty(), + Some("Analytics batch / splitter element"), + ) +}); + +struct Stream { + pad: gst::Pad, + caps: gst::Caps, +} + +struct State { + generation: usize, + streams: Vec, + combiner: Option, +} + +impl Default for State { + fn default() -> Self { + Self { + generation: 0, + streams: Vec::new(), + combiner: Some(gst_base::UniqueFlowCombiner::default()), + } + } +} + +pub struct AnalyticsSplitter { + sinkpad: gst::Pad, + state: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for AnalyticsSplitter { + const NAME: &'static str = "GstAnalyticsSplitter"; + type Type = super::AnalyticsSplitter; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_from_template(&templ) + 
.chain_function(|pad, parent, buffer| { + AnalyticsSplitter::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |self_| self_.sink_chain(pad, buffer), + ) + }) + .event_function(|pad, parent, event| { + AnalyticsSplitter::catch_panic_pad_function( + parent, + || false, + |self_| self_.sink_event(pad, event), + ) + }) + .query_function(|pad, parent, query| { + AnalyticsSplitter::catch_panic_pad_function( + parent, + || false, + |self_| self_.sink_query(pad, query), + ) + }) + .build(); + + Self { + sinkpad, + state: Mutex::new(State::default()), + } + } +} + +impl ObjectImpl for AnalyticsSplitter { + fn constructed(&self) { + self.parent_constructed(); + + self.obj().add_pad(&self.sinkpad).unwrap(); + } +} + +impl GstObjectImpl for AnalyticsSplitter {} + +impl ElementImpl for AnalyticsSplitter { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "Analytics batch splitter element", + "Demuxer/Analytics", + "Analytics batch splitter element", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("multistream/x-analytics-batch") + .features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META]) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src_%u_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(); + + vec![sink_pad_template, src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + #[allow(clippy::single_match)] + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + let res = self.parent_change_state(transition)?; + + match transition { + gst::StateChange::PausedToReady 
=> { + let mut state_guard = self.state.lock().unwrap(); + let streams = mem::take(&mut state_guard.streams); + *state_guard = State::default(); + drop(state_guard); + + for stream in streams { + let _ = self.obj().remove_pad(&stream.pad); + } + } + _ => (), + } + + Ok(res) + } +} + +impl AnalyticsSplitter { + fn sink_chain( + &self, + _pad: &gst::Pad, + buffer: gst::Buffer, + ) -> Result { + gst::log!(CAT, imp = self, "Handling buffer {buffer:?}"); + + let Some(meta) = buffer.meta::() else { + gst::error!(CAT, imp = self, "No batch meta"); + gst::element_imp_error!(self, gst::StreamError::Demux, ["No batch meta"]); + return Err(gst::FlowError::Error); + }; + + let mut state_guard = self.state.lock().unwrap(); + + if meta.streams().len() != state_guard.streams.len() { + gst::error!(CAT, imp = self, "Wrong number of streams"); + gst::element_imp_error!(self, gst::StreamError::Demux, ["Wrong number of streams"]); + return Err(gst::FlowError::NotNegotiated); + } + + let pads = state_guard + .streams + .iter() + .map(|s| &s.pad) + .cloned() + .collect::>(); + // Temporarily take combiner out so we can release the lock, and later + // store it again in the state. 
+ let mut combiner = state_guard.combiner.take().unwrap(); + drop(state_guard); + + let mut res = Ok(gst::FlowSuccess::Ok); + 'next_stream: for (pad, stream) in Iterator::zip(pads.into_iter(), meta.streams().iter()) { + for buffer in stream.buffers() { + for event in buffer.sticky_events() { + gst::trace!(CAT, obj = pad, "Storing sticky event {event:?}"); + let _ = pad.store_sticky_event(event); + } + + for event in buffer.serialized_events() { + gst::trace!(CAT, obj = pad, "Pushing serialized event {event:?}"); + let _ = pad.push_event(event.clone()); + } + + if let Some(buffer) = buffer.buffer_owned() { + gst::trace!(CAT, obj = pad, "Pushing buffer {buffer:?}"); + let pad_res = pad.push(buffer); + res = combiner.update_pad_flow(&pad, pad_res); + } + if let Some(buffer_list) = buffer.buffer_list_owned() { + gst::trace!(CAT, obj = pad, "Pushing buffer list {buffer_list:?}"); + let pad_res = pad.push_list(buffer_list); + res = combiner.update_pad_flow(&pad, pad_res); + } + + if let Err(err) = res { + gst::debug!(CAT, obj = pad, "Got flow error {err:?}"); + break 'next_stream; + } + } + } + + let mut state_guard = self.state.lock().unwrap(); + state_guard.combiner = Some(combiner); + + gst::trace!(CAT, imp = self, "Returning {res:?}"); + + res + } + + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + use gst::EventView; + + gst::log!(CAT, obj = pad, "Handling event {event:?}"); + match event.view() { + EventView::Caps(ev) => { + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + + let Some(streams) = s.get::("streams").ok() else { + return false; + }; + let streams = streams + .iter() + .map(|v| v.get::().unwrap()) + .collect::>(); + + let mut state_guard = self.state.lock().unwrap(); + let mut to_remove = vec![]; + let mut to_add = vec![]; + if streams.len() != state_guard.streams.len() + || !Iterator::zip(streams.iter(), state_guard.streams.iter().map(|s| &s.caps)) + .all(|(a, b)| a == b) + { + let templ = 
self.obj().class().pad_template("src_%u_%u").unwrap(); + to_remove = state_guard.streams.drain(..).map(|s| s.pad).collect(); + for pad in &to_remove { + state_guard.combiner.as_mut().unwrap().remove_pad(pad); + } + + state_guard.combiner.as_mut().unwrap().reset(); + + for (idx, stream) in streams.iter().enumerate() { + let pad = gst::Pad::builder_from_template(&templ) + .name(format!("src_{}_{idx}", state_guard.generation)) + .event_function(|pad, parent, event| { + AnalyticsSplitter::catch_panic_pad_function( + parent, + || false, + |self_| self_.src_event(pad, event), + ) + }) + .query_function(|pad, parent, query| { + AnalyticsSplitter::catch_panic_pad_function( + parent, + || false, + |self_| self_.src_query(pad, query), + ) + }) + .build(); + gst::debug!( + CAT, + imp = self, + "Creating pad {} with caps {stream:?}", + pad.name() + ); + to_add.push(pad.clone()); + state_guard.combiner.as_mut().unwrap().add_pad(&pad); + state_guard.streams.push(Stream { + pad, + caps: stream.clone(), + }); + } + + state_guard.generation += 1; + } + + drop(state_guard); + + for pad in to_add { + let _ = pad.set_active(true); + let _ = self.obj().add_pad(&pad); + } + for pad in to_remove { + let _ = pad.set_active(false); + let _ = self.obj().remove_pad(&pad); + } + + self.obj().no_more_pads(); + } + // Pass through + EventView::Eos(_) | EventView::FlushStart(_) | EventView::FlushStop(_) => (), + // Drop all other events, we take them from the individual streams + _ => return true, + } + + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + + #[allow(clippy::single_match)] + fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + gst::log!(CAT, obj = pad, "Handling query {query:?}"); + match query.view_mut() { + QueryViewMut::Caps(q) => { + let state_guard = self.state.lock().unwrap(); + let sinkpads = state_guard + .streams + .iter() + .map(|s| s.pad.clone()) + .collect::>(); + drop(state_guard); + + let streams = if 
sinkpads.is_empty() { + None + } else { + Some( + sinkpads + .iter() + .map(|pad| pad.peer_query_caps(None)) + .map(|caps| caps.to_send_value()) + .collect::(), + ) + }; + + let res = gst::Caps::builder("multistream/x-analytics-batch") + .features([gst_analytics::CAPS_FEATURE_META_ANALYTICS_BATCH_META]) + .field_if_some("streams", streams) + .build(); + + let filter = q.filter(); + let res = &filter + .map(|filter| filter.intersect_with_mode(&res, gst::CapsIntersectMode::First)) + .unwrap_or(res); + q.set_result(res); + + gst::log!(CAT, imp = self, "Returning caps {res:?}"); + + return true; + } + _ => (), + } + gst::Pad::query_default(pad, Some(&*self.obj()), query) + } + + fn src_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + gst::log!(CAT, obj = pad, "Handling event {event:?}"); + + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + + #[allow(clippy::single_match)] + fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + use gst::QueryViewMut; + + gst::log!(CAT, obj = pad, "Handling query {query:?}"); + match query.view_mut() { + QueryViewMut::Caps(q) => { + let filter = q.filter_owned(); + let peer_caps = self.sinkpad.peer_query_caps(None); + + if peer_caps.is_any() { + let res = filter.unwrap_or(peer_caps); + gst::log!(CAT, obj = pad, "Returning caps {res:?}"); + q.set_result(&res); + } else if peer_caps.is_empty() { + gst::log!(CAT, obj = pad, "Returning caps {peer_caps:?}"); + + q.set_result(&peer_caps); + } else { + let state_guard = self.state.lock().unwrap(); + let pad_index = state_guard + .streams + .iter() + .enumerate() + .find_map(|(idx, p)| (pad == &p.pad).then_some(idx)); + drop(state_guard); + + // Shouldn't happen, pad not found! 
+ let Some(pad_index) = pad_index else { + gst::warning!(CAT, obj = pad, "Unknown pad"); + return false; + }; + + // return expected caps for this pad from upstream, if any + let mut res = gst::Caps::new_empty(); + for s in peer_caps.iter() { + let Some(streams) = s.get::("streams").ok() else { + continue; + }; + + let Some(stream_caps) = streams + .get(pad_index) + .map(|v| v.get::().unwrap()) + else { + continue; + }; + + res.merge(stream_caps); + } + + // No stream specific caps found, return ANY + let res = if res.is_empty() { + filter.unwrap_or(gst::Caps::new_any()) + } else { + filter + .as_ref() + .map(|filter| { + filter.intersect_with_mode(&res, gst::CapsIntersectMode::First) + }) + .unwrap_or(res) + }; + + gst::log!(CAT, obj = pad, "Returning caps {res:?}"); + + q.set_result(&res); + } + + return true; + } + _ => (), + } + gst::Pad::query_default(pad, Some(&*self.obj()), query) + } +} diff --git a/analytics/analytics/src/splitter/mod.rs b/analytics/analytics/src/splitter/mod.rs new file mode 100644 index 000000000..c7c192782 --- /dev/null +++ b/analytics/analytics/src/splitter/mod.rs @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: MPL-2.0 +/** + * SECTION:element-analyticssplitter + * @see_also: analyticscombiner + * + * `analyticssplitter` is a generic, media-agnostic stream splitter for streams generated by + * `analyticscombiner` and other streams that make use of `GstAnalyticsBatchMeta`. + * + * See the `analyticscombiner` and `GstAnalyticsBatchMeta` documentation for more details. + * + * Since: plugins-rs-0.14.0 + */ +use glib::prelude::*; + +mod imp; + +glib::wrapper! 
{ + pub struct AnalyticsSplitter(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "analyticssplitter", + gst::Rank::PRIMARY, + AnalyticsSplitter::static_type(), + ) +} diff --git a/analytics/analytics/tests/analyticscombiner.rs b/analytics/analytics/tests/analyticscombiner.rs new file mode 100644 index 000000000..c6aa80845 --- /dev/null +++ b/analytics/analytics/tests/analyticscombiner.rs @@ -0,0 +1,1185 @@ +// Copyright (C) 2025 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +// + +use gst::prelude::*; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstrsanalytics::plugin_register_static().unwrap(); + }); +} + +#[test] +fn test_combine_multi() { + init(); + + let combiner = gst::ElementFactory::make("analyticscombiner") + .property("batch-duration", 200.mseconds()) + .build() + .unwrap(); + let sink_0 = combiner.request_pad_simple("sink_0").unwrap(); + let sink_1 = combiner.request_pad_simple("sink_1").unwrap(); + + let mut h0 = gst_check::Harness::with_element(&combiner, None, Some("src")); + h0.add_element_sink_pad(&sink_0); + let mut h1 = gst_check::Harness::with_element(&combiner, None, None); + h1.add_element_sink_pad(&sink_1); + + let h0_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240) + .fps(gst::Fraction::new(50, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + let h1_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Gray8, 320, 240) + .fps(gst::Fraction::new(25, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + h0.set_src_caps(h0_caps.clone()); + h0.play(); + + h1.set_src_caps(h1_caps.clone()); + h1.play(); + + // Push 
buffers according to the framerate for the first batch + // and one additional for the second batch to get an output. + for i in 0..12 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 20.mseconds()); + buffer.set_duration(20.mseconds()); + } + assert_eq!(h0.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i % 2 == 0 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts((i / 2) * 40.mseconds()); + buffer.set_duration(40.mseconds()); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + } + + let buffer = h0.pull().unwrap(); + + assert_eq!(buffer.pts(), Some(0.mseconds())); + assert_eq!(buffer.duration(), Some(200.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 2); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 10); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h0_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(idx as u64 * 20.mseconds())); + assert_eq!(b.duration(), Some(20.mseconds())); + } + let stream = &streams[1]; + assert_eq!(stream.index(), 1); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 5); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_1.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h1_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(idx as u64 * 40.mseconds())); + assert_eq!(b.duration(), Some(40.mseconds())); + } + 
+ h0.push_event(gst::event::Eos::new()); + h1.push_event(gst::event::Eos::new()); + + let buffer = h0.pull().unwrap(); + + assert_eq!(buffer.pts(), Some(200.mseconds())); + assert_eq!(buffer.duration(), Some(200.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 2); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 2); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h0_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 20.mseconds())); + assert_eq!(b.duration(), Some(20.mseconds())); + } + let stream = &streams[1]; + assert_eq!(stream.index(), 1); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_1.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h1_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 40.mseconds())); + assert_eq!(b.duration(), Some(40.mseconds())); + } + + // Now finally check all the events + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + + let ev = h0.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + assert_eq!(s.name(), "multistream/x-analytics-batch"); + let streams = s + .get::("streams") + .unwrap() + .iter() + .map(|v| v.get::().unwrap()) + .collect::>(); + 
assert_eq!(streams.len(), 2); + assert_eq!(&streams[0], &h0_caps); + assert_eq!(&streams[1], &h1_caps); + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_strategy_all() { + init(); + + let combiner = gst::ElementFactory::make("analyticscombiner") + .property("batch-duration", 100.mseconds()) + .build() + .unwrap(); + let sink_0 = combiner.request_pad_simple("sink_0").unwrap(); + sink_0.set_property_from_str("batch-strategy", "all"); + + let mut h = gst_check::Harness::with_element(&combiner, None, Some("src")); + h.add_element_sink_pad(&sink_0); + + let h_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240) + .fps(gst::Fraction::new(30, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + h.set_src_caps(h_caps.clone()); + h.play(); + + let ptss = [0, 33, 66, 100]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(0.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 3); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(ptss[idx]))); + 
assert_eq!(b.duration(), Some(33_333_333.nseconds())); + } + + let ptss = [133, 200]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let ptss = [100, 133]; + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(100.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 2); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(ptss[idx]))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + } + + let ptss = [233, 233, 266, 300]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let ptss = [200, 233, 233, 266]; + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(200.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 4); + for (idx, buffer) in buffers.iter().enumerate() { + 
assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(ptss[idx]))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + } + + h.push_event(gst::event::Eos::new()); + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(300.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(300))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + // Now finally check all the events + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + + let ev = h.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + assert_eq!(s.name(), "multistream/x-analytics-batch"); + let streams = s + .get::("streams") + .unwrap() + .iter() + .map(|v| v.get::().unwrap()) + .collect::>(); + assert_eq!(streams.len(), 1); + assert_eq!(&streams[0], &h_caps); + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn 
test_strategy_first() { + init(); + + let combiner = gst::ElementFactory::make("analyticscombiner") + .property("batch-duration", 100.mseconds()) + .build() + .unwrap(); + let sink_0 = combiner.request_pad_simple("sink_0").unwrap(); + sink_0.set_property_from_str("batch-strategy", "first-in-batch"); + + let mut h = gst_check::Harness::with_element(&combiner, None, Some("src")); + h.add_element_sink_pad(&sink_0); + + let h_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240) + .fps(gst::Fraction::new(30, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + h.set_src_caps(h_caps.clone()); + h.play(); + + let ptss = [0, 33, 66, 100]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(0.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(0))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + let ptss = [133, 200]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + 
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(100.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(100))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + let ptss = [233, 233, 266, 300]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(200.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(200))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + h.push_event(gst::event::Eos::new()); + + let 
buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(300.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(300))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + // Now finally check all the events + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + + let ev = h.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + assert_eq!(s.name(), "multistream/x-analytics-batch"); + let streams = s + .get::("streams") + .unwrap() + .iter() + .map(|v| v.get::().unwrap()) + .collect::>(); + assert_eq!(streams.len(), 1); + assert_eq!(&streams[0], &h_caps); + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_strategy_first_with_overlap() { + init(); + + let combiner = gst::ElementFactory::make("analyticscombiner") + .property("batch-duration", 100.mseconds()) + .build() + .unwrap(); + let sink_0 = combiner.request_pad_simple("sink_0").unwrap(); + sink_0.set_property_from_str("batch-strategy", "first-in-batch-with-overlap"); + + let mut h = gst_check::Harness::with_element(&combiner, None, Some("src")); + h.add_element_sink_pad(&sink_0); + + 
let h_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240) + .fps(gst::Fraction::new(30, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + h.set_src_caps(h_caps.clone()); + h.play(); + + let ptss = [0, 33, 66, 100]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(0.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(0))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + let ptss = [133, 199, 233]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(100.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); 
+ let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(100))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + let ptss = [233, 266, 301, 333]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(200.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(199))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + h.push_event(gst::event::Eos::new()); + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(300.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + 
Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(301))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + // Now finally check all the events + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + + let ev = h.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + assert_eq!(s.name(), "multistream/x-analytics-batch"); + let streams = s + .get::("streams") + .unwrap() + .iter() + .map(|v| v.get::().unwrap()) + .collect::>(); + assert_eq!(streams.len(), 1); + assert_eq!(&streams[0], &h_caps); + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_strategy_last() { + init(); + + let combiner = gst::ElementFactory::make("analyticscombiner") + .property("batch-duration", 100.mseconds()) + .build() + .unwrap(); + let sink_0 = combiner.request_pad_simple("sink_0").unwrap(); + sink_0.set_property_from_str("batch-strategy", "last-in-batch"); + + let mut h = gst_check::Harness::with_element(&combiner, None, Some("src")); + h.add_element_sink_pad(&sink_0); + + let h_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240) + .fps(gst::Fraction::new(30, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + h.set_src_caps(h_caps.clone()); + h.play(); + + let ptss = [0, 33, 66, 100]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + 
buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(0.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(66))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + let ptss = [133, 200]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(100.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(133))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + let ptss = 
[233, 233, 266, 300]; + for pts in ptss { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_mseconds(pts)); + buffer.set_duration(33_333_333.nseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(200.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(266))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + h.push_event(gst::event::Eos::new()); + + let buffer = h.pull().unwrap(); + assert_eq!(buffer.pts(), Some(300.mseconds())); + assert_eq!(buffer.duration(), Some(100.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 1); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(gst::ClockTime::from_mseconds(300))); + assert_eq!(b.duration(), Some(33_333_333.nseconds())); + + // Now finally check all the events + + let ev = h.pull_event().unwrap(); + 
assert_eq!(ev.type_(), gst::EventType::StreamStart);
+
+    let ev = h.pull_event().unwrap();
+    let gst::EventView::Caps(ev) = ev.view() else {
+        assert_eq!(ev.type_(), gst::EventType::Caps);
+        unreachable!();
+    };
+    let caps = ev.caps();
+    let s = caps.structure(0).unwrap();
+    assert_eq!(s.name(), "multistream/x-analytics-batch");
+    let streams = s
+        .get::<gst::ArrayRef>("streams")
+        .unwrap()
+        .iter()
+        .map(|v| v.get::<gst::Caps>().unwrap())
+        .collect::<Vec<_>>();
+    assert_eq!(streams.len(), 1);
+    assert_eq!(&streams[0], &h_caps);
+
+    let ev = h.pull_event().unwrap();
+    assert_eq!(ev.type_(), gst::EventType::Segment);
+    let ev = h.pull_event().unwrap();
+    assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+#[test]
+fn test_combine_multi_initial_gap() {
+    init();
+
+    let combiner = gst::ElementFactory::make("analyticscombiner")
+        .property("batch-duration", 200.mseconds())
+        .build()
+        .unwrap();
+    let sink_0 = combiner.request_pad_simple("sink_0").unwrap();
+    let sink_1 = combiner.request_pad_simple("sink_1").unwrap();
+
+    let mut h0 = gst_check::Harness::with_element(&combiner, None, Some("src"));
+    h0.add_element_sink_pad(&sink_0);
+    let mut h1 = gst_check::Harness::with_element(&combiner, None, None);
+    h1.add_element_sink_pad(&sink_1);
+
+    let h0_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240)
+        .fps(gst::Fraction::new(50, 1))
+        .build()
+        .unwrap()
+        .to_caps()
+        .unwrap();
+
+    let h1_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Gray8, 320, 240)
+        .fps(gst::Fraction::new(25, 1))
+        .build()
+        .unwrap()
+        .to_caps()
+        .unwrap();
+
+    h0.set_src_caps(h0_caps.clone());
+    h0.play();
+
+    // Push buffers according to the framerate for the first batch but only for the first stream
+    // and one additional buffer for the second batch to get an output.
+ for i in 0..11 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 20.mseconds()); + buffer.set_duration(20.mseconds()); + } + assert_eq!(h0.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // Crank the clock for timing out + h0.crank_single_clock_wait().unwrap(); + + let buffer = h0.pull().unwrap(); + + assert_eq!(buffer.pts(), Some(0.mseconds())); + assert_eq!(buffer.duration(), Some(200.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 2); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 10); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h0_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(idx as u64 * 20.mseconds())); + assert_eq!(b.duration(), Some(20.mseconds())); + } + let stream = &streams[1]; + assert_eq!(stream.index(), 1); + let buffers = stream.buffers(); + // Only an empty buffer with no events or anything for the second stream + assert_eq!(buffers.len(), 1); + let buffer = &buffers[0]; + assert_eq!(buffer.stream_id(), None); + assert_eq!(buffer.segment(), None); + assert_eq!(buffer.caps().as_ref(), None); + assert_eq!(buffer.buffer(), None); + + // Now start the second stream + h1.set_src_caps(h1_caps.clone()); + h1.play(); + + // Push buffers according to the framerate for the second batch for both streams + for i in 0..11 { + if i > 0 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(200.mseconds() + i * 20.mseconds()); + buffer.set_duration(20.mseconds()); + } + assert_eq!(h0.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + if i % 2 
== 0 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(200.mseconds() + (i / 2) * 40.mseconds()); + buffer.set_duration(40.mseconds()); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + } + + let buffer = h0.pull().unwrap(); + + assert_eq!(buffer.pts(), Some(200.mseconds())); + assert_eq!(buffer.duration(), Some(200.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 2); + let stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 10); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h0_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 20.mseconds())); + assert_eq!(b.duration(), Some(20.mseconds())); + } + let stream = &streams[1]; + assert_eq!(stream.index(), 1); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 5); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_1.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h1_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(200.mseconds() + idx as u64 * 40.mseconds())); + assert_eq!(b.duration(), Some(40.mseconds())); + } + + h0.push_event(gst::event::Eos::new()); + h1.push_event(gst::event::Eos::new()); + + let buffer = h0.pull().unwrap(); + + assert_eq!(buffer.pts(), Some(400.mseconds())); + assert_eq!(buffer.duration(), Some(200.mseconds())); + let meta = buffer.meta::().unwrap(); + let streams = meta.streams(); + assert_eq!(streams.len(), 2); + let 
stream = &streams[0]; + assert_eq!(stream.index(), 0); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_0.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h0_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(400.mseconds() + idx as u64 * 20.mseconds())); + assert_eq!(b.duration(), Some(20.mseconds())); + } + let stream = &streams[1]; + assert_eq!(stream.index(), 1); + let buffers = stream.buffers(); + assert_eq!(buffers.len(), 1); + for (idx, buffer) in buffers.iter().enumerate() { + assert_eq!( + buffer.stream_id(), + Some(sink_1.stream_id().unwrap().as_gstr()) + ); + assert_eq!( + buffer.segment(), + Some(gst::FormattedSegment::::new().upcast()) + ); + assert_eq!(buffer.caps().as_ref(), Some(&h1_caps)); + let b = buffer.buffer().unwrap(); + assert_eq!(b.pts(), Some(400.mseconds() + idx as u64 * 40.mseconds())); + assert_eq!(b.duration(), Some(40.mseconds())); + } + + // Now finally check all the events + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + + let ev = h0.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + let caps = ev.caps(); + let s = caps.structure(0).unwrap(); + assert_eq!(s.name(), "multistream/x-analytics-batch"); + let streams = s + .get::("streams") + .unwrap() + .iter() + .map(|v| v.get::().unwrap()) + .collect::>(); + assert_eq!(streams.len(), 2); + assert_eq!(&streams[0], &h0_caps); + assert_eq!(&streams[1], &gst::Caps::new_empty()); + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + + let ev = h0.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), 
gst::EventType::Caps);
+        unreachable!();
+    };
+    let caps = ev.caps();
+    let s = caps.structure(0).unwrap();
+    assert_eq!(s.name(), "multistream/x-analytics-batch");
+    let streams = s
+        .get::<gst::ArrayRef>("streams")
+        .unwrap()
+        .iter()
+        .map(|v| v.get::<gst::Caps>().unwrap())
+        .collect::<Vec<_>>();
+    assert_eq!(streams.len(), 2);
+    assert_eq!(&streams[0], &h0_caps);
+    assert_eq!(&streams[1], &h1_caps);
+
+    let ev = h0.pull_event().unwrap();
+    assert_eq!(ev.type_(), gst::EventType::Eos);
+}
diff --git a/analytics/analytics/tests/analyticssplitter.rs b/analytics/analytics/tests/analyticssplitter.rs
new file mode 100644
index 000000000..e7bb1bddf
--- /dev/null
+++ b/analytics/analytics/tests/analyticssplitter.rs
@@ -0,0 +1,376 @@
+// Copyright (C) 2025 Sebastian Dröge <sebastian@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+//
+
+use gst::prelude::*;
+
+fn init() {
+    use std::sync::Once;
+    static INIT: Once = Once::new();
+
+    INIT.call_once(|| {
+        gst::init().unwrap();
+        gstrsanalytics::plugin_register_static().unwrap();
+    });
+}
+
+#[test]
+fn test_combine_split_single() {
+    init();
+
+    let bin = gst::parse::bin_from_description("videotestsrc name=src num-buffers=10 ! video/x-raw,framerate=25/1 ! analyticscombiner batch-duration=100000000 ! analyticssplitter ! 
identity name=identity", false).unwrap();
+    let identity = bin.by_name("identity").unwrap();
+    bin.add_pad(&gst::GhostPad::with_target(&identity.static_pad("src").unwrap()).unwrap())
+        .unwrap();
+
+    let mut h = gst_check::Harness::with_element(&bin, None, Some("src"));
+    h.play();
+
+    let ptss = std::array::from_fn::<_, 10, _>(|idx| idx as u64 * 40.mseconds());
+    for pts in ptss {
+        let buffer = h.pull().unwrap();
+        assert_eq!(buffer.pts(), Some(pts));
+        assert_eq!(buffer.duration(), Some(40.mseconds()));
+    }
+
+    // Now finally check all the events
+    let element = h.element().unwrap().downcast::<gst::Bin>().unwrap();
+    let src = element.by_name("src").unwrap();
+    let srcpad = src.static_pad("src").unwrap();
+    let stream_id = srcpad.stream_id().unwrap();
+    let caps = srcpad.current_caps().unwrap();
+
+    let ev = h.pull_event().unwrap();
+    assert_eq!(ev.type_(), gst::EventType::StreamStart);
+    let gst::EventView::StreamStart(ev) = ev.view() else {
+        assert_eq!(ev.type_(), gst::EventType::StreamStart);
+        unreachable!();
+    };
+    assert_eq!(ev.stream_id(), &stream_id);
+
+    let ev = h.pull_event().unwrap();
+    let gst::EventView::Caps(ev) = ev.view() else {
+        assert_eq!(ev.type_(), gst::EventType::Caps);
+        unreachable!();
+    };
+    assert_eq!(ev.caps(), &caps);
+
+    let ev = h.pull_event().unwrap();
+    assert_eq!(ev.type_(), gst::EventType::Segment);
+    let ev = h.pull_event().unwrap();
+    assert_eq!(ev.type_(), gst::EventType::Eos);
+}
+
+#[test]
+fn test_combine_split_multi() {
+    init();
+
+    let bin = gst::parse::bin_from_description(
+        "videotestsrc name=src_0 num-buffers=10 ! video/x-raw,framerate=25/1 ! combiner.sink_0 \
+         videotestsrc name=src_1 num-buffers=20 ! video/x-raw,framerate=50/1 ! combiner.sink_1 \
+         analyticscombiner name=combiner batch-duration=100000000 ! analyticssplitter name=splitter \
+         splitter.src_0_0 ! queue name=queue_0 \
+         splitter.src_0_1 ! 
queue name=queue_1", + false, + ) + .unwrap(); + let queue = bin.by_name("queue_0").unwrap(); + bin.add_pad( + &gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap()) + .unwrap() + .name("src_0") + .build(), + ) + .unwrap(); + let queue = bin.by_name("queue_1").unwrap(); + bin.add_pad( + &gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap()) + .unwrap() + .name("src_1") + .build(), + ) + .unwrap(); + + let mut h0 = gst_check::Harness::with_element(&bin, None, Some("src_0")); + let mut h1 = gst_check::Harness::with_element(&bin, None, Some("src_1")); + h0.play(); + h1.play(); + + let ptss = std::array::from_fn::<_, 20, _>(|idx| idx as u64 * 20.mseconds()); + for (idx, pts) in ptss.into_iter().enumerate() { + if idx % 2 == 0 { + let buffer = h0.pull().unwrap(); + assert_eq!(buffer.pts(), Some(pts)); + assert_eq!(buffer.duration(), Some(40.mseconds())); + } + let buffer = h1.pull().unwrap(); + assert_eq!(buffer.pts(), Some(pts)); + assert_eq!(buffer.duration(), Some(20.mseconds())); + } + + // Now finally check all the events + let src = bin.by_name("src_0").unwrap(); + let srcpad = src.static_pad("src").unwrap(); + let stream_id_0 = srcpad.stream_id().unwrap(); + let caps_0 = srcpad.current_caps().unwrap(); + + let src = bin.by_name("src_1").unwrap(); + let srcpad = src.static_pad("src").unwrap(); + let stream_id_1 = srcpad.stream_id().unwrap(); + let caps_1 = srcpad.current_caps().unwrap(); + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let gst::EventView::StreamStart(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::StreamStart); + unreachable!(); + }; + assert_eq!(ev.stream_id(), &stream_id_0); + + let ev = h0.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + assert_eq!(ev.caps(), &caps_0); + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), 
gst::EventType::Segment); + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let gst::EventView::StreamStart(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::StreamStart); + unreachable!(); + }; + assert_eq!(ev.stream_id(), &stream_id_1); + + let ev = h1.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + assert_eq!(ev.caps(), &caps_1); + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_combine_split_multi_with_initial_gap() { + init(); + + let h0_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Rgb, 320, 240) + .fps(gst::Fraction::new(25, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + let h1_caps = gst_video::VideoInfo::builder(gst_video::VideoFormat::Gray8, 320, 240) + .fps(gst::Fraction::new(50, 1)) + .build() + .unwrap() + .to_caps() + .unwrap(); + + let bin = gst::Bin::new(); + let combiner = gst::ElementFactory::make("analyticscombiner") + .property("batch-duration", 200.mseconds()) + .build() + .unwrap(); + let splitter = gst::ElementFactory::make("analyticssplitter") + .build() + .unwrap(); + + bin.add_many([&combiner, &splitter]).unwrap(); + combiner.link(&splitter).unwrap(); + + let sink_0 = combiner.request_pad_simple("sink_0").unwrap(); + bin.add_pad( + &gst::GhostPad::builder_with_target(&sink_0) + .unwrap() + .name("sink_0") + .build(), + ) + .unwrap(); + + let sink_1 = combiner.request_pad_simple("sink_1").unwrap(); + bin.add_pad( + &gst::GhostPad::builder_with_target(&sink_1) + .unwrap() + .name("sink_1") + .build(), + ) + .unwrap(); + + let queue = gst::ElementFactory::make("queue") + .name("queue_0") + .build() + .unwrap(); + 
bin.add(&queue).unwrap(); + bin.add_pad( + &gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap()) + .unwrap() + .name("src_0") + .build(), + ) + .unwrap(); + + let queue = gst::ElementFactory::make("queue") + .name("queue_1") + .build() + .unwrap(); + bin.add(&queue).unwrap(); + bin.add_pad( + &gst::GhostPad::builder_with_target(&queue.static_pad("src").unwrap()) + .unwrap() + .name("src_1") + .build(), + ) + .unwrap(); + + splitter.connect_closure( + "pad-added", + false, + glib::closure!( + #[weak] + bin, + move |_splitter: &gst::Element, pad: &gst::Pad| { + let pad_name = pad.name(); + let [_, _count, stream_id] = + pad_name.split("_").collect::>().try_into().unwrap(); + let Some(queue) = bin.by_name(&format!("queue_{stream_id}")) else { + return; + }; + let sinkpad = queue.static_pad("sink").unwrap(); + if let Some(peer) = sinkpad.peer() { + peer.unlink(&sinkpad).unwrap(); + } + pad.link(&sinkpad).unwrap(); + } + ), + ); + + let mut h0 = gst_check::Harness::with_element(&bin, Some("sink_0"), Some("src_0")); + h0.set_src_caps(h0_caps.clone()); + h0.play(); + + // Push first 6 buffers on the first stream only + for i in 0..6 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 40.mseconds()); + buffer.set_duration(40.mseconds()); + } + assert_eq!(h0.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // Time out so the first batch is output + h0.crank_single_clock_wait().unwrap(); + + // Now consume buffers for the first stream for the first batch + let ptss = std::array::from_fn::<_, 5, _>(|idx| idx as u64 * 40.mseconds()); + for pts in ptss { + let buffer = h0.pull().unwrap(); + assert_eq!(buffer.pts(), Some(pts)); + assert_eq!(buffer.duration(), Some(40.mseconds())); + } + + // Start the second stream + let mut h1 = gst_check::Harness::with_element(&bin, Some("sink_1"), Some("src_1")); + h1.set_src_caps(h1_caps.clone()); + h1.play(); + + // Push another batch of buffers for 
both streams this time. + // Skip the first buffer for the first stream as it was pushed above already. + for i in 0..10 { + if i > 0 && i % 2 == 0 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(200.mseconds() + (i / 2) * 40.mseconds()); + buffer.set_duration(40.mseconds()); + } + assert_eq!(h0.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(200.mseconds() + i * 20.mseconds()); + buffer.set_duration(20.mseconds()); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // Now EOS so that the second batch is output + h0.push_event(gst::event::Eos::new()); + h1.push_event(gst::event::Eos::new()); + + let ptss = std::array::from_fn::<_, 10, _>(|idx| 200.mseconds() + idx as u64 * 20.mseconds()); + for (idx, pts) in ptss.into_iter().enumerate() { + if idx % 2 == 0 { + let buffer = h0.pull().unwrap(); + assert_eq!(buffer.pts(), Some(pts)); + assert_eq!(buffer.duration(), Some(40.mseconds())); + } + let buffer = h1.pull().unwrap(); + assert_eq!(buffer.pts(), Some(pts)); + assert_eq!(buffer.duration(), Some(20.mseconds())); + } + + // Now finally check all the events + let stream_id_0 = sink_0.stream_id().unwrap(); + let stream_id_1 = sink_1.stream_id().unwrap(); + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let gst::EventView::StreamStart(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::StreamStart); + unreachable!(); + }; + assert_eq!(ev.stream_id(), &stream_id_0); + + let ev = h0.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + assert_eq!(ev.caps(), &h0_caps); + + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h0.pull_event().unwrap(); + assert_eq!(ev.type_(), 
gst::EventType::Eos); + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let gst::EventView::StreamStart(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::StreamStart); + unreachable!(); + }; + assert_eq!(ev.stream_id(), &stream_id_1); + + let ev = h1.pull_event().unwrap(); + let gst::EventView::Caps(ev) = ev.view() else { + assert_eq!(ev.type_(), gst::EventType::Caps); + unreachable!(); + }; + assert_eq!(ev.caps(), &h1_caps); + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index b40958ac6..e2d67313f 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -8074,6 +8074,90 @@ "rsanalytics": { "description": "GStreamer Rust Analytics Plugin", "elements": { + "analyticscombiner": { + "author": "Sebastian Dröge ", + "description": "Analytics combiner / batcher element", + "hierarchy": [ + "GstAnalyticsCombiner", + "GstAggregator", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy" + ], + "klass": "Combiner/Analytics", + "pad-templates": { + "sink_%%u": { + "caps": "ANY", + "direction": "sink", + "presence": "request", + "type": "GstAnalyticsCombinerSinkPad" + }, + "src": { + "caps": "multistream/x-analytics-batch(meta:GstAnalyticsBatchMeta):\n", + "direction": "src", + "presence": "always", + "type": "GstAggregatorPad" + } + }, + "properties": { + "batch-duration": { + "blurb": "Batch Duration", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "100000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint64", + "writable": true + }, + "force-live": { + "blurb": "Always operate in 
live mode and aggregate on timeout regardless of whether any live sources are linked upstream", + "conditionally-available": false, + "construct": false, + "construct-only": true, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + } + }, + "rank": "none" + }, + "analyticssplitter": { + "author": "Sebastian Dröge ", + "description": "Analytics batch splitter element", + "hierarchy": [ + "GstAnalyticsSplitter", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Demuxer/Analytics", + "pad-templates": { + "sink": { + "caps": "multistream/x-analytics-batch(meta:GstAnalyticsBatchMeta):\n", + "direction": "sink", + "presence": "always" + }, + "src_%%u_%%u": { + "caps": "ANY", + "direction": "src", + "presence": "sometimes" + } + }, + "rank": "primary" + }, "onvifmeta2relationmeta": { "author": "Benjamin Gaignard ", "description": "Convert ONVIF metadata to relation metadata", @@ -8142,6 +8226,70 @@ "filename": "gstrsanalytics", "license": "MPL-2.0", "other-types": { + "GstAnalyticsCombinerBatchStrategy": { + "kind": "enum", + "values": [ + { + "desc": "All: Include all buffers that start inside the batch.", + "name": "all", + "value": "0" + }, + { + "desc": "FirstInBatch: Include only the first buffer that starts inside the batch.", + "name": "first-in-batch", + "value": "1" + }, + { + "desc": "FirstInBatchWithOverlap: Include only the first buffer that starts inside the batch unless there was a previously unused buffer at most half a batch duration earlier. If no buffer is available, allow taking a buffer up to half a batch duration later. 
The buffer closest to the batch start is included.", + "name": "first-in-batch-with-overlap", + "value": "2" + }, + { + "desc": "LastInBatch: Include only the last buffer that starts inside the batch.", + "name": "last-in-batch", + "value": "3" + } + ] + }, + "GstAnalyticsCombinerSinkPad": { + "hierarchy": [ + "GstAnalyticsCombinerSinkPad", + "GstAggregatorPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object", + "properties": { + "batch-strategy": { + "blurb": "Batching strategy to use for this stream", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "all (0)", + "mutable": "ready", + "readable": true, + "type": "GstAnalyticsCombinerBatchStrategy", + "writable": true + }, + "index": { + "blurb": "Index, must be consecutive and starting at 0 and is fixed up otherwise", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + } + } + }, "GstRsOnvifNtpTimeSource": { "kind": "enum", "values": [ diff --git a/meson.build b/meson.build index 891a08e4d..9458fa41e 100644 --- a/meson.build +++ b/meson.build @@ -112,7 +112,10 @@ endforeach # kept in the same order as the `members` list in Cargo.toml plugins = { - 'analytics': {'library': 'libgstrsanalytics'}, + 'analytics': { + 'library': 'libgstrsanalytics', + 'features': ['gst-rtp/v1_24', 'gst-base/v1_24', 'gst-video/v1_24'], + }, 'audiofx': { 'library': 'libgstrsaudiofx', 'examples': ['hrtfrender'], @@ -227,9 +230,23 @@ plugins = { 'vvdec': { 'library': 'libgstvvdec', 'extra-deps': {'libvvdec': ['>= 3.0']} - } + }, } +# The splitter/combiner requires 1.28+ +if get_option('analytics').allowed() + gst_analytics_dep = deps_cache['gstreamer-analytics-1.0'] + if gst_analytics_dep.found() and gst_analytics_dep.version().version_compare('>= 
1.28') + plugins_analytics = plugins['analytics'] + plugins_analytics += { + 'features': plugins_analytics['features'] + ['v1_28'], + } + plugins += { + 'analytics': plugins_analytics + } + endif +endif + # Won't build on platforms where it bundles the sources because of: # https://github.com/qnighy/libwebp-sys2-rs/issues/12 # the fix is: