From 712d4757c3a1fd6d30082c2845405ae35646bcf5 Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Fri, 27 Oct 2023 17:47:28 +0530 Subject: [PATCH] net/webrtc/whip_signaller: multiple client support in the server - generate a new session id for every new client use the session id in the resource url - remove the producer-peer-id property in the WhipServer signaler as it is redundant to have producer id in a session having only one producer - read the 'producer-peer-id' property on the signaller conditionally if it exists else use the session id as producer id Part-of: --- Cargo.lock | 10 -- net/webrtc/Cargo.toml | 3 +- net/webrtc/src/webrtcsrc/imp.rs | 23 ++- net/webrtc/src/whip_signaller/imp.rs | 202 ++++++++++----------------- 4 files changed, 92 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ee92a2f..b716454e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1285,15 +1285,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -3005,7 +2996,6 @@ dependencies = [ "aws-types", "chrono", "clap", - "crossbeam-channel", "data-encoding", "fastrand", "futures", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 42a7c5aa..a852b299 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -58,7 +58,6 @@ livekit-protocol = { version = "0.3", optional = true } livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true } warp = {version = "0.3", optional = true } -crossbeam-channel = { version = "0.5", optional = true } [dev-dependencies] gst-plugin-rtp = { path = "../rtp" } @@ -89,7 +88,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s "dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"] janus = ["dep:http"] livekit = ["dep:livekit-protocol", "dep:livekit-api"] -whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"] +whip = ["dep:async-recursion", "dep:reqwest", "dep:warp"] [package.metadata.capi] min_version = "0.9.21" diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 60161656..859b80f4 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -472,11 +472,24 @@ impl Session { } if let Some(srcpad) = srcpad { - let producer_id = element - .imp() - .signaller() - .property::>("producer-peer-id") - .or_else(|| webrtcbin_pad.property("msid")); + let signaller = element.imp().signaller(); + + // Signalers like WhipServer do not need a peer producer id as they run as a server + // waiting for a peer connection so they don't have that property. In that case use + // the session id as producer id + + // In order to avoid breaking any existing signallers that depend on peer-producer-id, + // continue to use that or fallback to webrtcbin pad's msid if the + // peer-producer-id is None. + let producer_id = if signaller + .has_property("producer-peer-id", Some(Option::::static_type())) + { + signaller + .property::>("producer-peer-id") + .or_else(|| webrtcbin_pad.property("msid")) + } else { + Some(self.id.clone()) + }; let encoded_filter = element.emit_by_name::>( "request-encoded-filter", diff --git a/net/webrtc/src/whip_signaller/imp.rs b/net/webrtc/src/whip_signaller/imp.rs index b3b41996..f5317f47 100644 --- a/net/webrtc/src/whip_signaller/imp.rs +++ b/net/webrtc/src/whip_signaller/imp.rs @@ -18,9 +18,8 @@ use reqwest::header::HeaderValue; use reqwest::StatusCode; use std::sync::Mutex; -use core::time::Duration; -use crossbeam_channel::unbounded; use std::net::SocketAddr; +use tokio::sync::mpsc; use url::Url; use warp::{ http, @@ -47,7 +46,6 @@ const ENDPOINT_PATH: &str = "endpoint"; const RESOURCE_PATH: &str = "resource"; const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080"; const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303"); -const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client"); const CONTENT_SDP: &str = "application/sdp"; const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag"; @@ -193,7 +191,7 @@ impl WhipClient { let mut headermap = HeaderMap::new(); headermap.insert( reqwest::header::CONTENT_TYPE, - HeaderValue::from_static("application/sdp"), + HeaderValue::from_static(CONTENT_SDP), ); if let Some(token) = auth_token.as_ref() { @@ -616,27 +614,14 @@ impl ObjectImpl for WhipClient { // WHIP server implementation #[derive(Debug)] -enum WhipServerState { - Idle, - Negotiating, - Ready, -} - -impl Default for WhipServerState { - fn default() -> Self { - Self::Idle - } -} - struct WhipServerSettings { stun_server: Option, turn_servers: gst::Array, host_addr: Url, - producer_peer_id: Option, timeout: u32, shutdown_signal: Option>, server_handle: Option>, - sdp_answer: Option>>, + sdp_answer: Option>>, } impl Default for WhipServerSettings { @@ -645,7 +630,6 @@ impl Default for WhipServerSettings { host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(), stun_server: DEFAULT_STUN_SERVER.map(String::from), turn_servers: gst::Array::new(Vec::new() as Vec), - producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from), timeout: DEFAULT_TIMEOUT, shutdown_signal: None, server_handle: None, @@ -654,18 +638,10 @@ impl Default for WhipServerSettings { } } +#[derive(Default)] pub struct WhipServer { - state: Mutex, settings: Mutex, -} - -impl Default for WhipServer { - fn default() -> Self { - Self { - settings: Mutex::new(WhipServerSettings::default()), - state: Mutex::new(WhipServerState::default()), - } - } + canceller: Mutex>, } impl WhipServer { @@ -689,7 +665,7 @@ impl WhipServer { WebRTCICEGatheringState::Complete => { gst::info!(CAT, obj: obj, "ICE gathering complete"); let ans: Option; - let settings = obj.imp().settings.lock().unwrap(); + let mut settings = obj.imp().settings.lock().unwrap(); if let Some(answer_desc) = webrtcbin .property::>("local-description") { @@ -697,9 +673,22 @@ impl WhipServer { } else { ans = None; } - if let Some(tx) = &settings.sdp_answer { - tx.send(ans).unwrap() - } + let tx = settings + .sdp_answer + .take() + .expect("SDP answer Sender needs to be valid"); + + let obj_weak = obj.downgrade(); + RUNTIME.spawn(async move { + let obj = match obj_weak.upgrade() { + Some(obj) => obj, + None => return, + }; + + if let Err(e) = tx.send(ans).await { + gst::error!(CAT, obj: obj, "Failed to send SDP {e}"); + } + }); } _ => (), } @@ -717,57 +706,23 @@ impl WhipServer { //FIXME: add state checking once ICE trickle is implemented } - async fn delete_handler(&self, _id: String) -> Result { - let mut state = self.state.lock().unwrap(); - match *state { - WhipServerState::Ready => { - // FIXME: session-ended will make webrtcsrc send EOS - // and producer-removed is not handled - // Need to address the usecase where when the client terminates - // the webrtcsrc should be running without sending EOS and reset - // for next client connection like a usual server - - self.obj().emit_by_name::("session-ended", &[&ROOT]); - - gst::info!(CAT, imp:self, "Ending session"); - *state = WhipServerState::Idle; - Ok(warp::reply::reply().into_response()) - } - _ => { - gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed"); - let res = http::Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Body::from(String::from("Session not Ready"))) - .unwrap(); - Ok(res) - } + async fn delete_handler(&self, id: String) -> Result { + if self + .obj() + .emit_by_name::("session-ended", &[&id.as_str()]) + { + gst::info!(CAT, imp:self, "Ended session {id}"); + } else { + gst::info!(CAT, imp:self, "Failed to End session {id}"); + // FIXME: Do we send a different response } + Ok(warp::reply::reply().into_response()) } async fn options_handler(&self) -> Result { let settings = self.settings.lock().unwrap(); - let peer_id = settings.producer_peer_id.clone().unwrap(); drop(settings); - let mut state = self.state.lock().unwrap(); - match *state { - WhipServerState::Idle => { - self.obj() - .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]); - *state = WhipServerState::Negotiating - } - WhipServerState::Ready => { - gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed"); - let res = http::Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Body::from(String::from("Session active already"))) - .unwrap(); - return Ok(res); - } - _ => {} - }; - drop(state); - let mut links = HeaderMap::new(); let settings = self.settings.lock().unwrap(); match &settings.stun_server { @@ -801,7 +756,7 @@ impl WhipServer { } let mut res = http::Response::builder() - .header("Access-Post", "application/sdp") + .header("Access-Post", CONTENT_SDP) .body(Body::empty()) .unwrap(); @@ -815,31 +770,15 @@ impl WhipServer { &self, body: warp::hyper::body::Bytes, ) -> Result, warp::Rejection> { - let mut settings = self.settings.lock().unwrap(); - let peer_id = settings.producer_peer_id.clone().unwrap(); - let wait_timeout = settings.timeout; - let (tx, rx) = unbounded::>(); - settings.sdp_answer = Some(tx); - drop(settings); - - let mut state = self.state.lock().unwrap(); - match *state { - WhipServerState::Idle => { - self.obj() - .emit_by_name::<()>("session-started", &[&ROOT, &peer_id]); - *state = WhipServerState::Negotiating - } - WhipServerState::Ready => { - gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed"); - let res = http::Response::builder() - .status(http::StatusCode::CONFLICT) - .body(Body::from(String::from("Session active already"))) - .unwrap(); - return Ok(res); - } - _ => {} + let session_id = uuid::Uuid::new_v4().to_string(); + let (tx, mut rx) = mpsc::channel::>(1); + let wait_timeout = { + let mut settings = self.settings.lock().unwrap(); + let wait_timeout = settings.timeout; + settings.sdp_answer = Some(tx); + drop(settings); + wait_timeout }; - drop(state); match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) { Ok(offer_sdp) => { @@ -849,7 +788,9 @@ impl WhipServer { ); self.obj() - .emit_by_name::<()>("session-description", &[&"unique", &offer]); + .emit_by_name::<()>("session-started", &[&session_id, &session_id]); + self.obj() + .emit_by_name::<()>("session-description", &[&session_id, &offer]); } Err(err) => { gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}"); @@ -859,20 +800,32 @@ impl WhipServer { } } - // We don't want to wait infinitely for the ice gathering to complete. - let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) { - Ok(a) => a, - Err(e) => { - let reply = warp::reply::reply(); - let res; - if e.is_timeout() { - res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT); - gst::error!(CAT, imp: self, "Timedout waiting for SDP answer"); - } else { - res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR); - gst::error!(CAT, imp: self, "Channel got disconnected"); + let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await; + + let answer = match result { + Ok(ans) => match ans { + Some(a) => a, + None => { + let err = "Channel closed, can't receive SDP".to_owned(); + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(err)) + .unwrap(); + + return Ok(res); } - return Ok(res.into_response()); + }, + Err(e) => { + let err = match e { + WaitError::FutureAborted => "Aborted".to_owned(), + WaitError::FutureError(err) => err.to_string(), + }; + let res = http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(err)) + .unwrap(); + + return Ok(res); } }; @@ -942,10 +895,10 @@ impl WhipServer { drop(settings); // Got SDP answer, send answer in the response - let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id; + let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id; let mut res = http::Response::builder() .status(StatusCode::CREATED) - .header(CONTENT_TYPE, "application/sdp") + .header(CONTENT_TYPE, CONTENT_SDP) .header("location", resource_url) .body(Body::from(ans_text.unwrap())) .unwrap(); @@ -953,10 +906,6 @@ impl WhipServer { let headers = res.headers_mut(); headers.extend(links); - let mut state = self.state.lock().unwrap(); - *state = WhipServerState::Ready; - drop(state); - Ok(res) } @@ -1112,7 +1061,8 @@ impl SignallableImpl for WhipServer { gst::info!(CAT, imp: self, "stopped the WHIP server"); } - fn end_session(&self, _session_id: &str) { + fn end_session(&self, session_id: &str) { + gst::info!(CAT, imp: self, "Session {session_id} ended"); //FIXME: send any events to the client } } @@ -1135,11 +1085,6 @@ impl ObjectImpl for WhipServer { .default_value(DEFAULT_HOST_ADDR) .flags(glib::ParamFlags::READWRITE) .build(), - // needed by webrtcsrc in handle_webrtc_src_pad - glib::ParamSpecString::builder("producer-peer-id") - .default_value(DEFAULT_PRODUCER_PEER_ID) - .flags(glib::ParamFlags::READABLE) - .build(), glib::ParamSpecString::builder("stun-server") .nick("STUN Server") .blurb("The STUN server of the form stun://hostname:port") @@ -1199,7 +1144,6 @@ impl ObjectImpl for WhipServer { "host-addr" => settings.host_addr.to_string().to_value(), "stun-server" => settings.stun_server.to_value(), "turn-servers" => settings.turn_servers.to_value(), - "producer-peer-id" => settings.producer_peer_id.to_value(), "timeout" => settings.timeout.to_value(), _ => unimplemented!(), }