From 80877c49c342887b6671ddbed6d65f5302a580e7 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 5 Jun 2025 13:32:04 +0200 Subject: [PATCH] net/webrtc: add new examples for stream selection over data channel Two examples are added, a server that sends N video streams, and a client that composites them together, then sends messages over the control data channel to enable some of them and disable some others. This demonstrates how custom upstream events can be sent from a client to a server, and how once a connection is established one can start and stop the flow of data for a specific media without affecting the overall connection. Part-of: --- net/webrtc/Cargo.toml | 6 + net/webrtc/examples/stream-selector-client.rs | 225 ++++++++++++++++++ net/webrtc/examples/stream-selector-server.rs | 139 +++++++++++ 3 files changed, 370 insertions(+) create mode 100644 net/webrtc/examples/stream-selector-client.rs create mode 100644 net/webrtc/examples/stream-selector-server.rs diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 370d23a14..fc43b4fac 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -134,3 +134,9 @@ name = "webrtcsink-define-encoder-bitrates" [[example]] name = "whipclient" required-features = [ "whip" ] + +[[example]] +name = "stream-selector-server" + +[[example]] +name = "stream-selector-client" diff --git a/net/webrtc/examples/stream-selector-client.rs b/net/webrtc/examples/stream-selector-client.rs new file mode 100644 index 000000000..d690c9363 --- /dev/null +++ b/net/webrtc/examples/stream-selector-client.rs @@ -0,0 +1,225 @@ +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use anyhow::Error; +use clap::Parser; +use gst::glib; +use gst::prelude::*; +use rand::Rng; + +#[derive(Parser, Debug)] +#[clap(about, version, author)] +/// Program arguments +struct Args { + /// Frequency for selection of enabled / disabled streams, in seconds + frequency: u64, +} + +fn connect_input_stream( + pipeline: &gst::Pipeline, + pad: &gst::Pad, + compositor: &gst::Element, + capsfilter: &gst::Element, +) -> Result<(), Error> { + let depay = gst::ElementFactory::make("rtph264depay") + .property("request-keyframe", true) + .property("wait-for-keyframe", true) + .build()?; + let decode = gst::ElementFactory::make("avdec_h264").build()?; + pipeline.add_many([&depay, &decode])?; + gst::Element::link_many([&depay, &decode])?; + + let n_sink_pads = compositor.num_sink_pads() as i32; + + let srcpad = decode.static_pad("src").unwrap(); + let sinkpad = compositor.request_pad_simple("sink_%u").unwrap(); + + let pos_x = n_sink_pads % 4 * 320; + let pos_y = n_sink_pads / 4 * 240; + + sinkpad.set_property("width", 320); + sinkpad.set_property("height", 240); + sinkpad.set_property("xpos", pos_x); + sinkpad.set_property("ypos", pos_y); + + let width = std::cmp::min(320 * 4, (n_sink_pads + 1) * 320); + let height = pos_y + 240; + + let caps = gst::Caps::builder("video/x-raw") + .field("width", width) + .field("height", height) + .build(); + + capsfilter.set_property("caps", caps); + + srcpad.link(&sinkpad).unwrap(); + + depay.sync_state_with_parent().unwrap(); + decode.sync_state_with_parent().unwrap(); + + pad.link(&depay.static_pad("sink").unwrap()).unwrap(); + + Ok(()) +} + +fn build_pipeline(args: &Args) -> Result { + let pipeline = gst::Pipeline::new(); + + let src = gst::ElementFactory::make("webrtcsrc") + .property("connect-to-first-producer", true) + .property("enable-control-data-channel", true) + .build()?; + let compositor = gst::ElementFactory::make("compositor") + .property("force-live", true) + .build()?; + let capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst::Caps::builder("video/x-raw") + .field("width", 1920) + .field("height", 1080) + .build(), + ) + .build()?; + let sink = gst::ElementFactory::make("autovideosink").build()?; + + pipeline.add_many([&src, &compositor, &capsfilter, &sink])?; + + gst::Element::link_many([&compositor, &capsfilter, &sink])?; + + let n_medias: Arc>> = Arc::new(Mutex::new(None)); + + let n_medias_clone = n_medias.clone(); + let frequency = args.frequency; + src.connect_pad_added(glib::clone!( + #[weak] + pipeline, + #[weak] + compositor, + #[weak] + capsfilter, + move |src, pad| { + if let Err(err) = connect_input_stream(&pipeline, pad, &compositor, &capsfilter) { + pipeline.post_error_message(gst::error_msg!( + gst::StreamError::Failed, + ["Failed to connect input stream: {err:?}",] + )); + } + + if let Some(n_medias) = *n_medias_clone.lock().unwrap() { + if n_medias == src.num_src_pads() as usize + 1 { + let src_weak = src.downgrade(); + std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_secs(frequency)); + + let Some(src) = src_weak.upgrade() else { + break; + }; + + let mut rng = rand::rng(); + for srcpad in src.src_pads() { + let s = gst::Structure::builder("example/enabled-stream") + .field("enabled", rng.random::()) + .build(); + + let custom_event = gst::event::CustomUpstream::builder(s).build(); + srcpad.send_event(custom_event); + } + }); + } + } + } + )); + + let signaller = src.property::("signaller"); + signaller.connect_closure( + "webrtcbin-ready", + false, + glib::closure!( + #[strong] + n_medias, + move |_signaller: &glib::Object, _session_id: &str, webrtcbin: &gst::Element| { + webrtcbin.connect_notify( + Some("signaling-state"), + glib::clone!( + #[strong] + n_medias, + move |element, _param_spec| { + let state = element + .property::("signaling-state"); + + if state == gst_webrtc::WebRTCSignalingState::HaveRemoteOffer { + let description = element + .property::( + "remote-description", + ); + + *n_medias.lock().unwrap() = + Some(description.sdp().medias().count()); + } + } + ), + ); + } + ), + ); + + Ok(pipeline) +} + +fn run_pipeline(pipeline: gst::Pipeline) -> Result<(), Error> { + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline.bus().unwrap(); + + while let Some(message) = bus.timed_pop(gst::ClockTime::NONE) { + match message.view() { + gst::MessageView::Eos(_) => { + eprintln!("EOS!"); + break; + } + gst::MessageView::Error(err) => { + eprintln!( + "Error in element {:?}, {}", + err.src().map(|o| o.name()), + err.debug() + .map(|d| format!("debug:\n{}", d)) + .unwrap_or(String::from("no debug")) + ); + + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::ALL, + format!("{}-error", pipeline.name(),), + ); + + break; + } + gst::MessageView::Latency(_) => { + let _ = pipeline.recalculate_latency(); + } + gst::MessageView::StateChanged(sc) => { + if sc.src() == Some(pipeline.upcast_ref()) { + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::ALL, + format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()), + ); + } + } + _ => (), + } + } + + let _ = pipeline.set_state(gst::State::Null); + + Ok(()) +} + +fn main() -> Result<(), Error> { + gst::init()?; + + let args = Args::parse(); + + let pipeline = build_pipeline(&args)?; + + run_pipeline(pipeline) +} diff --git a/net/webrtc/examples/stream-selector-server.rs b/net/webrtc/examples/stream-selector-server.rs new file mode 100644 index 000000000..2a9843286 --- /dev/null +++ b/net/webrtc/examples/stream-selector-server.rs @@ -0,0 +1,139 @@ +use anyhow::Error; +use clap::Parser; +use gst::glib; +use gst::prelude::*; + +#[derive(Parser, Debug)] +#[clap(about, version, author)] +/// Program arguments +struct Args { + /// The number of streams to serve over the peer connection + n_streams: u32, +} + +fn build_pipeline(args: &Args) -> Result { + let pipeline = gst::Pipeline::new(); + + let sink = gst::ElementFactory::make("webrtcsink") + .property("run-signalling-server", true) + .property("enable-control-data-channel", true) + .property("video-caps", gst::Caps::new_empty_simple("video/x-h264")) + .build()?; + + pipeline.add(&sink)?; + + let mut valves: Vec = vec![]; + + for i in 0..args.n_streams { + let src = gst::ElementFactory::make("videotestsrc").build()?; + let textoverlay = gst::ElementFactory::make("textoverlay") + .property("text", i.to_string()) + .property("font-desc", "sans 72") + .build()?; + let valve = gst::ElementFactory::make("valve") + .property_from_str("drop-mode", "transform-to-gap") + .build()?; + + pipeline.add_many([&src, &textoverlay, &valve])?; + gst::Element::link_many([&src, &textoverlay, &valve, &sink])?; + + valve.static_pad("src").unwrap().add_probe( + gst::PadProbeType::EVENT_UPSTREAM, + glib::clone!( + #[weak] + valve, + #[upgrade_or] + gst::PadProbeReturn::Ok, + move |_pad, probe_info| { + let event = probe_info.event().unwrap(); + + if let gst::EventView::CustomUpstream(cup) = event.view() { + let s = cup.structure().unwrap(); + + if s.name() == "example/enabled-stream" { + let enabled: bool = s.get("enabled").unwrap(); + + valve.set_property("drop", !enabled); + } + } + + gst::PadProbeReturn::Ok + } + ), + ); + + valves.push(valve); + } + + let signaller = sink.property::("signaller"); + signaller.connect_closure( + "webrtcbin-ready", + false, + glib::closure!(move |_signaller: &glib::Object, + _session_id: &str, + _webrtcbin: &gst::Element| { + for valve in &valves { + valve.set_property("drop", false); + } + }), + ); + + Ok(pipeline) +} + +fn run_pipeline(pipeline: gst::Pipeline) -> Result<(), Error> { + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline.bus().unwrap(); + + while let Some(message) = bus.timed_pop(gst::ClockTime::NONE) { + match message.view() { + gst::MessageView::Eos(_) => { + eprintln!("EOS!"); + break; + } + gst::MessageView::Error(err) => { + eprintln!( + "Error in element {:?}, {}", + err.src().map(|o| o.name()), + err.debug() + .map(|d| format!("debug:\n{}", d)) + .unwrap_or(String::from("no debug")) + ); + + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::ALL, + format!("{}-error", pipeline.name(),), + ); + + break; + } + gst::MessageView::Latency(_) => { + let _ = pipeline.recalculate_latency(); + } + gst::MessageView::StateChanged(sc) => { + if sc.src() == Some(pipeline.upcast_ref()) { + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::ALL, + format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()), + ); + } + } + _ => (), + } + } + + let _ = pipeline.set_state(gst::State::Null); + + Ok(()) +} + +fn main() -> Result<(), Error> { + gst::init()?; + + let args = Args::parse(); + + let pipeline = build_pipeline(&args)?; + + run_pipeline(pipeline) +}