From f12bd415101d2cebde2863c2b7fc58a30a24dd6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alicia=20Boya=20Garc=C3=ADa?= Date: Thu, 22 Aug 2024 14:59:25 +0200 Subject: [PATCH] Add streamgrouper element streamgrouper allows to construct simple gst-launch pipelines where streams of different group-ids are merged to use the same group-id. Part-of: --- Cargo.lock | 9 + Cargo.toml | 2 + docs/plugins/gst_plugins_cache.json | 37 ++ generic/streamgrouper/Cargo.toml | 41 ++ generic/streamgrouper/build.rs | 3 + generic/streamgrouper/src/lib.rs | 36 ++ .../streamgrouper/src/streamgrouper/imp.rs | 406 ++++++++++++++++++ .../streamgrouper/src/streamgrouper/mod.rs | 71 +++ generic/streamgrouper/tests/streamgrouper.rs | 170 ++++++++ meson.build | 1 + meson_options.txt | 1 + 11 files changed, 777 insertions(+) create mode 100644 generic/streamgrouper/Cargo.toml create mode 100644 generic/streamgrouper/build.rs create mode 100644 generic/streamgrouper/src/lib.rs create mode 100644 generic/streamgrouper/src/streamgrouper/imp.rs create mode 100644 generic/streamgrouper/src/streamgrouper/mod.rs create mode 100644 generic/streamgrouper/tests/streamgrouper.rs diff --git a/Cargo.lock b/Cargo.lock index 03cb7bd0..c136d914 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2882,6 +2882,15 @@ dependencies = [ "url", ] +[[package]] +name = "gst-plugin-streamgrouper" +version = "0.14.0-alpha.1" +dependencies = [ + "gst-plugin-version-helper", + "gstreamer", + "gstreamer-check", +] + [[package]] name = "gst-plugin-textahead" version = "0.14.0-alpha.1" diff --git a/Cargo.toml b/Cargo.toml index 0c4659bb..641a9c1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "generic/sodium", "generic/threadshare", "generic/inter", + "generic/streamgrouper", "generic/gopbuffer", "mux/flavors", @@ -74,6 +75,7 @@ default-members = [ "generic/threadshare", "generic/inter", "generic/gopbuffer", + "generic/streamgrouper", "mux/fmp4", "mux/mp4", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index dfecc382..f71e645a 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -11921,6 +11921,43 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "streamgrouper": { + "description": "Filter element that makes all the incoming streams share a group-id", + "elements": { + "streamgrouper": { + "author": "Alicia Boya García ", + "description": "Modifies all input streams to use the same group-id", + "hierarchy": [ + "GstStreamGrouper", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Generic", + "pad-templates": { + "sink_%%u": { + "caps": "ANY", + "direction": "sink", + "presence": "request" + }, + "src_%%u": { + "caps": "ANY", + "direction": "src", + "presence": "sometimes" + } + }, + "rank": "none" + } + }, + "filename": "gststreamgrouper", + "license": "MPL", + "other-types": {}, + "package": "gst-plugin-streamgrouper", + "source": "gst-plugin-streamgrouper", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "textahead": { "description": "GStreamer Plugin for displaying upcoming text buffers ahead of time", "elements": { diff --git a/generic/streamgrouper/Cargo.toml b/generic/streamgrouper/Cargo.toml new file mode 100644 index 00000000..f9be4d6d --- /dev/null +++ b/generic/streamgrouper/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "gst-plugin-streamgrouper" +authors = ["Alicia Boya García "] +license = "MPL-2.0" +description = "Filter element that makes all the incoming streams share a group-id" +version.workspace = true +repository.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +gst.workspace = true + +[dev-dependencies] +gst-check.workspace = true + +[lib] +name = "gststreamgrouper" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper.workspace = true + +[features] +static = [] +capi = [] +doc = ["gst/v1_18"] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0" \ No newline at end of file diff --git a/generic/streamgrouper/build.rs b/generic/streamgrouper/build.rs new file mode 100644 index 00000000..12ae7a24 --- /dev/null +++ b/generic/streamgrouper/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info(); +} diff --git a/generic/streamgrouper/src/lib.rs b/generic/streamgrouper/src/lib.rs new file mode 100644 index 00000000..86981502 --- /dev/null +++ b/generic/streamgrouper/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2024 Igalia S.L. +// Copyright (C) 2024 Comcast +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#![allow(unused_doc_comments)] +use gst::glib; + +mod streamgrouper; + +/** + * plugin-streamgrouper: + * + * Since: plugins-rs-0.14.0 + */ + +gst::plugin_define!( + streamgrouper, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + streamgrouper::register(plugin).unwrap(); + Ok(()) +} diff --git a/generic/streamgrouper/src/streamgrouper/imp.rs b/generic/streamgrouper/src/streamgrouper/imp.rs new file mode 100644 index 00000000..3ac91144 --- /dev/null +++ b/generic/streamgrouper/src/streamgrouper/imp.rs @@ -0,0 +1,406 @@ +// Copyright (C) 2024 Igalia S.L. +// Copyright (C) 2024 Comcast +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::collections::BTreeMap; +use std::sync::Mutex; + +use gst::prelude::{ElementExt, GstObjectExt, PadExt, PadExtManual}; +use gst::subclass::{prelude::*, ElementMetadata}; +use gst::{glib, Caps, GroupId, Pad, PadDirection, PadPresence}; + +use gst::{Element, PadTemplate}; +use std::sync::LazyLock; + +pub static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "streamgrouper", + gst::DebugColorFlags::empty(), + Some("Filter element that makes all the incoming streams share a group-id"), + ) +}); + +#[derive(Default)] +pub struct StreamGrouper { + pub state: Mutex, +} + +pub struct State { + pub group_id: GroupId, + pub streams_by_number: BTreeMap, +} + +impl Default for State { + fn default() -> Self { + Self { + group_id: GroupId::next(), + streams_by_number: Default::default(), + } + } +} + +impl State { + fn find_unused_number(&self) -> usize { + match self.streams_by_number.keys().last() { + Some(n) => n + 1, + None => 0, + } + } + + fn get_stream_with_number(&self, number: usize) -> Option<&Stream> { + self.streams_by_number.get(&number) + } + + fn get_stream_with_number_or_panic(&self, number: usize) -> &Stream { + self.get_stream_with_number(number) + .unwrap_or_else(|| panic!("Pad is associated with stream {number} which should exist")) + } + + fn add_stream_or_panic(&mut self, number: usize, stream: Stream) -> &Stream { + use std::collections::btree_map::Entry::{Occupied, Vacant}; + match self.streams_by_number.entry(number) { + Occupied(_) => panic!("Stream {number} already exists!"), + Vacant(entry) => { + return entry.insert(stream); + } + }; + } + + fn remove_stream_or_panic(&mut self, number: usize) { + self.streams_by_number.remove(&number).or_else(|| { + panic!("Attempted to delete stream number {number}, which does not exist"); + }); + } +} + +pub struct Stream { + pub stream_number: usize, + pub sinkpad: Pad, + pub srcpad: Pad, +} + +impl StreamGrouper { + fn request_new_pad_with_number(&self, stream_number: Option) -> Option { + let mut state = self.state.lock().unwrap(); + let stream_number = stream_number.unwrap_or_else(|| state.find_unused_number()); + if state.get_stream_with_number(stream_number).is_some() { + gst::error!( + CAT, + imp = self, + "New pad with number {stream_number} was requested, but it already exists", + ); + return None; + } + + // Create the pads + let srcpad = Pad::builder(PadDirection::Src) + .name(format!("src_{stream_number}")) + .query_function(move |pad, parent, query| { + StreamGrouper::catch_panic_pad_function( + parent, + || false, + |streamgrouper| streamgrouper.src_query(pad, query, stream_number), + ) + }) + .event_function(move |pad, parent, event| { + StreamGrouper::catch_panic_pad_function( + parent, + || false, + |streamgrouper| streamgrouper.src_event(pad, event, stream_number), + ) + }) + .iterate_internal_links_function(move |pad, parent| { + StreamGrouper::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |streamgrouper| streamgrouper.iterate_internal_links(pad, stream_number), + ) + }) + .build(); + let sinkpad = Pad::builder(PadDirection::Sink) + .name(format!("sink_{stream_number}")) + .query_function(move |pad, parent, query| { + StreamGrouper::catch_panic_pad_function( + parent, + || false, + |streamgrouper| streamgrouper.sink_query(pad, query, stream_number), + ) + }) + .chain_function(move |pad, parent, buffer| { + StreamGrouper::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |streamgrouper| streamgrouper.sink_chain(pad, buffer, stream_number), + ) + }) + .event_function(move |pad, parent, event| { + StreamGrouper::catch_panic_pad_function( + parent, + || false, + |streamgrouper| streamgrouper.sink_event(pad, event, stream_number), + ) + }) + .iterate_internal_links_function(move |pad, parent| { + StreamGrouper::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |streamgrouper| streamgrouper.iterate_internal_links(pad, stream_number), + ) + }) + .build(); + + sinkpad.set_active(true).unwrap(); + srcpad.set_active(true).unwrap(); + + // Add the stream + let stream = Stream { + stream_number, + sinkpad: sinkpad.clone(), + srcpad: srcpad.clone(), + }; + state.add_stream_or_panic(stream_number, stream); + + drop(state); + self.obj().add_pad(&srcpad).unwrap(); + self.obj().add_pad(&sinkpad).unwrap(); + + Some(sinkpad) + } + + fn src_query( + &self, + _srcpad: &gst::Pad, + query: &mut gst::QueryRef, + stream_number: usize, + ) -> bool { + let state = self.state.lock().unwrap(); + let stream = state.get_stream_with_number_or_panic(stream_number); + let sinkpad = stream.sinkpad.clone(); + drop(state); + sinkpad.peer_query(query) // Passthrough + } + + fn sink_query( + &self, + _sinkpad: &gst::Pad, + query: &mut gst::QueryRef, + stream_number: usize, + ) -> bool { + let state = self.state.lock().unwrap(); + let stream = state.get_stream_with_number_or_panic(stream_number); + let srcpad = stream.srcpad.clone(); + drop(state); + srcpad.peer_query(query) // Passthrough + } + + fn sink_event(&self, _sinkpad: &gst::Pad, mut event: gst::Event, stream_number: usize) -> bool { + let state = self.state.lock().unwrap(); + let stream = state.get_stream_with_number_or_panic(stream_number); + + let target_group_id = state.group_id; + let srcpad = stream.srcpad.clone(); + drop(state); + + if event.type_() != gst::EventType::StreamStart { + return srcpad.push_event(event); + } + + // Patch stream-start group-id + match event.make_mut().view_mut() { + gst::EventViewMut::StreamStart(stream_start) => { + stream_start.set_group_id(target_group_id); + } + _ => unreachable!(), + }; + srcpad.push_event(event) + } + + fn src_event(&self, _srcpad: &gst::Pad, event: gst::Event, stream_number: usize) -> bool { + let state = self.state.lock().unwrap(); + let stream = state.get_stream_with_number_or_panic(stream_number); + + let sinkpad = stream.sinkpad.clone(); + drop(state); + sinkpad.push_event(event) + } + + fn iterate_internal_links( + &self, + pad: &gst::Pad, + stream_number: usize, + ) -> gst::Iterator { + let state = self.state.lock().unwrap(); + let stream = state.get_stream_with_number_or_panic(stream_number); + + if pad == &stream.sinkpad { + gst::Iterator::from_vec(vec![stream.srcpad.clone()]) + } else { + gst::Iterator::from_vec(vec![stream.sinkpad.clone()]) + } + } + + fn sink_chain( + &self, + _pad: &Pad, + buffer: gst::Buffer, + stream_number: usize, + ) -> Result { + let state = self.state.lock().unwrap(); + let stream = state.get_stream_with_number_or_panic(stream_number); + + let srcpad = stream.srcpad.clone(); + drop(state); + srcpad.push(buffer) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for StreamGrouper { + const NAME: &'static str = "GstStreamGrouper"; + type Type = super::StreamGrouper; + type ParentType = Element; +} + +impl ObjectImpl for StreamGrouper {} + +impl GstObjectImpl for StreamGrouper {} + +impl ElementImpl for StreamGrouper { + fn metadata() -> Option<&'static ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + ElementMetadata::new( + "Stream Grouping Filter", + "Generic", + "Modifies all input streams to use the same group-id", + "Alicia Boya García ", + ) + }); + Some(&*ELEMENT_METADATA) + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + if transition == gst::StateChange::PausedToReady { + let mut state = self.state.lock().unwrap(); + let group_id = GroupId::next(); + gst::debug!( + CAT, + imp = self, + "Invalidating previous group id: {:?} Next group id: {group_id:?}", + state.group_id, + ); + state.group_id = group_id; + }; + self.parent_change_state(transition) + } + + fn pad_templates() -> &'static [PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + // src side + let src_pad_template = PadTemplate::new( + "src_%u", + PadDirection::Src, + PadPresence::Sometimes, + &Caps::new_any(), + ) + .unwrap(); + + // sink side + let sink_pad_template = PadTemplate::new( + "sink_%u", + PadDirection::Sink, + PadPresence::Request, + &Caps::new_any(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + if templ.name_template() != "sink_%u" { + gst::error!( + CAT, + imp = self, + "Pad requested on extraneous template: {:?}", + templ.name_template() + ); + return None; + } + let stream_number = match name { + None => None, + Some(name) => { + match name + .strip_prefix("sink_") + .and_then(|s| s.parse::().ok()) + { + Some(idx) => Some(idx), + None => { + gst::error!(CAT, imp = self, "Invalid pad name requested: {name:?}"); + return None; + } + } + } + }; + + self.request_new_pad_with_number(stream_number) + } + + fn release_pad(&self, pad: &gst::Pad) { + let mut state = self.state.lock().unwrap(); + let stream = match pad + .name() + .strip_prefix("sink_") + .and_then(|s| s.parse::().ok()) + .and_then(|stream_number| state.get_stream_with_number(stream_number)) + { + Some(stream) => stream, + None => { + gst::error!( + CAT, + imp = self, + "Requested to remove pad {}, which is not a request pad of this element", + pad.name() + ); + return; + } + }; + let stream_number = stream.stream_number; + let srcpad = stream.srcpad.clone(); + let sinkpad = stream.sinkpad.clone(); + state.remove_stream_or_panic(stream_number); + drop(state); + + sinkpad.set_active(false).unwrap_or_else(|_| { + gst::warning!( + CAT, + imp = self, + "Failed to deactivate sinkpad for id {stream_number}", + ); + }); + srcpad.set_active(false).unwrap_or_else(|_| { + gst::warning!( + CAT, + imp = self, + "Failed to deactivate srcpad for id {stream_number}", + ); + }); + self.obj().remove_pad(&sinkpad).unwrap(); + self.obj().remove_pad(&srcpad).unwrap(); + } +} diff --git a/generic/streamgrouper/src/streamgrouper/mod.rs b/generic/streamgrouper/src/streamgrouper/mod.rs new file mode 100644 index 00000000..946b1200 --- /dev/null +++ b/generic/streamgrouper/src/streamgrouper/mod.rs @@ -0,0 +1,71 @@ +// Copyright (C) 2024 Igalia S.L. +// Copyright (C) 2024 Comcast +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +/** +* SECTION:element-streamgrouper +* +* #streamgrouper takes any number of streams in the sinkpads and patches the STREAM_START +* events so that they all belong to the same group-id. +* +* This is useful for constructing simple pipelines where different sources push buffers +* into an element that contains a streamsynchronizer element, like playsink. +* +* Notice that because of this group-id merging, using streamgrouper is incompatible with +* gapless playback. However, this is not a problem, since streamgrouper is currently +* intended only for use cases in which only one stream group will be played. +* +* ## Example +* +* This is a simple pipeline where, because the audio and video streams come from +* unrelated source elements, they end up with different group-ids and therefore get stuck +* forever waiting inside the streamsynchronizer inside playsink, and never play: +* +* |[ +* # Will get stuck! The streams from audiotestsrc and videotestsrc don't +* # share a group-id. +* gst-launch-1.0 \ +* playsink name=myplaysink \ +* audiotestsrc ! myplaysink.audio_sink \ +* videotestsrc ! myplaysink.video_sink +* ]| +* +* By adding streamgrouper to the pipeline, the streams are become part of the same group +* and playback is possible. +* +* |[ + gst-launch-1.0 \ + playsink name=myplaysink \ + streamgrouper name=grouper \ + audiotestsrc ! grouper.sink_0 grouper.src_0 ! myplaysink.audio_sink \ + videotestsrc ! grouper.sink_1 grouper.src_1 ! myplaysink.video_sink +* ]| +* +* Since: plugins-rs-0.14.0 +*/ + +glib::wrapper! { + pub struct StreamGrouper(ObjectSubclass) + @extends + gst::Element, + gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "streamgrouper", + gst::Rank::NONE, + StreamGrouper::static_type(), + ) +} diff --git a/generic/streamgrouper/tests/streamgrouper.rs b/generic/streamgrouper/tests/streamgrouper.rs new file mode 100644 index 00000000..695e6629 --- /dev/null +++ b/generic/streamgrouper/tests/streamgrouper.rs @@ -0,0 +1,170 @@ +// Copyright (C) 2024 Igalia S.L. +// Copyright (C) 2024 Comcast +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +use gst::{prelude::*, Element, GroupId}; + +use gst_check::Harness; +use std::sync::LazyLock; + +#[allow(unused)] +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "streamgrouper-test", + gst::DebugColorFlags::empty(), + Some("streamgrouper test"), + ) +}); + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gststreamgrouper::plugin_register_static().expect("gststreamgrouper streamgrouper test"); + }); +} + +#[test] +fn test_request_invalid_pad_name() { + init(); + let sg = gst::ElementFactory::make("streamgrouper") + .build() + .expect("streamgrouper factory should exist"); + assert!(sg.request_pad_simple("invalid_name").is_none()); +} + +#[test] +fn test_can_change_state() { + init(); + let sg = gst::ElementFactory::make("streamgrouper") + .build() + .expect("streamgrouper factory should exist"); + if let Err(error) = sg.set_state(gst::State::Playing) { + panic!("Failed to change to PLAYING: {error:?}"); + } + if let Err(error) = sg.set_state(gst::State::Null) { + panic!("Failed to change to NULL: {error:?}"); + } +} + +fn make_with_double_harness() -> (Element, Harness, Harness) { + init(); + let sg = gst::ElementFactory::make("streamgrouper") + .build() + .expect("streamgrouper factory should exist"); + // gst_harness_add_element_full() sets the element to PLAYING, but not before sending + // a stream-start, which means it ends up sending buffers while in NULL state. + // streamgrouper can handle that, but let's rather test what applications should be + // doing instead and set the element to a higher state first. + if let Err(error) = sg.set_state(gst::State::Playing) { + panic!("Failed to change to PLAYING: {error:?}"); + } + let mut h1 = gst_check::Harness::with_element(&sg, Some("sink_1"), Some("src_1")); + let mut h2 = gst_check::Harness::with_element(&sg, Some("sink_2"), Some("src_2")); + // Consume the stream-start that harness sends internally in + // gst_harness_add_element_full(). For some reason this is not done automatically (!?) + while h1.try_pull_event().is_some() {} + while h2.try_pull_event().is_some() {} + (sg, h1, h2) +} + +#[test] +fn test_push_stream_start() { + let (_, mut h1, mut h2) = make_with_double_harness(); + let input_group_id1 = GroupId::next(); + let input_group_id2 = GroupId::next(); + h1.push_event( + gst::event::StreamStart::builder("stream1") + .group_id(input_group_id1) + .build(), + ); + h2.push_event( + gst::event::StreamStart::builder("stream2") + .group_id(input_group_id2) + .build(), + ); + let e1 = h1 + .pull_event() + .expect("an event should have been pushed at the other end"); + let e2 = h2 + .pull_event() + .expect("an event should have been pushed at the other end"); + assert_eq!(e1.type_(), gst::EventType::StreamStart); + assert_eq!(e2.type_(), gst::EventType::StreamStart); + let output_group_id1 = match e1.view() { + gst::EventView::StreamStart(ev) => ev.group_id().expect("There must be a group id"), + _ => panic!("unexpected event: {e1:?}"), + }; + let output_group_id2 = match e2.view() { + gst::EventView::StreamStart(ev) => ev.group_id().expect("There must be a group id"), + _ => panic!("unexpected event: {e2:?}"), + }; + assert_eq!(output_group_id1, output_group_id2); + assert_ne!(output_group_id1, input_group_id1); + assert_ne!(output_group_id1, input_group_id2); +} + +#[test] +fn test_push_buffer() { + let (_, mut h1, _) = make_with_double_harness(); + let segment = gst::event::Segment::new(&gst::FormattedSegment::::new()); + h1.push_event(segment); + let segment_other_side = h1.pull_event().unwrap(); + assert_eq!(gst::EventType::Segment, segment_other_side.type_()); + let buffer = gst::Buffer::new(); + h1.push(buffer.clone()).unwrap(); + let buffer_other_side = h1.pull().unwrap(); + assert_eq!( + buffer.as_ptr(), + buffer_other_side.as_ptr(), + "buffer should be unmodified" + ); +} + +#[test] +fn test_upstream_seek() { + let (_, mut h1, _) = make_with_double_harness(); + let seek = gst::event::Seek::new( + 1.0, + gst::SeekFlags::FLUSH, + gst::SeekType::Set, + 3.seconds(), + gst::SeekType::None, + 0.seconds(), + ); + h1.push_upstream_event(seek); + let mut received_seek = false; + // A reconfigure event is generated, so we'll loop to skip that. + loop { + let ev = match h1.try_pull_upstream_event() { + None => break, + Some(ev) => ev, + }; + if let gst::EventView::Seek(seek) = ev.view() { + let start = seek.get().3; + let clock_time = match start { + gst::GenericFormattedValue::Time(clock_time) => clock_time, + _ => panic!("Invalid start: {start:?}"), + }; + assert_eq!(Some(3.seconds()), clock_time); + received_seek = true; + break; + } + } + assert!(received_seek); +} + +#[test] +fn test_query() { + let (_, mut h1, _) = make_with_double_harness(); + let expected_latency = 1.seconds(); + h1.set_upstream_latency(expected_latency); + let actual_latency = h1.query_latency(); + assert_eq!(Some(expected_latency), actual_latency); +} diff --git a/meson.build b/meson.build index b11de095..44b19c03 100644 --- a/meson.build +++ b/meson.build @@ -130,6 +130,7 @@ plugins = { ], }, 'inter': {'library': 'libgstrsinter'}, + 'streamgrouper': {'library': 'libgststreamgrouper'}, 'mp4': {'library': 'libgstmp4'}, 'fmp4': { diff --git a/meson_options.txt b/meson_options.txt index ca0315cd..69bf53e0 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -18,6 +18,7 @@ option('sodium-source', type: 'combo', description: 'Whether to use libsodium from the system or the built-in version from the sodiumoxide crate') option('threadshare', type: 'feature', value: 'auto', description: 'Build threadshare plugin') option('inter', type: 'feature', value: 'auto', description: 'Build inter plugin') +option('streamgrouper', type: 'feature', value: 'auto', description: 'Build streamgrouper plugin') # mux option('flavors', type: 'feature', value: 'auto', description: 'Build flavors plugin')