net/webrtc: Example for whipserver

rudimentary sample to test multiple WHIP client connections

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1339>
This commit is contained in:
Taruntej Kanakamalla 2023-10-16 11:52:58 +05:30 committed by GStreamer Marge Bot
parent 712d4757c3
commit 83f76280f5
3 changed files with 185 additions and 2 deletions

24
Cargo.lock generated
View file

@ -1383,6 +1383,16 @@ dependencies = [
"cipher", "cipher",
] ]
[[package]]
name = "ctrlc"
version = "3.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf"
dependencies = [
"nix 0.27.1",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "darling" name = "darling"
version = "0.20.9" version = "0.20.9"
@ -2996,6 +3006,7 @@ dependencies = [
"aws-types", "aws-types",
"chrono", "chrono",
"clap", "clap",
"ctrlc",
"data-encoding", "data-encoding",
"fastrand", "fastrand",
"futures", "futures",
@ -4267,7 +4278,7 @@ dependencies = [
"if-addrs", "if-addrs",
"log", "log",
"multimap 0.8.3", "multimap 0.8.3",
"nix", "nix 0.23.2",
"rand", "rand",
"socket2 0.4.10", "socket2 0.4.10",
"thiserror", "thiserror",
@ -4758,6 +4769,17 @@ dependencies = [
"memoffset 0.6.5", "memoffset 0.6.5",
] ]
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.5.0",
"cfg-if",
"libc",
]
[[package]] [[package]]
name = "nnnoiseless" name = "nnnoiseless"
version = "0.5.1" version = "0.5.1"

View file

@ -58,6 +58,8 @@ livekit-protocol = { version = "0.3", optional = true }
livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true } livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true }
warp = {version = "0.3", optional = true } warp = {version = "0.3", optional = true }
ctrlc = {version = "3.4.0", optional = true }
[dev-dependencies] [dev-dependencies]
gst-plugin-rtp = { path = "../rtp" } gst-plugin-rtp = { path = "../rtp" }
@ -88,7 +90,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
"dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"] "dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"]
janus = ["dep:http"] janus = ["dep:http"]
livekit = ["dep:livekit-protocol", "dep:livekit-api"] livekit = ["dep:livekit-protocol", "dep:livekit-api"]
whip = ["dep:async-recursion", "dep:reqwest", "dep:warp"] whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"]
[package.metadata.capi] [package.metadata.capi]
min_version = "0.9.21" min_version = "0.9.21"
@ -118,3 +120,7 @@ name = "webrtc-precise-sync-send"
[[example]] [[example]]
name = "webrtc-precise-sync-recv" name = "webrtc-precise-sync-recv"
[[example]]
name = "whipserver"
required-features = [ "whip" ]

View file

@ -0,0 +1,155 @@
use std::process::exit;
use anyhow::Error;
use clap::Parser;
use gst::prelude::*;
#[derive(Parser, Debug)]
struct Args {
host_addr: String,
}
fn link_video(pad: &gst::Pad, pipeline: &gst::Pipeline) {
let q = gst::ElementFactory::make_with_name(
"queue",
Some(format!("queue_{}", pad.name()).as_str()),
)
.unwrap();
let vsink = gst::ElementFactory::make_with_name(
"autovideosink",
Some(format!("vsink_{}", pad.name()).as_str()),
)
.unwrap();
pipeline.add_many([&q, &vsink]).unwrap();
gst::Element::link_many([&q, &vsink]).unwrap();
let qsinkpad = q.static_pad("sink").unwrap();
pad.link(&qsinkpad).expect("linking should work");
q.sync_state_with_parent().unwrap();
vsink.sync_state_with_parent().unwrap();
}
fn unlink_video(pad: &gst::Pad, pipeline: &gst::Pipeline) {
let q = pipeline
.by_name(format!("queue_{}", pad.name()).as_str())
.unwrap();
let vsink = pipeline
.by_name(format!("vsink_{}", pad.name()).as_str())
.unwrap();
q.set_state(gst::State::Null).unwrap();
vsink.set_state(gst::State::Null).unwrap();
pipeline.remove_many([&q, &vsink]).unwrap();
}
fn link_audio(pad: &gst::Pad, pipeline: &gst::Pipeline) {
let aq = gst::ElementFactory::make_with_name(
"queue",
Some(format!("aqueue_{}", pad.name()).as_str()),
)
.unwrap();
let asink = gst::ElementFactory::make_with_name(
"autoaudiosink",
Some(format!("asink_{}", pad.name()).as_str()),
)
.unwrap();
pipeline.add_many([&aq, &asink]).unwrap();
gst::Element::link_many([&aq, &asink]).unwrap();
let qsinkpad = aq.static_pad("sink").unwrap();
pad.link(&qsinkpad).expect("linking should work");
aq.sync_state_with_parent().unwrap();
asink.sync_state_with_parent().unwrap();
}
fn unlink_audio(pad: &gst::Pad, pipeline: &gst::Pipeline) {
let aq = pipeline
.by_name(format!("aqueue_{}", pad.name()).as_str())
.unwrap();
let asink = pipeline
.by_name(format!("asink_{}", pad.name()).as_str())
.unwrap();
aq.set_state(gst::State::Null).unwrap();
asink.set_state(gst::State::Null).unwrap();
pipeline.remove_many([&aq, &asink]).unwrap();
}
fn main() -> Result<(), Error> {
gst::init()?;
let args = Args::parse();
let pipeline = gst::Pipeline::builder().build();
let ws = gst::ElementFactory::make("whipserversrc").build()?;
ws.dynamic_cast_ref::<gst::ChildProxy>()
.unwrap()
.set_child_property("signaller::host-addr", args.host_addr);
ws.set_property("enable-data-channel-navigation", true);
let pipe = pipeline.clone();
ws.connect_pad_added(move |_ws, pad| {
if pad.name().contains("video_") {
link_video(pad, &pipe);
} else if pad.name().contains("audio_") {
link_audio(pad, &pipe);
} else {
println!("unknown pad type {}", pad.name());
}
});
let pipe = pipeline.clone();
ws.connect_pad_removed(move |_ws, pad| {
if pad.name().contains("video_") {
unlink_video(pad, &pipe);
} else if pad.name().contains("audio_") {
unlink_audio(pad, &pipe);
} else {
println!("unknown pad type {}", pad.name());
}
});
pipeline.add(&ws)?;
pipeline.set_state(gst::State::Playing)?;
let p = pipeline.clone();
ctrlc::set_handler(move || {
p.set_state(gst::State::Null).unwrap();
exit(0);
})
.expect("Error setting Ctrl-C handler");
let bus = pipeline.bus().expect("Pipeline should have a bus");
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("EOS");
break;
}
MessageView::Error(err) => {
pipeline.set_state(gst::State::Null)?;
eprintln!(
"Got error from {}: {} ({})",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into()),
err.error(),
err.debug().unwrap_or_else(|| "".into()),
);
break;
}
_ => (),
}
}
pipeline.set_state(gst::State::Null)?;
Ok(())
}