Taruntej Kanakamalla 4d6e0f707b net/webrtc: add WHEP server signaller
WHEP server implementation derived from BaseWebRTCSink
2024-04-17 16:22:13 +05:30

581 lines
21 KiB

// SPDX-License-Identifier: MPL-2.0
use crate::signaller::{Signallable, SignallableImpl};
use crate::utils::{build_link_header, wait_async, WaitError};
use crate::RUNTIME;
use once_cell::sync::Lazy;
use gst::glib::{self, RustClosure};
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_sdp::SDPMessage;
use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::StatusCode;
use std::sync::Mutex;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use url::Url;
use warp::{
Filter, Reply,
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
Some("WHEP Server Signaller"),
const DEFAULT_TIMEOUT: u32 = 30;
const ROOT: &str = "whep";
const ENDPOINT_PATH: &str = "endpoint";
const RESOURCE_PATH: &str = "resource";
const DEFAULT_HOST_ADDR: &str = "";
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://");
const CONTENT_SDP: &str = "application/sdp";
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
struct Settings {
stun_server: Option<String>,
turn_servers: gst::Array,
host_addr: Url,
timeout: u32,
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
server_handle: Option<tokio::task::JoinHandle<()>>,
sdp_answer: Option<mpsc::Sender<Option<SDPMessage>>>,
impl Default for Settings {
fn default() -> Self {
Self {
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
shutdown_signal: None,
server_handle: None,
sdp_answer: None,
pub struct WhepServer {
settings: Mutex<Settings>,
canceller: Mutex<Option<futures::future::AbortHandle>>,
impl WhepServer {
pub fn on_webrtcbin_ready(&self) -> RustClosure {
glib::closure!(|signaller: &super::WhepServerSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
let obj_weak = signaller.downgrade();
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
let state =<WebRTCICEGatheringState>("ice-gathering-state");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: obj, "ICE gathering started");
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete");
let ans: Option<gst_sdp::SDPMessage>;
let mut settings = obj.imp().settings.lock().unwrap();
if let Some(answer_desc) = webrtcbin
ans = Some(answer_desc.sdp().to_owned());
} else {
ans = None;
let tx = settings
.expect("SDP answer Sender needs to be valid");
let obj_weak = obj.downgrade();
RUNTIME.spawn( async move {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
if let Err(e) = tx.send(ans).await {
gst::error!(CAT, obj: obj, "Failed to send SDP {e}");
_ => (),
async fn patch_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
// FIXME: implement ICE Trickle and ICE restart
// emit signal `handle-ice` to for ICE trickle
let reply = warp::reply::reply();
let res = warp::reply::with_status(reply, http::StatusCode::NOT_IMPLEMENTED);
//FIXME: add state checking once ICE trickle is implemented
async fn delete_handler(&self, id: String) -> Result<impl warp::Reply, warp::Rejection> {
if self
.emit_by_name::<bool>("session-ended", &[&id.as_str()])
//do nothing
// FIXME: revisit once the return values are changed in webrtcsink/ and webrtcsrc/
gst::info!(CAT, imp:self, "Ended session {id}");
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
let mut links = HeaderMap::new();
let settings = self.settings.lock().unwrap();
match &settings.stun_server {
Some(stun) => match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
None => {}
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
} else {
gst::debug!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
let mut res = http::Response::builder()
.header("Access-Post", CONTENT_SDP)
let headers = res.headers_mut();
async fn post_handler(
body: warp::hyper::body::Bytes,
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
let session_id = uuid::Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel::<Option<SDPMessage>>(1);
let wait_timeout = {
let mut settings = self.settings.lock().unwrap();
let wait_timeout = settings.timeout;
settings.sdp_answer = Some(tx);
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
Ok(offer_sdp) => {
let offer = gst_webrtc::WebRTCSessionDescription::new(
.emit_by_name::<()>("session-requested", &[&session_id, &session_id, &offer]);
Err(err) => {
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
let reply = warp::reply::reply();
let res = warp::reply::with_status(reply, http::StatusCode::NOT_ACCEPTABLE);
return Ok(res.into_response());
let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await;
let answer = match result {
Ok(ans) => {
match ans {
Some(a) => a,
None => {
let err = "Channel closed, can't receive SDP".to_owned();
let res = http::Response::builder()
return Ok(res);
Err(e) => {
let err = match e {
WaitError::FutureAborted => {
WaitError::FutureError(err) => {
let res = http::Response::builder()
return Ok(res);
let settings = self.settings.lock().unwrap();
let mut links = HeaderMap::new();
if let Some(stun) = &settings.stun_server {
match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
} else {
gst::error!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
// Note: including the ETag in the original "201 Created" response is only REQUIRED
// if the WHEP resource supports ICE restarts and OPTIONAL otherwise.
let ans_text: Result<String, String>;
if let Some(sdp) = answer {
match sdp.as_text() {
Ok(text) => {
ans_text = Ok(text);
gst::debug!(CAT, imp: self, "{ans_text:?}");
Err(e) => {
ans_text = Err(format!("Failed to get SDP answer: {e:?}"));
gst::error!(CAT, imp: self, "{e:?}");
} else {
let e = "SDP Answer is empty!".to_string();
gst::error!(CAT, imp: self, "{e:?}");
ans_text = Err(e);
// If ans_text is an error. Send error code and error string in the response
if let Err(e) = ans_text {
let res = http::Response::builder()
return Ok(res);
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id;
let mut res = http::Response::builder()
.header("location", resource_url)
let headers = res.headers_mut();
fn serve(&self) -> Option<tokio::task::JoinHandle<()>> {
let mut settings = self.settings.lock().unwrap();
let addr: SocketAddr;
match settings.host_addr.socket_addrs(|| None) {
Ok(v) => {
// pick the first vector item
addr = v[0];
gst::info!(CAT, imp:self, "using {addr:?} as address");
Err(e) => {
gst::error!(CAT, imp:self, "error getting addr from uri {e:?}");
.emit_by_name::<()>("error", &[&format!("Unable to start Whep Server: {e:?}")]);
return None;
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
settings.shutdown_signal = Some(tx);
let prefix = warp::path(ROOT);
let self_weak = self.downgrade();
// POST /endpoint
let post_filter = warp::post()
.and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP))
.and_then(move |body| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
let self_weak = self.downgrade();
// OPTIONS /endpoint
let options_filter = warp::options()
.and_then(move || {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
let self_weak = self.downgrade();
// PATCH /resource/:id
let patch_filter = warp::patch()
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
let self_weak = self.downgrade();
// DELETE /resource/:id
let delete_filter = warp::delete()
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
let api = prefix
let s = warp::serve(api);
let f = async move {
let (_, server) = s.bind_with_graceful_shutdown(addr, async move {
match rx.await {
Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
gst::debug!(CAT, "Stopped the server task...");
let jh = RUNTIME.spawn(f);
gst::debug!(CAT, imp: self, "Started the server...");
fn set_host_addr(&self, host_addr: &str) -> Result<(), url::ParseError> {
let mut settings = self.settings.lock().unwrap();
settings.host_addr = Url::parse(host_addr)?;
impl SignallableImpl for WhepServer {
fn start(&self) {
gst::info!(CAT, imp: self, "starting the WHEP server");
let jh = self.serve();
let mut settings = self.settings.lock().unwrap();
settings.server_handle = jh;
fn stop(&self) {
let mut settings = self.settings.lock().unwrap();
let handle = settings
.expect("Server handle should be set");
let tx = settings
.expect("Shutdown signal Sender needs to be valid");
if tx.send(()).is_err() {
gst::error!(CAT, imp: self, "Failed to send shutdown signal. Receiver dropped");
gst::debug!(CAT, imp: self, "Await server handle to join");
RUNTIME.block_on(async {
if let Err(e) = handle.await {
gst::error!(CAT, imp:self, "Failed to join server handle: {e:?}");
gst::info!(CAT, imp: self, "stopped the WHEP server");
fn end_session(&self, _session_id: &str) {
//FIXME: send any events to the client
impl ObjectSubclass for WhepServer {
const NAME: &'static str = "GstWhepServerSignaller";
type Type = super::WhepServerSignaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
impl ObjectImpl for WhepServer {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
.nick("Host address")
.blurb("The the host address of the WHEP endpoint e.g.,")
.nick("STUN Server")
.blurb("The STUN server of the form stun://hostname:port")
.nick("List of TURN Servers to user")
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
.nick("TURN Server")
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match {
"host-addr" => {
if let Err(e) =
self.set_host_addr(value.get::<&str>().expect("type checked upstream"))
gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}");
"stun-server" => {
let mut settings = self.settings.lock().unwrap();
settings.stun_server = value
.expect("type checked upstream")
"turn-servers" => {
let mut settings = self.settings.lock().unwrap();
settings.turn_servers = value.get::<gst::Array>().expect("type checked upstream")
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().unwrap();
_ => unimplemented!(),
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match {
"host-addr" => settings.host_addr.to_string().to_value(),
"stun-server" => settings.stun_server.to_value(),
"turn-servers" => settings.turn_servers.to_value(),
"timeout" => settings.timeout.to_value(),
_ => unimplemented!(),