net/webrtc/whip_signaller: multiple client support in the server

- generate a new session id for every new client
use the session id in the resource url

- remove the producer-peer-id property in the WhipServer signaler as it
is redundant to have producer id in a session having only one producer

- read the 'producer-peer-id' property on the signaller conditionally
if it exists else use the session id as producer id

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1339>
This commit is contained in:
Taruntej Kanakamalla 2023-10-27 17:47:28 +05:30 committed by GStreamer Marge Bot
parent de726ca8d2
commit 712d4757c3
4 changed files with 92 additions and 146 deletions

10
Cargo.lock generated
View file

@ -1285,15 +1285,6 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.8.5" version = "0.8.5"
@ -3005,7 +2996,6 @@ dependencies = [
"aws-types", "aws-types",
"chrono", "chrono",
"clap", "clap",
"crossbeam-channel",
"data-encoding", "data-encoding",
"fastrand", "fastrand",
"futures", "futures",

View file

@ -58,7 +58,6 @@ livekit-protocol = { version = "0.3", optional = true }
livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true } livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true }
warp = {version = "0.3", optional = true } warp = {version = "0.3", optional = true }
crossbeam-channel = { version = "0.5", optional = true }
[dev-dependencies] [dev-dependencies]
gst-plugin-rtp = { path = "../rtp" } gst-plugin-rtp = { path = "../rtp" }
@ -89,7 +88,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
"dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"] "dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"]
janus = ["dep:http"] janus = ["dep:http"]
livekit = ["dep:livekit-protocol", "dep:livekit-api"] livekit = ["dep:livekit-protocol", "dep:livekit-api"]
whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"] whip = ["dep:async-recursion", "dep:reqwest", "dep:warp"]
[package.metadata.capi] [package.metadata.capi]
min_version = "0.9.21" min_version = "0.9.21"

View file

