From 43ee6bfc1c991b8fd8117d7f57df2c39f0f9377c Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Wed, 19 Jul 2023 10:35:33 +0530 Subject: [PATCH] net/webrtc: add whipserversrc Implement new signaller WhipServerSignaller - an http server using 'warp' - handlers for the POST, OPTIONS, PATCH and DELETE - fixed path `/whip/endpoint` as the URI - fixed value 'whip-client' as the producer peer id - fixed resource url `/whip/resource/whip-client` Derive whipserversrc element from BaseWebRTCSrc - implement constructed method for ObjectImpl to set non-default signaller, i.e., WhipServerSignaller - bind the properties stun-server and turn-servers to those on the Signaller Connect to 'webrtcbin-ready' signal in the constructor of WhipServerSignaller - it will be emitted by the webrtcsrc when the webrtcbin element is ready - the closure for this signal will in turn connect to webrtcbin's ice-gathering-state and perform send with the answer sdp via the channel - the WhipServer will hold its HTTP response in POST handler until this signal is received or timeout which happens early Part-of: --- Cargo.lock | 106 +++++ docs/plugins/gst_plugins_cache.json | 32 ++ net/webrtc/Cargo.toml | 3 + net/webrtc/README.md | 53 ++- net/webrtc/src/utils.rs | 37 ++ net/webrtc/src/webrtcsrc/imp.rs | 68 +++ net/webrtc/src/webrtcsrc/mod.rs | 15 +- net/webrtc/src/whip_signaller/imp.rs | 626 ++++++++++++++++++++++++++- net/webrtc/src/whip_signaller/mod.rs | 17 +- 9 files changed, 952 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9dc3d32..18a32890 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1208,6 +1208,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -2799,6 +2809,7 @@ dependencies = [ "aws-types", "chrono", "clap", + "crossbeam-channel", "data-encoding", "fastrand", "futures", @@ -2831,6 +2842,7 @@ dependencies = [ "url", "url-escape", "uuid", + "warp", ] [[package]] @@ -4235,6 +4247,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4274,6 +4296,24 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0" +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multimap" version = "0.8.3" @@ -4726,6 +4766,26 @@ dependencies = [ "indexmap 2.1.0", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -5421,6 +5481,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -6260,6 +6326,15 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -6437,6 +6512,37 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e92e22e03ff1230c03a1a8ee37d2f89cd489e2e541b7550d6afad96faed169" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http", + "hyper", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "rustls-pemfile", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 1539cec5..fc7ebb5d 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6499,6 +6499,38 @@ } }, "rank": "none" + }, + "whipserversrc": { + "author": "Taruntej Kanakamalla ", + "description": "WebRTC source element using WHIP Server as the signaller", + "hierarchy": [ + "GstWhipServerSrc", + "GstBaseWebRTCSrc", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy" + ], + "klass": "Source/Network/WebRTC", + "pad-templates": { + "audio_%%u": { + "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, + "video_%%u": { + "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + } + }, + "rank": "primary" } }, "filename": "gstrswebrtc", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 02f8a81f..6a8b2e3f 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -54,6 +54,9 @@ async-recursion = "1.0.0" livekit-protocol = { version = "0.2" } livekit-api = { version = "0.2", default-features = false, features = ["signal-client", "access-token", "native-tls"] } +warp = "0.3" +crossbeam-channel = "0.5" + [dev-dependencies] tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } diff --git a/net/webrtc/README.md b/net/webrtc/README.md index 97c5e597..7dbc9eba 100644 --- a/net/webrtc/README.md +++ b/net/webrtc/README.md @@ -245,7 +245,11 @@ AWS_ACCESS_KEY_ID="XXX" AWS_SECRET_ACCESS_KEY="XXX" gst-launch-1.0 videotestsrc ## Using the WHIP Signaller -Testing the whip signaller can be done by setting up janus and +### WHIP Client + +WHIP Client Signaller uses BaseWebRTCSink + +Testing the whip client as the signaller can be done by setting up janus and . * Set up a [janus] instance with the videoroom plugin configured @@ -269,6 +273,53 @@ gst-launch-1.0 -e uridecodebin uri=file:///home/meh/path/to/video/file ! \ You should see a second video displayed in the videoroomtest web page. +### WHIP Server + +WHIP Server Signaller uses BaseWebRTCSrc + +The WHIP Server as the signaller can be tested in two ways. + +Note: The initial version of `whipserversrc` does not check any auth or encryption. +Host application using `whipserversrc` behind an HTTP(s) proxy to enforce the auth and encryption between the WHIP client and server + +#### 1. Using the Gstreamer element `whipwebrtcsink` + +a. In one tab of the terminal start the WHIP server using the below command + +``` shell +RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 whipserversrc signaller::host-addr=http://127.0.0.1:8190 stun-server="stun://stun.l.google.com:19302" turn-servers="\<\"turns://user1:pass1@turn.serverone.com:7806\", \"turn://user2:pass2@turn.servertwo.com:7809\"\>" ! videoconvert ! autovideosink +``` + +b. In the second tab start the WHIP Client by sending a test video as shown in the below command + +``` shell +RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 videotestsrc ! videoconvert ! video/x-raw ! queue ! \ + whipwebrtcsink name=ws signaller::whip-endpoint="http://127.0.0.1:8190/whip/endpoint" +``` + +#### 2. Using Meetecho's `simple-whip-client` + +Set up the simple whip client using using the instructions present in https://github.com/meetecho/simple-whip-client#readme + +a. In one tab of the terminal start the WHIP server using the below command + +``` shell +RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 whipserversrc signaller::host-addr=http://127.0.0.1:8190 stun-server="stun://stun.l.google.com:19302" turn-servers="\<\"turns://user1:pass1@turn.serverone.com:7806\", \"turn://user2:pass2@turn.servertwo.com:7809\"\>" name=ws ! videoconvert ! autovideosink ws. ! audioconvert ! autoaudiosink +``` + +b. In the second tab start the `simple-whip-client` as shown in the below command + +``` shell +./whip-client --url http://127.0.0.1:8190/whip/endpoint \ + -A "audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay pt=100 ssrc=1 ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=100" \ + -V "videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay pt=96 ssrc=2 ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=96" \ + -S stun://stun.l.google.com:19302 \ + -l 7 \ + -n true +``` + +Terminating the client will close the session and the client should receive 200 (OK) as the response to the DELETE request + ## Using the LiveKit Signaller Testing the LiveKit signaller can be done by setting up [LiveKit] and creating a room. diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index 6b28ee96..808bc09a 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -357,6 +357,43 @@ pub fn set_ice_servers( Ok(()) } +pub fn build_link_header(url_str: &str) -> Result { + let url = url::Url::parse(url_str)?; + + let mut link_str: String = "<".to_owned() + url.scheme(); + if let Some(host) = url.host_str() { + link_str = link_str + ":" + host; + } + + if let Some(port) = url.port() { + link_str = link_str + ":" + port.to_string().as_str(); + } + + link_str += url.path(); + + if let Some(query) = url.query() { + link_str = link_str + "?" + query; + } + + link_str += ">"; + + if let Some(password) = url.password() { + link_str = link_str + + "; " + + "rel=\"ice-server\"" + + "; " + + "username=\"" + + url.username() + + "\"; " + + "credential:\"" + + password + + "\"; " + + "credential-type:\"password\";"; + } + + Ok(link_str) +} + /// Wrapper around `gst::ElementFactory::make` with a better error /// message pub fn make_element(element: &str, name: Option<&str>) -> Result { diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 42a05108..5dbc53eb 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -5,6 +5,7 @@ use gst::prelude::*; use crate::signaller::{prelude::*, Signallable, Signaller}; use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS}; use crate::webrtcsrc::WebRTCSrcPad; +use crate::whip_signaller::WhipServerSignaller; use anyhow::{Context, Error}; use gst::glib; use gst::glib::once_cell::sync::Lazy; @@ -552,6 +553,9 @@ impl BaseWebRTCSrc { }), ); + self.signaller() + .emit_by_name::<()>("webrtcbin-ready", &[&"none", &webrtcbin]); + bin.add(&webrtcbin).unwrap(); self.obj().add(&bin).context("Could not add `webrtcbin`")?; @@ -995,6 +999,16 @@ impl BaseWebRTCSrc { gst::info!(CAT, imp: self, "Stopped signaller"); } } + + pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> { + let sigobj = signaller.clone(); + let mut settings = self.settings.lock().unwrap(); + + self.connect_signaller(&sigobj); + settings.signaller = signaller; + + Ok(()) + } } impl ElementImpl for BaseWebRTCSrc { @@ -1225,3 +1239,57 @@ impl ObjectSubclass for WebRTCSrc { type ParentType = super::BaseWebRTCSrc; type Interfaces = (gst::URIHandler,); } + +#[derive(Default)] +pub struct WhipServerSrc {} + +impl ObjectImpl for WhipServerSrc { + fn constructed(&self) { + self.parent_constructed(); + let element = self.obj(); + let ws = element.upcast_ref::().imp(); + + let _ = ws.set_signaller(WhipServerSignaller::default().upcast()); + + let obj = &*self.obj(); + + obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); + + let settings = ws.settings.lock().unwrap(); + element + .bind_property("stun-server", &settings.signaller, "stun-server") + .build(); + element + .bind_property("turn-servers", &settings.signaller, "turn-servers") + .build(); + } +} + +impl GstObjectImpl for WhipServerSrc {} + +impl BinImpl for WhipServerSrc {} + +impl ElementImpl for WhipServerSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "WhipServerSrc", + "Source/Network/WebRTC", + "WebRTC source element using WHIP Server as the signaller", + "Taruntej Kanakamalla ", + ) + }); + + Some(&*ELEMENT_METADATA) + } +} + +impl BaseWebRTCSrcImpl for WhipServerSrc {} + +#[glib::object_subclass] +impl ObjectSubclass for WhipServerSrc { + const NAME: &'static str = "GstWhipServerSrc"; + type Type = super::WhipServerSrc; + type ParentType = super::BaseWebRTCSrc; +} diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs index b85612bc..45bef4ba 100644 --- a/net/webrtc/src/webrtcsrc/mod.rs +++ b/net/webrtc/src/webrtcsrc/mod.rs @@ -49,6 +49,10 @@ glib::wrapper! { pub struct WebRTCSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; } +glib::wrapper! { + pub struct WhipServerSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; +} + glib::wrapper! { pub struct WebRTCSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; } @@ -63,5 +67,14 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> { "webrtcsrc", gst::Rank::PRIMARY, WebRTCSrc::static_type(), - ) + )?; + + gst::Element::register( + plugin, + "whipserversrc", + gst::Rank::PRIMARY, + WhipServerSrc::static_type(), + )?; + + Ok(()) } diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs index 741a50c6..50ff91f6 100644 --- a/net/webrtc/src/whip_signaller/imp.rs +++ b/net/webrtc/src/whip_signaller/imp.rs @@ -2,20 +2,35 @@ use crate::signaller::{Signallable, SignallableImpl}; use crate::utils::{ - build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError, + build_link_header, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, + wait_async, WaitError, }; use crate::RUNTIME; use async_recursion::async_recursion; -use gst::glib; use gst::glib::once_cell::sync::Lazy; +use gst::glib::{self, RustClosure}; use gst::prelude::*; use gst::subclass::prelude::*; +use gst_sdp::SDPMessage; use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription}; use reqwest::header::HeaderMap; use reqwest::header::HeaderValue; use reqwest::StatusCode; use std::sync::Mutex; +use core::time::Duration; +use crossbeam_channel::unbounded; +use std::net::SocketAddr; +use url::Url; +use warp::{ + http, + hyper::{ + header::{CONTENT_TYPE, LINK}, + Body, + }, + Filter, Reply, +}; + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "webrtc-whip-signaller", @@ -27,6 +42,15 @@ static CAT: Lazy = Lazy::new(|| { const MAX_REDIRECTS: u8 = 10; const DEFAULT_TIMEOUT: u32 = 15; +const ROOT: &str = "whip"; +const ENDPOINT_PATH: &str = "endpoint"; +const RESOURCE_PATH: &str = "resource"; +const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080"; +const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303"); +const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client"); +const CONTENT_SDP: &str = "application/sdp"; +const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag"; + #[derive(Debug)] enum WhipClientState { Stopped, @@ -588,3 +612,601 @@ impl ObjectImpl for WhipClient { } } } + +// WHIP server implementation + +#[derive(Debug)] +enum WhipServerState { + Idle, + Negotiating, + Ready, +} + +impl Default for WhipServerState { + fn default() -> Self { + Self::Idle + } +} + +struct WhipServerSettings { + stun_server: Option, + turn_servers: gst::Array, + host_addr: Url, + producer_peer_id: Option, + timeout: u32, + shutdown_signal: Option>, + server_handle: Option>, + sdp_answer: Option>>, +} + +impl Default for WhipServerSettings { + fn default() -> Self { + Self { + host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(), + stun_server: DEFAULT_STUN_SERVER.map(String::from), + turn_servers: gst::Array::new(Vec::new() as Vec), + producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from), + timeout: DEFAULT_TIMEOUT, + shutdown_signal: None, + server_handle: None, + sdp_answer: None, + } + } +} + +pub struct WhipServer { + state: Mutex, + settings: Mutex, +} + +impl Default for WhipServer { + fn default() -> Self { + Self { + settings: Mutex::new(WhipServerSettings::default()), + state: Mutex::new(WhipServerState::default()), + } + } +} + +#[derive(Debug)] +struct InternalError; + +impl warp::reject::Reject for InternalError {} + +impl WhipServer { + pub fn on_webrtcbin_ready(&self) -> RustClosure { + glib::closure!(|signaller: &super::WhipServerSignaller, + _producer_identifier: &str, + webrtcbin: &gst::Element| { + let obj_weak = signaller.downgrade(); + webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { + let obj = match obj_weak.upgrade() { + Some(obj) => obj, + None => return, + }; + + let state = webrtcbin.property::("ice-gathering-state"); + + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, obj: obj, "ICE gathering started"); + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, obj: obj, "ICE gathering complete"); + let ans: Option; + let settings = obj.imp().settings.lock().unwrap(); + if let Some(answer_sdp) = webrtcbin + .property::>("local-description") + { + ans = Some(answer_sdp.sdp()); + } else { + ans = None; + } + if let Some(tx) = &settings.sdp_answer { + tx.send(ans).unwrap() + } + } + _ => (), + } + }); + }) + } + + async fn patch_handler(&self, _id: String) -> Result { + // FIXME: implement ICE Trickle and ICE restart + // emit signal `handle-ice` to for ICE trickle + let reply = warp::reply::reply(); + let res = warp::reply::with_status(reply, http::StatusCode::NOT_IMPLEMENTED); + Ok(res.into_response()) + + //FIXME: add state checking once ICE trickle is implemented + } + + async fn delete_handler(&self, _id: String) -> Result { + let mut state = self.state.lock().unwrap(); + match *state { + WhipServerState::Ready => { + // FIXME: session-ended will make webrtcsrc send EOS + // and producer-removed is not handled + // Need to address the usecase where when the client terminates + // the webrtcsrc should be running without sending EOS and reset + // for next client connection like a usual server + + self.obj().emit_by_name::("session-ended", &[&ROOT]); + + gst::info!(CAT, imp:self, "Ending session"); + *state = WhipServerState::Idle; + Ok(warp::reply::reply().into_response()) + } + _ => { + gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed"); + let res = http::Response::builder() + .status(http::StatusCode::CONFLICT) + .body(Body::from(String::from("Session not Ready"))) + .unwrap(); + Ok(res) + } + } + } + + async fn options_handler(&self) -> Result { + let settings = self.settings.lock().unwrap(); + let peer_id = settings.producer_peer_id.clone().unwrap(); + drop(settings); + + let mut state = self.state.lock().unwrap(); + match *state { + WhipServerState::Idle => { + self.obj() + .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]); + *state = WhipServerState::Negotiating + } + WhipServerState::Ready => { + gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed"); + let res = http::Response::builder() + .status(http::StatusCode::CONFLICT) + .body(Body::from(String::from("Session active already"))) + .unwrap(); + return Ok(res); + } + _ => {} + }; + drop(state); + + let mut links = HeaderMap::new(); + let settings = self.settings.lock().unwrap(); + match &settings.stun_server { + Some(stun) => match build_link_header(stun.as_str()) { + Ok(stun_link) => { + links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}"); + } + }, + None => {} + } + + if !settings.turn_servers.is_empty() { + for turn_server in settings.turn_servers.iter() { + if let Ok(turn) = turn_server.get::() { + gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str()); + match build_link_header(turn.as_str()) { + Ok(turn_link) => { + links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}"); + } + } + } else { + gst::debug!(CAT, imp: self, "Failed to get String value of {turn_server:?}"); + } + } + } + + let mut res = http::Response::builder() + .header("Access-Post", "application/sdp") + .body(Body::empty()) + .unwrap(); + + let headers = res.headers_mut(); + headers.extend(links); + + Ok(res) + } + + async fn post_handler( + &self, + body: warp::hyper::body::Bytes, + ) -> Result, warp::Rejection> { + let mut settings = self.settings.lock().unwrap(); + let peer_id = settings.producer_peer_id.clone().unwrap(); + let wait_timeout = settings.timeout; + let (tx, rx) = unbounded::>(); + settings.sdp_answer = Some(tx); + drop(settings); + + let mut state = self.state.lock().unwrap(); + match *state { + WhipServerState::Idle => { + self.obj() + .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]); + *state = WhipServerState::Negotiating + } + WhipServerState::Ready => { + gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed"); + let res = http::Response::builder() + .status(http::StatusCode::CONFLICT) + .body(Body::from(String::from("Session active already"))) + .unwrap(); + return Ok(res); + } + _ => {} + }; + drop(state); + + match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) { + Ok(offer_sdp) => { + let offer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + offer_sdp, + ); + + self.obj() + .emit_by_name::<()>("session-description", &[&"unique", &offer]); + } + Err(err) => { + gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}"); + let reply = warp::reply::reply(); + let res = warp::reply::with_status(reply, http::StatusCode::NOT_ACCEPTABLE); + return Ok(res.into_response()); + } + } + + // We don't want to wait infinitely for the ice gathering to complete. + let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) { + Ok(a) => a, + Err(e) => { + let reply = warp::reply::reply(); + let res; + if e.is_timeout() { + res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT); + gst::error!(CAT, imp: self, "Timedout waiting for SDP answer"); + } else { + res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR); + gst::error!(CAT, imp: self, "Channel got disconnected"); + } + return Ok(res.into_response()); + } + }; + + let settings = self.settings.lock().unwrap(); + let mut links = HeaderMap::new(); + + match &settings.stun_server { + Some(stun) => match build_link_header(stun.as_str()) { + Ok(stun_link) => { + links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}"); + } + }, + None => {} + } + + if !settings.turn_servers.is_empty() { + for turn_server in settings.turn_servers.iter() { + if let Ok(turn) = turn_server.get::() { + gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str()); + match build_link_header(turn.as_str()) { + Ok(turn_link) => { + links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap()); + } + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}"); + } + } + } else { + gst::error!(CAT, imp: self, "Failed to get String value of {turn_server:?}"); + } + } + } + + // Note: including the ETag in the original "201 Created" response is only REQUIRED + // if the WHIP resource supports ICE restarts and OPTIONAL otherwise. + + let ans_text: Result; + if let Some(sdp) = answer { + match sdp.as_text() { + Ok(text) => { + ans_text = Ok(text); + gst::debug!(CAT, imp: self, "{ans_text:?}"); + } + Err(e) => { + ans_text = Err(format!("Failed to get SDP answer: {e:?}")); + gst::error!(CAT, imp: self, "{e:?}"); + } + } + } else { + let e = "SDP Answer is empty!".to_string(); + gst::error!(CAT, imp: self, "{e:?}"); + ans_text = Err(e); + } + + // If ans_text is an error. Send error code and error string in the response + if let Err(e) = ans_text { + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(e)) + .unwrap(); + return Ok(res); + } + + drop(settings); + + // Got SDP answer, send answer in the response + let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id; + let mut res = http::Response::builder() + .status(StatusCode::CREATED) + .header(CONTENT_TYPE, "application/sdp") + .header("location", resource_url) + .body(Body::from(ans_text.unwrap())) + .unwrap(); + + let headers = res.headers_mut(); + headers.extend(links); + + let mut state = self.state.lock().unwrap(); + *state = WhipServerState::Ready; + drop(state); + + Ok(res) + } + + fn serve(&self) -> Option> { + let mut settings = self.settings.lock().unwrap(); + let addr: SocketAddr; + match settings.host_addr.socket_addrs(|| None) { + Ok(v) => { + // pick the first vector item + addr = v[0]; + gst::info!(CAT, imp:self, "using {addr:?} as address"); + } + Err(e) => { + gst::error!(CAT, imp:self, "error getting addr from uri {e:?}"); + self.obj() + .emit_by_name::<()>("error", &[&format!("Unable to start WHIP Server: {e:?}")]); + return None; + } + } + + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + settings.shutdown_signal = Some(tx); + drop(settings); + + let prefix = warp::path(ROOT); + + let self_weak = self.downgrade(); + + // POST /endpoint + let post_filter = warp::post() + .and(warp::path(ENDPOINT_PATH)) + .and(warp::path::end()) + .and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP)) + .and(warp::body::bytes()) + .and_then(move |body| { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.post_handler(body).await + } + }); + + let self_weak = self.downgrade(); + + // OPTIONS /endpoint + let options_filter = warp::options() + .and(warp::path(ENDPOINT_PATH)) + .and(warp::path::end()) + .and_then(move || { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.options_handler().await + } + }); + + let self_weak = self.downgrade(); + + // PATCH /resource/:id + let patch_filter = warp::patch() + .and(warp::path(RESOURCE_PATH)) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(warp::header::exact( + CONTENT_TYPE.as_str(), + CONTENT_TRICKLE_ICE, + )) + .and_then(move |id| { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.patch_handler(id).await + } + }); + + let self_weak = self.downgrade(); + + // DELETE /resource/:id + let delete_filter = warp::delete() + .and(warp::path(RESOURCE_PATH)) + .and(warp::path::param::()) + .and(warp::path::end()) + .and_then(move |id| { + let s = self_weak.upgrade(); + async { + let self_ = s.expect("Need to have the ObjectRef"); + self_.delete_handler(id).await + } + }); + + let api = prefix + .and(post_filter) + .or(prefix.and(options_filter)) + .or(prefix.and(patch_filter)) + .or(prefix.and(delete_filter)); + + let s = warp::serve(api); + let jh = RUNTIME.spawn(async move { + let (_, server) = s.bind_with_graceful_shutdown(addr, async move { + match rx.await { + Ok(_) => gst::debug!(CAT, "Server shut down signal received"), + Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"), + } + }); + + server.await; + gst::debug!(CAT, "Stopped the server task..."); + }); + + gst::debug!(CAT, imp: self, "Started the server..."); + Some(jh) + } + + fn set_host_addr(&self, host_addr: &str) -> Result<(), url::ParseError> { + let mut settings = self.settings.lock().unwrap(); + settings.host_addr = Url::parse(host_addr)?; + Ok(()) + } +} + +impl SignallableImpl for WhipServer { + fn start(&self) { + gst::info!(CAT, imp: self, "starting the WHIP server"); + let jh = self.serve(); + let mut settings = self.settings.lock().unwrap(); + settings.server_handle = jh; + } + + fn stop(&self) { + let mut settings = self.settings.lock().unwrap(); + + let handle = settings + .server_handle + .take() + .expect("Server handle should be set"); + + let tx = settings + .shutdown_signal + .take() + .expect("Shutdown signal Sender needs to be valid"); + + if tx.send(()).is_err() { + gst::error!(CAT, imp: self, "Failed to send shutdown signal. Receiver dropped"); + } + + gst::debug!(CAT, imp: self, "Await server handle to join"); + RUNTIME.block_on(async { + if let Err(e) = handle.await { + gst::error!(CAT, imp:self, "Failed to join server handle: {e:?}"); + }; + }); + + gst::info!(CAT, imp: self, "stopped the WHIP server"); + } + + fn end_session(&self, _session_id: &str) { + //FIXME: send any events to the client + } +} + +#[glib::object_subclass] +impl ObjectSubclass for WhipServer { + const NAME: &'static str = "GstWhipServerSignaller"; + type Type = super::WhipServerSignaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl ObjectImpl for WhipServer { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("host-addr") + .nick("Host address") + .blurb("The the host address of the WHIP endpoint e.g., http://127.0.0.1:8080") + .default_value(DEFAULT_HOST_ADDR) + .flags(glib::ParamFlags::READWRITE) + .build(), + // needed by webrtcsrc in handle_webrtc_src_pad + glib::ParamSpecString::builder("producer-peer-id") + .default_value(DEFAULT_PRODUCER_PEER_ID) + .flags(glib::ParamFlags::READABLE) + .build(), + glib::ParamSpecString::builder("stun-server") + .nick("STUN Server") + .blurb("The STUN server of the form stun://hostname:port") + .default_value(DEFAULT_STUN_SERVER) + .build(), + gst::ParamSpecArray::builder("turn-servers") + .nick("List of TURN Servers to user") + .blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">") + .element_spec(&glib::ParamSpecString::builder("turn-server") + .nick("TURN Server") + .blurb("The TURN server of the form turn(s)://username:password@host:port.") + .build() + ) + .mutable_ready() + .build(), + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .build(), + ] + }); + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "host-addr" => { + if let Err(e) = + self.set_host_addr(value.get::<&str>().expect("type checked upstream")) + { + gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}"); + } + } + "stun-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.stun_server = value + .get::>() + .expect("type checked upstream") + } + "turn-servers" => { + let mut settings = self.settings.lock().unwrap(); + settings.turn_servers = value.get::().expect("type checked upstream") + } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.timeout = value.get().unwrap(); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "host-addr" => settings.host_addr.to_string().to_value(), + "stun-server" => settings.stun_server.to_value(), + "turn-servers" => settings.turn_servers.to_value(), + "producer-peer-id" => settings.producer_peer_id.to_value(), + "timeout" => settings.timeout.to_value(), + _ => unimplemented!(), + } + } +} diff --git a/net/webrtc/src/whip_signaller/mod.rs b/net/webrtc/src/whip_signaller/mod.rs index 16b20d0a..d699b015 100644 --- a/net/webrtc/src/whip_signaller/mod.rs +++ b/net/webrtc/src/whip_signaller/mod.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: MPL-2.0 use crate::signaller::Signallable; -use gst::glib; +use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt}; mod imp; @@ -9,6 +9,10 @@ glib::wrapper! { pub struct WhipClientSignaller(ObjectSubclass) @implements Signallable; } +glib::wrapper! { + pub struct WhipServerSignaller(ObjectSubclass) @implements Signallable; +} + unsafe impl Send for WhipClientSignaller {} unsafe impl Sync for WhipClientSignaller {} @@ -17,3 +21,14 @@ impl Default for WhipClientSignaller { glib::Object::new() } } + +unsafe impl Send for WhipServerSignaller {} +unsafe impl Sync for WhipServerSignaller {} + +impl Default for WhipServerSignaller { + fn default() -> Self { + let sig: WhipServerSignaller = glib::Object::new(); + sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready()); + sig + } +}