Add streamgrouper element

streamgrouper allows to construct simple gst-launch pipelines where
streams of different group-ids are merged to use the same group-id.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1745>
This commit is contained in:
Alicia Boya García 2024-08-22 14:59:25 +02:00 committed by GStreamer Marge Bot
parent 97cf6b859f
commit f12bd41510
11 changed files with 777 additions and 0 deletions

9
Cargo.lock generated
View file

@ -2882,6 +2882,15 @@ dependencies = [
"url",
]
[[package]]
name = "gst-plugin-streamgrouper"
version = "0.14.0-alpha.1"
dependencies = [
"gst-plugin-version-helper",
"gstreamer",
"gstreamer-check",
]
[[package]]
name = "gst-plugin-textahead"
version = "0.14.0-alpha.1"

View file

@ -17,6 +17,7 @@ members = [
"generic/sodium",
"generic/threadshare",
"generic/inter",
"generic/streamgrouper",
"generic/gopbuffer",
"mux/flavors",
@ -74,6 +75,7 @@ default-members = [
"generic/threadshare",
"generic/inter",
"generic/gopbuffer",
"generic/streamgrouper",
"mux/fmp4",
"mux/mp4",

View file

@ -11921,6 +11921,43 @@
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
},
"streamgrouper": {
"description": "Filter element that makes all the incoming streams share a group-id",
"elements": {
"streamgrouper": {
"author": "Alicia Boya García <aboya@igalia.com>",
"description": "Modifies all input streams to use the same group-id",
"hierarchy": [
"GstStreamGrouper",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Generic",
"pad-templates": {
"sink_%%u": {
"caps": "ANY",
"direction": "sink",
"presence": "request"
},
"src_%%u": {
"caps": "ANY",
"direction": "src",
"presence": "sometimes"
}
},
"rank": "none"
}
},
"filename": "gststreamgrouper",
"license": "MPL",
"other-types": {},
"package": "gst-plugin-streamgrouper",
"source": "gst-plugin-streamgrouper",
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
},
"textahead": {
"description": "GStreamer Plugin for displaying upcoming text buffers ahead of time",
"elements": {

View file

@ -0,0 +1,41 @@
[package]
name = "gst-plugin-streamgrouper"
authors = ["Alicia Boya García <aboya@igalia.com>"]
license = "MPL-2.0"
description = "Filter element that makes all the incoming streams share a group-id"
version.workspace = true
repository.workspace = true
edition.workspace = true
rust-version.workspace = true
[dependencies]
gst.workspace = true
[dev-dependencies]
gst-check.workspace = true
[lib]
name = "gststreamgrouper"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[build-dependencies]
gst-plugin-version-helper.workspace = true
[features]
static = []
capi = []
doc = ["gst/v1_18"]
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0"

View file

@ -0,0 +1,3 @@
fn main() {
gst_plugin_version_helper::info();
}

View file

@ -0,0 +1,36 @@
// Copyright (C) 2024 Igalia S.L. <aboya@igalia.com>
// Copyright (C) 2024 Comcast <aboya@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
#![allow(unused_doc_comments)]
use gst::glib;
mod streamgrouper;
/**
* plugin-streamgrouper:
*
* Since: plugins-rs-0.14.0
*/
gst::plugin_define!(
streamgrouper,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"MPL",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
streamgrouper::register(plugin).unwrap();
Ok(())
}

View file

@ -0,0 +1,406 @@
// Copyright (C) 2024 Igalia S.L. <aboya@igalia.com>
// Copyright (C) 2024 Comcast <aboya@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use std::collections::BTreeMap;
use std::sync::Mutex;
use gst::prelude::{ElementExt, GstObjectExt, PadExt, PadExtManual};
use gst::subclass::{prelude::*, ElementMetadata};
use gst::{glib, Caps, GroupId, Pad, PadDirection, PadPresence};
use gst::{Element, PadTemplate};
use std::sync::LazyLock;
pub static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"streamgrouper",
gst::DebugColorFlags::empty(),
Some("Filter element that makes all the incoming streams share a group-id"),
)
});
#[derive(Default)]
pub struct StreamGrouper {
pub state: Mutex<State>,
}
pub struct State {
pub group_id: GroupId,
pub streams_by_number: BTreeMap<usize, Stream>,
}
impl Default for State {
fn default() -> Self {
Self {
group_id: GroupId::next(),
streams_by_number: Default::default(),
}
}
}
impl State {
fn find_unused_number(&self) -> usize {
match self.streams_by_number.keys().last() {
Some(n) => n + 1,
None => 0,
}
}
fn get_stream_with_number(&self, number: usize) -> Option<&Stream> {
self.streams_by_number.get(&number)
}
fn get_stream_with_number_or_panic(&self, number: usize) -> &Stream {
self.get_stream_with_number(number)
.unwrap_or_else(|| panic!("Pad is associated with stream {number} which should exist"))
}
fn add_stream_or_panic(&mut self, number: usize, stream: Stream) -> &Stream {
use std::collections::btree_map::Entry::{Occupied, Vacant};
match self.streams_by_number.entry(number) {
Occupied(_) => panic!("Stream {number} already exists!"),
Vacant(entry) => {
return entry.insert(stream);
}
};
}
fn remove_stream_or_panic(&mut self, number: usize) {
self.streams_by_number.remove(&number).or_else(|| {
panic!("Attempted to delete stream number {number}, which does not exist");
});
}
}
pub struct Stream {
pub stream_number: usize,
pub sinkpad: Pad,
pub srcpad: Pad,
}
impl StreamGrouper {
fn request_new_pad_with_number(&self, stream_number: Option<usize>) -> Option<Pad> {
let mut state = self.state.lock().unwrap();
let stream_number = stream_number.unwrap_or_else(|| state.find_unused_number());
if state.get_stream_with_number(stream_number).is_some() {
gst::error!(
CAT,
imp = self,
"New pad with number {stream_number} was requested, but it already exists",
);
return None;
}
// Create the pads
let srcpad = Pad::builder(PadDirection::Src)
.name(format!("src_{stream_number}"))
.query_function(move |pad, parent, query| {
StreamGrouper::catch_panic_pad_function(
parent,
|| false,
|streamgrouper| streamgrouper.src_query(pad, query, stream_number),
)
})
.event_function(move |pad, parent, event| {
StreamGrouper::catch_panic_pad_function(
parent,
|| false,
|streamgrouper| streamgrouper.src_event(pad, event, stream_number),
)
})
.iterate_internal_links_function(move |pad, parent| {
StreamGrouper::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|streamgrouper| streamgrouper.iterate_internal_links(pad, stream_number),
)
})
.build();
let sinkpad = Pad::builder(PadDirection::Sink)
.name(format!("sink_{stream_number}"))
.query_function(move |pad, parent, query| {
StreamGrouper::catch_panic_pad_function(
parent,
|| false,
|streamgrouper| streamgrouper.sink_query(pad, query, stream_number),
)
})
.chain_function(move |pad, parent, buffer| {
StreamGrouper::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|streamgrouper| streamgrouper.sink_chain(pad, buffer, stream_number),
)
})
.event_function(move |pad, parent, event| {
StreamGrouper::catch_panic_pad_function(
parent,
|| false,
|streamgrouper| streamgrouper.sink_event(pad, event, stream_number),
)
})
.iterate_internal_links_function(move |pad, parent| {
StreamGrouper::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|streamgrouper| streamgrouper.iterate_internal_links(pad, stream_number),
)
})
.build();
sinkpad.set_active(true).unwrap();
srcpad.set_active(true).unwrap();
// Add the stream
let stream = Stream {
stream_number,
sinkpad: sinkpad.clone(),
srcpad: srcpad.clone(),
};
state.add_stream_or_panic(stream_number, stream);
drop(state);
self.obj().add_pad(&srcpad).unwrap();
self.obj().add_pad(&sinkpad).unwrap();
Some(sinkpad)
}
fn src_query(
&self,
_srcpad: &gst::Pad,
query: &mut gst::QueryRef,
stream_number: usize,
) -> bool {
let state = self.state.lock().unwrap();
let stream = state.get_stream_with_number_or_panic(stream_number);
let sinkpad = stream.sinkpad.clone();
drop(state);
sinkpad.peer_query(query) // Passthrough
}
fn sink_query(
&self,
_sinkpad: &gst::Pad,
query: &mut gst::QueryRef,
stream_number: usize,
) -> bool {
let state = self.state.lock().unwrap();
let stream = state.get_stream_with_number_or_panic(stream_number);
let srcpad = stream.srcpad.clone();
drop(state);
srcpad.peer_query(query) // Passthrough
}
fn sink_event(&self, _sinkpad: &gst::Pad, mut event: gst::Event, stream_number: usize) -> bool {
let state = self.state.lock().unwrap();
let stream = state.get_stream_with_number_or_panic(stream_number);
let target_group_id = state.group_id;
let srcpad = stream.srcpad.clone();
drop(state);
if event.type_() != gst::EventType::StreamStart {
return srcpad.push_event(event);
}
// Patch stream-start group-id
match event.make_mut().view_mut() {
gst::EventViewMut::StreamStart(stream_start) => {
stream_start.set_group_id(target_group_id);
}
_ => unreachable!(),
};
srcpad.push_event(event)
}
fn src_event(&self, _srcpad: &gst::Pad, event: gst::Event, stream_number: usize) -> bool {
let state = self.state.lock().unwrap();
let stream = state.get_stream_with_number_or_panic(stream_number);
let sinkpad = stream.sinkpad.clone();
drop(state);
sinkpad.push_event(event)
}
fn iterate_internal_links(
&self,
pad: &gst::Pad,
stream_number: usize,
) -> gst::Iterator<gst::Pad> {
let state = self.state.lock().unwrap();
let stream = state.get_stream_with_number_or_panic(stream_number);
if pad == &stream.sinkpad {
gst::Iterator::from_vec(vec![stream.srcpad.clone()])
} else {
gst::Iterator::from_vec(vec![stream.sinkpad.clone()])
}
}
fn sink_chain(
&self,
_pad: &Pad,
buffer: gst::Buffer,
stream_number: usize,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let state = self.state.lock().unwrap();
let stream = state.get_stream_with_number_or_panic(stream_number);
let srcpad = stream.srcpad.clone();
drop(state);
srcpad.push(buffer)
}
}
#[glib::object_subclass]
impl ObjectSubclass for StreamGrouper {
const NAME: &'static str = "GstStreamGrouper";
type Type = super::StreamGrouper;
type ParentType = Element;
}
impl ObjectImpl for StreamGrouper {}
impl GstObjectImpl for StreamGrouper {}
impl ElementImpl for StreamGrouper {
fn metadata() -> Option<&'static ElementMetadata> {
static ELEMENT_METADATA: LazyLock<ElementMetadata> = LazyLock::new(|| {
ElementMetadata::new(
"Stream Grouping Filter",
"Generic",
"Modifies all input streams to use the same group-id",
"Alicia Boya García <aboya@igalia.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if transition == gst::StateChange::PausedToReady {
let mut state = self.state.lock().unwrap();
let group_id = GroupId::next();
gst::debug!(
CAT,
imp = self,
"Invalidating previous group id: {:?} Next group id: {group_id:?}",
state.group_id,
);
state.group_id = group_id;
};
self.parent_change_state(transition)
}
fn pad_templates() -> &'static [PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<PadTemplate>> = LazyLock::new(|| {
// src side
let src_pad_template = PadTemplate::new(
"src_%u",
PadDirection::Src,
PadPresence::Sometimes,
&Caps::new_any(),
)
.unwrap();
// sink side
let sink_pad_template = PadTemplate::new(
"sink_%u",
PadDirection::Sink,
PadPresence::Request,
&Caps::new_any(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn request_new_pad(
&self,
templ: &gst::PadTemplate,
name: Option<&str>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
if templ.name_template() != "sink_%u" {
gst::error!(
CAT,
imp = self,
"Pad requested on extraneous template: {:?}",
templ.name_template()
);
return None;
}
let stream_number = match name {
None => None,
Some(name) => {
match name
.strip_prefix("sink_")
.and_then(|s| s.parse::<usize>().ok())
{
Some(idx) => Some(idx),
None => {
gst::error!(CAT, imp = self, "Invalid pad name requested: {name:?}");
return None;
}
}
}
};
self.request_new_pad_with_number(stream_number)
}
fn release_pad(&self, pad: &gst::Pad) {
let mut state = self.state.lock().unwrap();
let stream = match pad
.name()
.strip_prefix("sink_")
.and_then(|s| s.parse::<usize>().ok())
.and_then(|stream_number| state.get_stream_with_number(stream_number))
{
Some(stream) => stream,
None => {
gst::error!(
CAT,
imp = self,
"Requested to remove pad {}, which is not a request pad of this element",
pad.name()
);
return;
}
};
let stream_number = stream.stream_number;
let srcpad = stream.srcpad.clone();
let sinkpad = stream.sinkpad.clone();
state.remove_stream_or_panic(stream_number);
drop(state);
sinkpad.set_active(false).unwrap_or_else(|_| {
gst::warning!(
CAT,
imp = self,
"Failed to deactivate sinkpad for id {stream_number}",
);
});
srcpad.set_active(false).unwrap_or_else(|_| {
gst::warning!(
CAT,
imp = self,
"Failed to deactivate srcpad for id {stream_number}",
);
});
self.obj().remove_pad(&sinkpad).unwrap();
self.obj().remove_pad(&srcpad).unwrap();
}
}

View file

@ -0,0 +1,71 @@
// Copyright (C) 2024 Igalia S.L. <aboya@igalia.com>
// Copyright (C) 2024 Comcast <aboya@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
mod imp;
/**
* SECTION:element-streamgrouper
*
* #streamgrouper takes any number of streams in the sinkpads and patches the STREAM_START
* events so that they all belong to the same group-id.
*
* This is useful for constructing simple pipelines where different sources push buffers
* into an element that contains a streamsynchronizer element, like playsink.
*
* Notice that because of this group-id merging, using streamgrouper is incompatible with
* gapless playback. However, this is not a problem, since streamgrouper is currently
* intended only for use cases in which only one stream group will be played.
*
* ## Example
*
* This is a simple pipeline where, because the audio and video streams come from
* unrelated source elements, they end up with different group-ids and therefore get stuck
* forever waiting inside the streamsynchronizer inside playsink, and never play:
*
* |[
* # Will get stuck! The streams from audiotestsrc and videotestsrc don't
* # share a group-id.
* gst-launch-1.0 \
* playsink name=myplaysink \
* audiotestsrc ! myplaysink.audio_sink \
* videotestsrc ! myplaysink.video_sink
* ]|
*
* By adding streamgrouper to the pipeline, the streams are become part of the same group
* and playback is possible.
*
* |[
gst-launch-1.0 \
playsink name=myplaysink \
streamgrouper name=grouper \
audiotestsrc ! grouper.sink_0 grouper.src_0 ! myplaysink.audio_sink \
videotestsrc ! grouper.sink_1 grouper.src_1 ! myplaysink.video_sink
* ]|
*
* Since: plugins-rs-0.14.0
*/
glib::wrapper! {
pub struct StreamGrouper(ObjectSubclass<imp::StreamGrouper>)
@extends
gst::Element,
gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"streamgrouper",
gst::Rank::NONE,
StreamGrouper::static_type(),
)
}

View file

@ -0,0 +1,170 @@
// Copyright (C) 2024 Igalia S.L. <aboya@igalia.com>
// Copyright (C) 2024 Comcast <aboya@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::{prelude::*, Element, GroupId};
use gst_check::Harness;
use std::sync::LazyLock;
#[allow(unused)]
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"streamgrouper-test",
gst::DebugColorFlags::empty(),
Some("streamgrouper test"),
)
});
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gststreamgrouper::plugin_register_static().expect("gststreamgrouper streamgrouper test");
});
}
#[test]
fn test_request_invalid_pad_name() {
init();
let sg = gst::ElementFactory::make("streamgrouper")
.build()
.expect("streamgrouper factory should exist");
assert!(sg.request_pad_simple("invalid_name").is_none());
}
#[test]
fn test_can_change_state() {
init();
let sg = gst::ElementFactory::make("streamgrouper")
.build()
.expect("streamgrouper factory should exist");
if let Err(error) = sg.set_state(gst::State::Playing) {
panic!("Failed to change to PLAYING: {error:?}");
}
if let Err(error) = sg.set_state(gst::State::Null) {
panic!("Failed to change to NULL: {error:?}");
}
}
fn make_with_double_harness() -> (Element, Harness, Harness) {
init();
let sg = gst::ElementFactory::make("streamgrouper")
.build()
.expect("streamgrouper factory should exist");
// gst_harness_add_element_full() sets the element to PLAYING, but not before sending
// a stream-start, which means it ends up sending buffers while in NULL state.
// streamgrouper can handle that, but let's rather test what applications should be
// doing instead and set the element to a higher state first.
if let Err(error) = sg.set_state(gst::State::Playing) {
panic!("Failed to change to PLAYING: {error:?}");
}
let mut h1 = gst_check::Harness::with_element(&sg, Some("sink_1"), Some("src_1"));
let mut h2 = gst_check::Harness::with_element(&sg, Some("sink_2"), Some("src_2"));
// Consume the stream-start that harness sends internally in
// gst_harness_add_element_full(). For some reason this is not done automatically (!?)
while h1.try_pull_event().is_some() {}
while h2.try_pull_event().is_some() {}
(sg, h1, h2)
}
#[test]
fn test_push_stream_start() {
let (_, mut h1, mut h2) = make_with_double_harness();
let input_group_id1 = GroupId::next();
let input_group_id2 = GroupId::next();
h1.push_event(
gst::event::StreamStart::builder("stream1")
.group_id(input_group_id1)
.build(),
);
h2.push_event(
gst::event::StreamStart::builder("stream2")
.group_id(input_group_id2)
.build(),
);
let e1 = h1
.pull_event()
.expect("an event should have been pushed at the other end");
let e2 = h2
.pull_event()
.expect("an event should have been pushed at the other end");
assert_eq!(e1.type_(), gst::EventType::StreamStart);
assert_eq!(e2.type_(), gst::EventType::StreamStart);
let output_group_id1 = match e1.view() {
gst::EventView::StreamStart(ev) => ev.group_id().expect("There must be a group id"),
_ => panic!("unexpected event: {e1:?}"),
};
let output_group_id2 = match e2.view() {
gst::EventView::StreamStart(ev) => ev.group_id().expect("There must be a group id"),
_ => panic!("unexpected event: {e2:?}"),
};
assert_eq!(output_group_id1, output_group_id2);
assert_ne!(output_group_id1, input_group_id1);
assert_ne!(output_group_id1, input_group_id2);
}
#[test]
fn test_push_buffer() {
let (_, mut h1, _) = make_with_double_harness();
let segment = gst::event::Segment::new(&gst::FormattedSegment::<gst::ClockTime>::new());
h1.push_event(segment);
let segment_other_side = h1.pull_event().unwrap();
assert_eq!(gst::EventType::Segment, segment_other_side.type_());
let buffer = gst::Buffer::new();
h1.push(buffer.clone()).unwrap();
let buffer_other_side = h1.pull().unwrap();
assert_eq!(
buffer.as_ptr(),
buffer_other_side.as_ptr(),
"buffer should be unmodified"
);
}
#[test]
fn test_upstream_seek() {
let (_, mut h1, _) = make_with_double_harness();
let seek = gst::event::Seek::new(
1.0,
gst::SeekFlags::FLUSH,
gst::SeekType::Set,
3.seconds(),
gst::SeekType::None,
0.seconds(),
);
h1.push_upstream_event(seek);
let mut received_seek = false;
// A reconfigure event is generated, so we'll loop to skip that.
loop {
let ev = match h1.try_pull_upstream_event() {
None => break,
Some(ev) => ev,
};
if let gst::EventView::Seek(seek) = ev.view() {
let start = seek.get().3;
let clock_time = match start {
gst::GenericFormattedValue::Time(clock_time) => clock_time,
_ => panic!("Invalid start: {start:?}"),
};
assert_eq!(Some(3.seconds()), clock_time);
received_seek = true;
break;
}
}
assert!(received_seek);
}
#[test]
fn test_query() {
let (_, mut h1, _) = make_with_double_harness();
let expected_latency = 1.seconds();
h1.set_upstream_latency(expected_latency);
let actual_latency = h1.query_latency();
assert_eq!(Some(expected_latency), actual_latency);
}

View file

@ -130,6 +130,7 @@ plugins = {
],
},
'inter': {'library': 'libgstrsinter'},
'streamgrouper': {'library': 'libgststreamgrouper'},
'mp4': {'library': 'libgstmp4'},
'fmp4': {

View file

@ -18,6 +18,7 @@ option('sodium-source', type: 'combo',
description: 'Whether to use libsodium from the system or the built-in version from the sodiumoxide crate')
option('threadshare', type: 'feature', value: 'auto', description: 'Build threadshare plugin')
option('inter', type: 'feature', value: 'auto', description: 'Build inter plugin')
option('streamgrouper', type: 'feature', value: 'auto', description: 'Build streamgrouper plugin')
# mux
option('flavors', type: 'feature', value: 'auto', description: 'Build flavors plugin')