fallbacksrc: Add multi-stream support with the stream API

Brings support for multiple streams of each kind to fallbacksrc.

Usage past 1video/1audio stream now requires using the stream selection
API.
fallbacksrc will expose its own collection of streams, which will be
mapped to streams from the main and fallback source automatically.
This mapping can be changed via the map-streams signal.
The amount of streams being exposed by fallbacksrc is dictated by the
main source.

CustomSource has been updated to also support multi-stream scenarios,
both for stream-aware elements and for simple bins without such
functionality.

Co-authored-by: Sebastian Dröge <sebastian@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1832>
This commit is contained in:
Piotr Brzeziński 2024-09-19 03:55:07 +02:00 committed by GStreamer Marge Bot
parent 3a03310dbc
commit 5b41d11f0c
5 changed files with 2094 additions and 804 deletions

1
Cargo.lock generated
View file

@ -2598,6 +2598,7 @@ dependencies = [
"gstreamer-video",
"gtk4",
"parking_lot",
"rand",
]
[[package]]

View file

@ -2456,10 +2456,20 @@
"direction": "src",
"presence": "sometimes"
},
"audio_%%u": {
"caps": "ANY",
"direction": "src",
"presence": "sometimes"
},
"video": {
"caps": "ANY",
"direction": "src",
"presence": "sometimes"
},
"video_%%u": {
"caps": "ANY",
"direction": "src",
"presence": "sometimes"
}
},
"properties": {
@ -2679,6 +2689,24 @@
},
"rank": "none",
"signals": {
"map-streams": {
"args": [
{
"name": "arg0",
"type": "GstStructure"
},
{
"name": "arg1",
"type": "GstStreamCollection"
},
{
"name": "arg2",
"type": "GstStreamCollection"
}
],
"return-type": "GstStructure",
"when": "last"
},
"unblock": {
"action": true,
"args": [],

View file

@ -17,6 +17,7 @@ gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true }
gtk = { workspace = true, optional = true }
gio = { workspace = true, optional = true }
parking_lot = "0.12"
rand = "0.8"
[dev-dependencies]
gst-app.workspace = true

View file

@ -6,11 +6,12 @@
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::glib::SignalHandlerId;
use gst::glib::{self, GString};
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::sync::MutexGuard;
use std::{
mem,
sync::{Mutex, OnceLock},
@ -29,18 +30,51 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
struct Stream {
source_pad: gst::Pad,
ghost_pad: gst::GhostPad,
// Dummy stream we created
stream: gst::Stream,
// Used if source isn't stream-aware and we're handling the stream selection manually
is_selected: bool,
}
impl Stream {
// If source isn't stream-aware, we expose pads only after no-more-pads and READY->PAUSED
fn is_exposed(&self) -> bool {
self.ghost_pad.parent().is_some()
}
}
#[derive(Default)]
struct State {
stream_id_prefix: String,
received_collection: Option<gst::StreamCollection>,
pads: Vec<Stream>,
num_audio: usize,
num_video: usize,
pad_added_sig_id: Option<SignalHandlerId>,
pad_removed_sig_id: Option<SignalHandlerId>,
no_more_pads_sig_id: Option<SignalHandlerId>,
selection_seqnum: Option<gst::Seqnum>,
// Signals either:
// - ready->paused to post collection after no-more-pads was already called
// - or no-more-pads to post collection because we're after ready->paused
should_post_collection: bool,
}
impl State {
/// If source sent us a collection, it's stream-aware
/// and we can just forward the collection and selection events
fn is_passthrough(&self) -> bool {
self.received_collection.is_some()
}
fn our_collection(&self) -> gst::StreamCollection {
let streams = self.pads.iter().map(|p| p.stream.clone());
gst::StreamCollection::builder(None)
.streams(streams)
.build()
}
}
#[derive(Default)]
@ -127,6 +161,17 @@ impl ElementImpl for CustomSource {
gst::StateChange::NullToReady => {
self.start()?;
}
gst::StateChange::ReadyToPaused => {
let mut state = self.state.lock().unwrap();
if !state.is_passthrough() {
if state.should_post_collection {
self.post_collection(state);
} else {
// Tells no-more-pads handler it can post the collection right away
state.should_post_collection = true;
}
}
}
_ => (),
}
@ -141,6 +186,44 @@ impl ElementImpl for CustomSource {
Ok(res)
}
fn send_event(&self, event: gst::Event) -> bool {
match event.view() {
gst::EventView::SelectStreams(e) => {
if self.state.lock().unwrap().is_passthrough() {
gst::debug!(CAT, imp = self, "Forwarding select streams event to source");
let streams = e.streams();
let event = gst::event::SelectStreams::builder(
&streams.iter().map(|s| s.as_ref()).collect::<Vec<_>>(),
)
.build();
return self.source.get().unwrap().send_event(event);
}
gst::debug!(CAT, imp = self, "Handling select streams event");
let stream_ids = e
.streams()
.into_iter()
.map(glib::GString::from)
.collect::<Vec<_>>();
if let Some(message) = self.handle_stream_selection(stream_ids) {
let mut state = self.state.lock().unwrap();
state.selection_seqnum = Some(e.seqnum());
drop(state);
if let Err(err) = self.obj().post_message(message) {
gst::warning!(CAT, imp = self, "Failed to post message: {}", err);
}
return true;
}
false
}
_ => true,
}
}
}
impl BinImpl for CustomSource {
@ -149,11 +232,46 @@ impl BinImpl for CustomSource {
use gst::MessageView;
match msg.view() {
MessageView::StreamCollection(_) => {
// TODO: Drop stream collection message for now, we only create a simple custom
// one here so that fallbacksrc can know about our streams. It is never
// forwarded.
self.handle_source_no_more_pads();
MessageView::StreamCollection(collection) => {
// Receiving a stream collection indicates we can be in passthrough mode
// Otherwise if no collection is received, we generate our own one and handle selection etc.
gst::debug!(
CAT,
imp = self,
"Forwarding stream collection message from source: {:?}",
collection.stream_collection()
);
let mut state = self.state.lock().unwrap();
state.received_collection = Some(collection.stream_collection().clone());
drop(state);
let message =
gst::message::StreamCollection::builder(&collection.stream_collection())
.src(&*self.obj())
.build();
if let Err(err) = self.obj().post_message(message) {
gst::warning!(CAT, imp = self, "Failed to post message: {}", err);
}
}
MessageView::StreamsSelected(selected) => {
gst::debug!(
CAT,
imp = self,
"Forwarding streams-selected from source: {:?}",
selected.streams()
);
let message = gst::message::StreamsSelected::builder(&selected.stream_collection())
.streams(selected.streams())
.src(&*self.obj())
.build();
if let Err(err) = self.obj().post_message(message) {
gst::warning!(CAT, imp = self, "Failed to post message: {}", err);
}
}
_ => self.parent_handle_message(msg),
}
@ -165,6 +283,10 @@ impl CustomSource {
gst::debug!(CAT, imp = self, "Starting");
let source = self.source.get().unwrap();
let mut state = self.state.lock().unwrap();
state.stream_id_prefix = format!("{:016x}", rand::random::<u64>());
drop(state);
let templates = source.pad_template_list();
if templates
@ -251,11 +373,12 @@ impl CustomSource {
let mut state = self.state.lock().unwrap();
let mut stream_type = None;
let (mut stream_type, mut stream_id) = (None, None);
// Take stream type from stream-start event if we can
if let Some(ev) = pad.sticky_event::<gst::event::StreamStart>(0) {
stream_type = ev.stream().map(|s| s.stream_type());
stream_id = ev.stream().and_then(|s| s.stream_id());
}
// Otherwise from the caps
@ -298,18 +421,44 @@ impl CustomSource {
.unwrap()
.name(name)
.build();
ghost_pad.set_active(true).unwrap();
// If source posted a stream collection, we can(?) assume that the stream has an ID
// Otherwise we create our own simple collection
if !state.is_passthrough() {
stream_id = if stream_type.contains(gst::StreamType::AUDIO) {
Some(format!("{}/audio/{}", state.stream_id_prefix, state.num_audio - 1).into())
} else {
Some(format!("{}/video/{}", state.stream_id_prefix, state.num_video - 1).into())
};
} else {
assert!(stream_id.is_some());
}
let expose_pad = state.is_passthrough();
let gst_stream = gst::Stream::new(
Some(stream_id.as_ref().unwrap()),
None,
stream_type,
gst::StreamFlags::empty(),
);
let stream = Stream {
source_pad: pad.clone(),
ghost_pad: ghost_pad.clone().upcast(),
// TODO: We only add the stream type right now
stream: gst::Stream::new(None, None, stream_type, gst::StreamFlags::empty()),
stream: gst_stream.clone(),
is_selected: true,
};
state.pads.push(stream);
drop(state);
ghost_pad.set_active(true).unwrap();
self.obj().add_pad(&ghost_pad).unwrap();
if expose_pad {
let stream_start_event = gst::event::StreamStart::builder(&stream_id.unwrap())
.stream(gst_stream)
.build();
ghost_pad.store_sticky_event(&stream_start_event).unwrap();
self.obj().add_pad(&ghost_pad).unwrap();
}
Ok(())
}
@ -328,31 +477,152 @@ impl CustomSource {
Some(v) => v,
};
let ghost_pad = stream.ghost_pad.clone();
state.pads.remove(i);
drop(state);
// If we're in streams-aware mode (have a collection from source)
// then this is fine, probably happens because streams were de-selected.
// Otherwise if the source is not stream-aware, this means the stream disappeared
// and we need to remove it from our proxy collection.
ghost_pad.set_active(false).unwrap();
let _ = ghost_pad.set_target(None::<&gst::Pad>);
let _ = self.obj().remove_pad(&ghost_pad);
let (ghost_pad, is_exposed) = (stream.ghost_pad.clone(), stream.is_exposed());
state.pads.remove(i);
if !state.is_passthrough() {
let our_collection = state.our_collection();
let our_seqnum = gst::Seqnum::next();
state.selection_seqnum = Some(our_seqnum);
drop(state);
let _ = self.obj().post_message(
gst::message::StreamsSelected::builder(&our_collection)
.src(&*self.obj())
.build(),
);
let state = self.state.lock().unwrap();
if state.selection_seqnum == Some(our_seqnum) {
let selected_ids = state
.pads
.iter()
.filter(|p| p.is_selected)
.map(|p| p.stream.stream_id().unwrap())
.collect::<Vec<_>>();
if let Some(message) = self.handle_stream_selection(selected_ids) {
let _ = self.obj().post_message(message);
}
}
}
if is_exposed {
ghost_pad.set_active(false).unwrap();
let _ = ghost_pad.set_target(None::<&gst::Pad>);
let _ = self.obj().remove_pad(&ghost_pad);
}
}
fn handle_source_no_more_pads(&self) {
gst::debug!(CAT, imp = self, "Source signalled no-more-pads");
let state = self.state.lock().unwrap();
let collection = gst::StreamCollection::builder(None)
.streams(state.pads.iter().map(|p| p.stream.clone()))
.build();
let mut state = self.state.lock().unwrap();
// Make sure this isn't happening if a source posted a stream collection
assert!(!state.is_passthrough());
// Tells ready->paused handler to post collection and handle selection there
if !state.should_post_collection {
state.should_post_collection = true;
return;
}
self.post_collection(state);
}
fn post_collection(&self, mut state: MutexGuard<State>) {
let collection = state.our_collection();
let our_seqnum = gst::Seqnum::next();
state.selection_seqnum = Some(our_seqnum);
state.should_post_collection = false;
drop(state);
self.obj().no_more_pads();
let _ = self.obj().post_message(
gst::message::StreamsSelected::builder(&collection)
gst::message::StreamCollection::builder(&collection)
.src(&*self.obj())
.build(),
);
let state = self.state.lock().unwrap();
if state.selection_seqnum == Some(our_seqnum) {
// Exposes all available pads by default
let selected_ids = state
.pads
.iter()
.map(|p| p.stream.stream_id().unwrap())
.collect::<Vec<_>>();
drop(state);
if let Some(message) = self.handle_stream_selection(selected_ids) {
let _ = self.obj().post_message(message);
}
}
}
fn handle_stream_selection(&self, stream_ids: Vec<GString>) -> Option<gst::Message> {
let mut state_guard = self.state.lock().unwrap();
let state = &mut *state_guard;
for id in stream_ids.iter() {
if !state
.pads
.iter()
.any(|p| p.stream.stream_id().unwrap() == *id)
{
gst::error!(CAT, imp = self, "Stream with ID {} not found!", id);
return None;
}
}
let mut selected_streams = vec![];
for stream in state.pads.iter_mut() {
if stream_ids.contains(&stream.stream.stream_id().unwrap()) {
stream.is_selected = true;
selected_streams.push(stream.stream.clone());
gst::log!(
CAT,
imp = self,
"Stream {} selected",
stream.stream.stream_id().unwrap()
);
} else {
stream.is_selected = false;
gst::log!(
CAT,
imp = self,
"Stream {} not selected",
stream.stream.stream_id().unwrap()
);
}
}
let our_collection = state.our_collection();
let message = gst::message::StreamsSelected::builder(&our_collection)
.streams(selected_streams)
.src(&*self.obj())
.build();
self.expose_only_selected_streams(state);
Some(message)
}
fn expose_only_selected_streams(&self, state: &mut State) {
for stream in state.pads.iter() {
if stream.is_selected && !stream.is_exposed() {
let event = gst::event::StreamStart::builder(&stream.stream.stream_id().unwrap())
.stream(stream.stream.clone())
.build();
stream.ghost_pad.store_sticky_event(&event).unwrap();
self.obj().add_pad(&stream.ghost_pad).unwrap();
} else if !stream.is_selected && stream.is_exposed() {
let _ = stream.ghost_pad.set_target(None::<&gst::Pad>);
let _ = self.obj().remove_pad(&stream.ghost_pad);
}
}
}
fn stop(&self) {
@ -375,9 +645,10 @@ impl CustomSource {
let pads = mem::take(&mut state.pads);
state.num_audio = 0;
state.num_video = 0;
state.received_collection = None;
drop(state);
for pad in pads {
for pad in pads.iter().filter(|s| s.is_exposed()) {
let _ = pad.ghost_pad.set_target(None::<&gst::Pad>);
let _ = self.obj().remove_pad(&pad.ghost_pad);
}

File diff suppressed because it is too large Load diff