@ -472,11 +472,24 @@ impl Session {
} }
if let Some(srcpad) = srcpad { if let Some(srcpad) = srcpad {
let producer_id = element let signaller = element.imp().signaller();
.imp()
.signaller() // Signalers like WhipServer do not need a peer producer id as they run as a server
// waiting for a peer connection so they don't have that property. In that case use
// the session id as producer id
// In order to avoid breaking any existing signallers that depend on peer-producer-id,
// continue to use that or fallback to webrtcbin pad's msid if the
// peer-producer-id is None.
let producer_id = if signaller
.has_property("producer-peer-id", Some(Option::<String>::static_type()))
{
signaller
.property::<Option<String>>("producer-peer-id") .property::<Option<String>>("producer-peer-id")
.or_else(|| webrtcbin_pad.property("msid")); .or_else(|| webrtcbin_pad.property("msid"))
} else {
Some(self.id.clone())
};
let encoded_filter = element.emit_by_name::<Option<gst::Element>>( let encoded_filter = element.emit_by_name::<Option<gst::Element>>(
"request-encoded-filter", "request-encoded-filter",

View file

@ -18,9 +18,8 @@ use reqwest::header::HeaderValue;
use reqwest::StatusCode; use reqwest::StatusCode;
use std::sync::Mutex; use std::sync::Mutex;
use core::time::Duration;
use crossbeam_channel::unbounded;
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::sync::mpsc;
use url::Url; use url::Url;
use warp::{ use warp::{
http, http,
@ -47,7 +46,6 @@ const ENDPOINT_PATH: &str = "endpoint";
const RESOURCE_PATH: &str = "resource"; const RESOURCE_PATH: &str = "resource";
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080"; const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080";
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303"); const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client");
const CONTENT_SDP: &str = "application/sdp"; const CONTENT_SDP: &str = "application/sdp";
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag"; const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
@ -193,7 +191,7 @@ impl WhipClient {
let mut headermap = HeaderMap::new(); let mut headermap = HeaderMap::new();
headermap.insert( headermap.insert(
reqwest::header::CONTENT_TYPE, reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/sdp"), HeaderValue::from_static(CONTENT_SDP),
); );
if let Some(token) = auth_token.as_ref() { if let Some(token) = auth_token.as_ref() {
@ -616,27 +614,14 @@ impl ObjectImpl for WhipClient {
// WHIP server implementation // WHIP server implementation
#[derive(Debug)] #[derive(Debug)]
enum WhipServerState {
Idle,
Negotiating,
Ready,
}
impl Default for WhipServerState {
fn default() -> Self {
Self::Idle
}
}
struct WhipServerSettings { struct WhipServerSettings {
stun_server: Option<String>, stun_server: Option<String>,
turn_servers: gst::Array, turn_servers: gst::Array,
host_addr: Url, host_addr: Url,
producer_peer_id: Option<String>,
timeout: u32, timeout: u32,
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>, shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
server_handle: Option<tokio::task::JoinHandle<()>>, server_handle: Option<tokio::task::JoinHandle<()>>,
sdp_answer: Option<crossbeam_channel::Sender<Option<SDPMessage>>>, sdp_answer: Option<mpsc::Sender<Option<SDPMessage>>>,
} }
impl Default for WhipServerSettings { impl Default for WhipServerSettings {
@ -645,7 +630,6 @@ impl Default for WhipServerSettings {
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(), host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
stun_server: DEFAULT_STUN_SERVER.map(String::from), stun_server: DEFAULT_STUN_SERVER.map(String::from),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>), turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from),
timeout: DEFAULT_TIMEOUT, timeout: DEFAULT_TIMEOUT,
shutdown_signal: None, shutdown_signal: None,
server_handle: None, server_handle: None,
@ -654,18 +638,10 @@ impl Default for WhipServerSettings {
} }
} }
#[derive(Default)]
pub struct WhipServer { pub struct WhipServer {
state: Mutex<WhipServerState>,
settings: Mutex<WhipServerSettings>, settings: Mutex<WhipServerSettings>,
} canceller: Mutex<Option<futures::future::AbortHandle>>,
impl Default for WhipServer {
fn default() -> Self {
Self {
settings: Mutex::new(WhipServerSettings::default()),
state: Mutex::new(WhipServerState::default()),
}
}
} }
impl WhipServer { impl WhipServer {
@ -689,7 +665,7 @@ impl WhipServer {
WebRTCICEGatheringState::Complete => { WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete"); gst::info!(CAT, obj: obj, "ICE gathering complete");
let ans: Option<gst_sdp::SDPMessage>; let ans: Option<gst_sdp::SDPMessage>;
let settings = obj.imp().settings.lock().unwrap(); let mut settings = obj.imp().settings.lock().unwrap();
if let Some(answer_desc) = webrtcbin if let Some(answer_desc) = webrtcbin
.property::<Option<WebRTCSessionDescription>>("local-description") .property::<Option<WebRTCSessionDescription>>("local-description")
{ {
@ -697,9 +673,22 @@ impl WhipServer {
} else { } else {
ans = None; ans = None;
} }
if let Some(tx) = &settings.sdp_answer { let tx = settings
tx.send(ans).unwrap() .sdp_answer
.take()
.expect("SDP answer Sender needs to be valid");
let obj_weak = obj.downgrade();
RUNTIME.spawn(async move {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
};
if let Err(e) = tx.send(ans).await {
gst::error!(CAT, obj: obj, "Failed to send SDP {e}");
} }
});
} }
_ => (), _ => (),
} }
@ -717,57 +706,23 @@ impl WhipServer {
//FIXME: add state checking once ICE trickle is implemented //FIXME: add state checking once ICE trickle is implemented
} }
async fn delete_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> { async fn delete_handler(&self, id: String) -> Result<impl warp::Reply, warp::Rejection> {
let mut state = self.state.lock().unwrap(); if self
match *state { .obj()
WhipServerState::Ready => { .emit_by_name::<bool>("session-ended", &[&id.as_str()])
// FIXME: session-ended will make webrtcsrc send EOS {
// and producer-removed is not handled gst::info!(CAT, imp:self, "Ended session {id}");
// Need to address the usecase where when the client terminates } else {
// the webrtcsrc should be running without sending EOS and reset gst::info!(CAT, imp:self, "Failed to End session {id}");
// for next client connection like a usual server // FIXME: Do we send a different response
}
self.obj().emit_by_name::<bool>("session-ended", &[&ROOT]);
gst::info!(CAT, imp:self, "Ending session");
*state = WhipServerState::Idle;
Ok(warp::reply::reply().into_response()) Ok(warp::reply::reply().into_response())
} }
_ => {
gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session not Ready")))
.unwrap();
Ok(res)
}
}
}
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> { async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let peer_id = settings.producer_peer_id.clone().unwrap();
drop(settings); drop(settings);
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Idle => {
self.obj()
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
*state = WhipServerState::Negotiating
}
WhipServerState::Ready => {
gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session active already")))
.unwrap();
return Ok(res);
}
_ => {}
};
drop(state);
let mut links = HeaderMap::new(); let mut links = HeaderMap::new();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
match &settings.stun_server { match &settings.stun_server {
@ -801,7 +756,7 @@ impl WhipServer {
} }
let mut res = http::Response::builder() let mut res = http::Response::builder()
.header("Access-Post", "application/sdp") .header("Access-Post", CONTENT_SDP)
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
@ -815,31 +770,15 @@ impl WhipServer {
&self, &self,
body: warp::hyper::body::Bytes, body: warp::hyper::body::Bytes,
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> { ) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
let session_id = uuid::Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel::<Option<SDPMessage>>(1);
let wait_timeout = {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let peer_id = settings.producer_peer_id.clone().unwrap();
let wait_timeout = settings.timeout; let wait_timeout = settings.timeout;
let (tx, rx) = unbounded::<Option<SDPMessage>>();
settings.sdp_answer = Some(tx); settings.sdp_answer = Some(tx);
drop(settings); drop(settings);
wait_timeout
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Idle => {
self.obj()
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
*state = WhipServerState::Negotiating
}
WhipServerState::Ready => {
gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session active already")))
.unwrap();
return Ok(res);
}
_ => {}
}; };
drop(state);
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) { match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
Ok(offer_sdp) => { Ok(offer_sdp) => {
@ -849,7 +788,9 @@ impl WhipServer {
); );
self.obj() self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &offer]); .emit_by_name::<()>("session-started", &[&session_id, &session_id]);
self.obj()
.emit_by_name::<()>("session-description", &[&session_id, &offer]);
} }
Err(err) => { Err(err) => {
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}"); gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
@ -859,20 +800,32 @@ impl WhipServer {
} }
} }
// We don't want to wait infinitely for the ice gathering to complete. let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await;
let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) {
Ok(a) => a, let answer = match result {
Err(e) => { Ok(ans) => match ans {
let reply = warp::reply::reply(); Some(a) => a,
let res; None => {
if e.is_timeout() { let err = "Channel closed, can't receive SDP".to_owned();
res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT); let res = http::Response::builder()
gst::error!(CAT, imp: self, "Timedout waiting for SDP answer"); .status(http::StatusCode::INTERNAL_SERVER_ERROR)
} else { .body(Body::from(err))
res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR); .unwrap();
gst::error!(CAT, imp: self, "Channel got disconnected");
return Ok(res);
} }
return Ok(res.into_response()); },
Err(e) => {
let err = match e {
WaitError::FutureAborted => "Aborted".to_owned(),
WaitError::FutureError(err) => err.to_string(),
};
let res = http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(err))
.unwrap();
return Ok(res);
} }
}; };
@ -942,10 +895,10 @@ impl WhipServer {
drop(settings); drop(settings);
// Got SDP answer, send answer in the response // Got SDP answer, send answer in the response
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id; let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id;
let mut res = http::Response::builder() let mut res = http::Response::builder()
.status(StatusCode::CREATED) .status(StatusCode::CREATED)
.header(CONTENT_TYPE, "application/sdp") .header(CONTENT_TYPE, CONTENT_SDP)
.header("location", resource_url) .header("location", resource_url)
.body(Body::from(ans_text.unwrap())) .body(Body::from(ans_text.unwrap()))
.unwrap(); .unwrap();
@ -953,10 +906,6 @@ impl WhipServer {
let headers = res.headers_mut(); let headers = res.headers_mut();
headers.extend(links); headers.extend(links);
let mut state = self.state.lock().unwrap();
*state = WhipServerState::Ready;
drop(state);
Ok(res) Ok(res)
} }
@ -1112,7 +1061,8 @@ impl SignallableImpl for WhipServer {
gst::info!(CAT, imp: self, "stopped the WHIP server"); gst::info!(CAT, imp: self, "stopped the WHIP server");
} }
fn end_session(&self, _session_id: &str) { fn end_session(&self, session_id: &str) {
gst::info!(CAT, imp: self, "Session {session_id} ended");
//FIXME: send any events to the client //FIXME: send any events to the client
} }
} }
@ -1135,11 +1085,6 @@ impl ObjectImpl for WhipServer {
.default_value(DEFAULT_HOST_ADDR) .default_value(DEFAULT_HOST_ADDR)
.flags(glib::ParamFlags::READWRITE) .flags(glib::ParamFlags::READWRITE)
.build(), .build(),
// needed by webrtcsrc in handle_webrtc_src_pad
glib::ParamSpecString::builder("producer-peer-id")
.default_value(DEFAULT_PRODUCER_PEER_ID)
.flags(glib::ParamFlags::READABLE)
.build(),
glib::ParamSpecString::builder("stun-server") glib::ParamSpecString::builder("stun-server")
.nick("STUN Server") .nick("STUN Server")
.blurb("The STUN server of the form stun://hostname:port") .blurb("The STUN server of the form stun://hostname:port")
@ -1199,7 +1144,6 @@ impl ObjectImpl for WhipServer {
"host-addr" => settings.host_addr.to_string().to_value(), "host-addr" => settings.host_addr.to_string().to_value(),
"stun-server" => settings.stun_server.to_value(), "stun-server" => settings.stun_server.to_value(),
"turn-servers" => settings.turn_servers.to_value(), "turn-servers" => settings.turn_servers.to_value(),
"producer-peer-id" => settings.producer_peer_id.to_value(),
"timeout" => settings.timeout.to_value(), "timeout" => settings.timeout.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }