From e2cbfbaaeca619e3ba41daea1a9d0dc6b7ba3036 Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Mon, 16 Oct 2023 11:52:58 +0530 Subject: [PATCH] net/webrtc: Example for whipserver rudimentary sample to test multiple WHIP client connections --- Cargo.lock | 24 ++++- net/webrtc/Cargo.toml | 7 +- net/webrtc/examples/whipserver.rs | 156 ++++++++++++++++++++++++++++++ 3 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 net/webrtc/examples/whipserver.rs diff --git a/Cargo.lock b/Cargo.lock index a7421117..f6719f40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1361,6 +1361,16 @@ dependencies = [ "cipher", ] +[[package]] +name = "ctrlc" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf" +dependencies = [ + "nix 0.27.1", + "windows-sys 0.48.0", +] + [[package]] name = "darling" version = "0.20.8" @@ -2963,6 +2973,7 @@ dependencies = [ "aws-types", "chrono", "clap", + "ctrlc", "data-encoding", "fastrand", "futures", @@ -4199,7 +4210,7 @@ dependencies = [ "if-addrs", "log", "multimap 0.8.3", - "nix", + "nix 0.23.2", "rand", "socket2 0.4.10", "thiserror", @@ -4690,6 +4701,17 @@ dependencies = [ "memoffset 0.6.5", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.5.0", + "cfg-if", + "libc", +] + [[package]] name = "nnnoiseless" version = "0.5.1" diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index a852b299..9a8a3cc5 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -58,6 +58,8 @@ livekit-protocol = { version = "0.3", optional = true } livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true } warp = {version = "0.3", optional = true } +ctrlc = {version = "3.4.0", optional = true } + [dev-dependencies] gst-plugin-rtp = { path = "../rtp" } @@ -88,7 +90,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s "dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"] janus = ["dep:http"] livekit = ["dep:livekit-protocol", "dep:livekit-api"] -whip = ["dep:async-recursion", "dep:reqwest", "dep:warp"] +whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"] [package.metadata.capi] min_version = "0.9.21" @@ -118,3 +120,6 @@ name = "webrtc-precise-sync-send" [[example]] name = "webrtc-precise-sync-recv" + +[[example]] +name = "whipserver" diff --git a/net/webrtc/examples/whipserver.rs b/net/webrtc/examples/whipserver.rs new file mode 100644 index 00000000..952dc4cd --- /dev/null +++ b/net/webrtc/examples/whipserver.rs @@ -0,0 +1,156 @@ +use std::process::exit; + +use anyhow::Error; +use clap::Parser; +use gst::prelude::*; + +#[derive(Parser, Debug)] +struct Args { + host_addr: String, +} + +fn link_video(pad: &gst::Pad, pipeline: &gst::Pipeline) { + let q = gst::ElementFactory::make_with_name( + "queue", + Some(format!("queue_{}", pad.name()).as_str()), + ) + .unwrap(); + let vsink = gst::ElementFactory::make_with_name( + "autovideosink", + Some(format!("vsink_{}", pad.name()).as_str()), + ) + .unwrap(); + + pipeline.add_many([&q, &vsink]).unwrap(); + gst::Element::link_many([&q, &vsink]).unwrap(); + let qsinkpad = q.static_pad("sink").unwrap(); + pad.link(&qsinkpad).expect("linking should work"); + + q.sync_state_with_parent().unwrap(); + vsink.sync_state_with_parent().unwrap(); +} + +fn unlink_video(pad: &gst::Pad, pipeline: &gst::Pipeline) { + let q = pipeline + .by_name(format!("queue_{}", pad.name()).as_str()) + .unwrap(); + let vsink = pipeline + .by_name(format!("vsink_{}", pad.name()).as_str()) + .unwrap(); + + q.set_state(gst::State::Null).unwrap(); + vsink.set_state(gst::State::Null).unwrap(); + + pipeline.remove_many([&q, &vsink]).unwrap(); +} + +fn link_audio(pad: &gst::Pad, pipeline: &gst::Pipeline) { + let aq = gst::ElementFactory::make_with_name( + "queue", + Some(format!("aqueue_{}", pad.name()).as_str()), + ) + .unwrap(); + + + let asink = gst::ElementFactory::make_with_name( + "autoaudiosink", + Some(format!("asink_{}", pad.name()).as_str()), + ) + .unwrap(); + + pipeline.add_many([&aq, &asink]).unwrap(); + gst::Element::link_many([&aq, &asink]).unwrap(); + let qsinkpad = aq.static_pad("sink").unwrap(); + pad.link(&qsinkpad).expect("linking should work"); + + aq.sync_state_with_parent().unwrap(); + asink.sync_state_with_parent().unwrap(); +} + +fn unlink_audio(pad: &gst::Pad, pipeline: &gst::Pipeline) { + let aq = pipeline + .by_name(format!("aqueue_{}", pad.name()).as_str()) + .unwrap(); + let asink = pipeline + .by_name(format!("asink_{}", pad.name()).as_str()) + .unwrap(); + + aq.set_state(gst::State::Null).unwrap(); + asink.set_state(gst::State::Null).unwrap(); + + pipeline.remove_many([&aq, &asink]).unwrap(); +} + +fn main() -> Result<(), Error> { + gst::init()?; + + let args = Args::parse(); + + let pipeline = gst::Pipeline::builder().build(); + let ws = gst::ElementFactory::make("whipserversrc").build()?; + ws.dynamic_cast_ref::() + .unwrap() + .set_child_property("signaller::host-addr", &args.host_addr); + + ws.set_property("enable-data-channel-navigation", true); + + let pipe = pipeline.clone(); + ws.connect_pad_added(move |_ws, pad| { + if pad.name().contains("video_") { + link_video(pad, &pipe); + } else if pad.name().contains("audio_") { + link_audio(pad, &pipe); + } else { + println!("unknown pad type {}", pad.name()); + } + }); + + let pipe = pipeline.clone(); + ws.connect_pad_removed(move |_ws, pad| { + if pad.name().contains("video_") { + unlink_video(pad, &pipe); + } else if pad.name().contains("audio_") { + unlink_audio(pad, &pipe); + } else { + println!("unknown pad type {}", pad.name()); + } + }); + pipeline.add(&ws)?; + pipeline.set_state(gst::State::Playing)?; + + let p = pipeline.clone(); + ctrlc::set_handler(move || { + p.set_state(gst::State::Null).unwrap(); + exit(0); + }) + .expect("Error setting Ctrl-C handler"); + + let bus = pipeline.bus().expect("Pipeline should have a bus"); + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +}