net/webrtc: add whipserversrc

Implement new signaller WhipServerSignaller
 - an http server using 'warp'
 - handlers for the POST, OPTIONS, PATCH and DELETE
 - fixed path `/whip/endpoint` as the URI
 - fixed value 'whip-client' as the producer peer id
 - fixed resource url `/whip/resource/whip-client`

Derive whipserversrc element from BaseWebRTCSrc
 - implement constructed method for ObjectImpl to set
  non-default signaller, i.e., WhipServerSignaller
 - bind the properties stun-server and turn-servers to those on
   the Signaller

Connect to 'webrtcbin-ready' signal in the constructor of WhipServerSignaller
 - it will be emitted by the webrtcsrc when the webrtcbin element is ready
 - the closure for this signal will in turn connect to webrtcbin's ice-gathering-state
   and perform send with the answer sdp via the channel
 - the WhipServer will hold its HTTP response in POST handler until this signal
   is received or timeout which happens early

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1284>
This commit is contained in:
Taruntej Kanakamalla 2023-07-19 10:35:33 +05:30 committed by GStreamer Marge Bot
parent ed3aa740be
commit 43ee6bfc1c
9 changed files with 952 additions and 5 deletions

106
Cargo.lock generated
View file

@ -1208,6 +1208,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.3"
@ -2799,6 +2809,7 @@ dependencies = [
"aws-types",
"chrono",
"clap",
"crossbeam-channel",
"data-encoding",
"fastrand",
"futures",
@ -2831,6 +2842,7 @@ dependencies = [
"url",
"url-escape",
"uuid",
"warp",
]
[[package]]
@ -4235,6 +4247,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -4274,6 +4296,24 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0"
[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http",
"httparse",
"log",
"memchr",
"mime",
"spin 0.9.8",
"version_check",
]
[[package]]
name = "multimap"
version = "0.8.3"
@ -4726,6 +4766,26 @@ dependencies = [
"indexmap 2.1.0",
]
[[package]]
name = "pin-project"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]]
name = "pin-project-lite"
version = "0.2.13"
@ -5421,6 +5481,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -6260,6 +6326,15 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicase"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
@ -6437,6 +6512,37 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e92e22e03ff1230c03a1a8ee37d2f89cd489e2e541b7550d6afad96faed169"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"headers",
"http",
"hyper",
"log",
"mime",
"mime_guess",
"multer",
"percent-encoding",
"pin-project",
"rustls-pemfile",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-util",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View file

@ -6499,6 +6499,38 @@
}
},
"rank": "none"
},
"whipserversrc": {
"author": "Taruntej Kanakamalla <taruntej@asymptotic.io>",
"description": "WebRTC source element using WHIP Server as the signaller",
"hierarchy": [
"GstWhipServerSrc",
"GstBaseWebRTCSrc",
"GstBin",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy"
],
"klass": "Source/Network/WebRTC",
"pad-templates": {
"audio_%%u": {
"caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
},
"video_%%u": {
"caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
}
},
"rank": "primary"
}
},
"filename": "gstrswebrtc",

View file

@ -54,6 +54,9 @@ async-recursion = "1.0.0"
livekit-protocol = { version = "0.2" }
livekit-api = { version = "0.2", default-features = false, features = ["signal-client", "access-token", "native-tls"] }
warp = "0.3"
crossbeam-channel = "0.5"
[dev-dependencies]
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }

View file

@ -245,7 +245,11 @@ AWS_ACCESS_KEY_ID="XXX" AWS_SECRET_ACCESS_KEY="XXX" gst-launch-1.0 videotestsrc
## Using the WHIP Signaller
Testing the whip signaller can be done by setting up janus and
### WHIP Client
WHIP Client Signaller uses BaseWebRTCSink
Testing the whip client as the signaller can be done by setting up janus and
<https://github.com/meetecho/simple-whip-server/>.
* Set up a [janus] instance with the videoroom plugin configured
@ -269,6 +273,53 @@ gst-launch-1.0 -e uridecodebin uri=file:///home/meh/path/to/video/file ! \
You should see a second video displayed in the videoroomtest web page.
### WHIP Server
WHIP Server Signaller uses BaseWebRTCSrc
The WHIP Server as the signaller can be tested in two ways.
Note: The initial version of `whipserversrc` does not check any auth or encryption.
Host application using `whipserversrc` behind an HTTP(s) proxy to enforce the auth and encryption between the WHIP client and server
#### 1. Using the Gstreamer element `whipwebrtcsink`
a. In one tab of the terminal start the WHIP server using the below command
``` shell
RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 whipserversrc signaller::host-addr=http://127.0.0.1:8190 stun-server="stun://stun.l.google.com:19302" turn-servers="\<\"turns://user1:pass1@turn.serverone.com:7806\", \"turn://user2:pass2@turn.servertwo.com:7809\"\>" ! videoconvert ! autovideosink
```
b. In the second tab start the WHIP Client by sending a test video as shown in the below command
``` shell
RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 videotestsrc ! videoconvert ! video/x-raw ! queue ! \
whipwebrtcsink name=ws signaller::whip-endpoint="http://127.0.0.1:8190/whip/endpoint"
```
#### 2. Using Meetecho's `simple-whip-client`
Set up the simple whip client using using the instructions present in https://github.com/meetecho/simple-whip-client#readme
a. In one tab of the terminal start the WHIP server using the below command
``` shell
RUST_BACKTRACE=full GST_DEBUG=webrtc*:6 GST_PLUGIN_PATH=target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH gst-launch-1.0 whipserversrc signaller::host-addr=http://127.0.0.1:8190 stun-server="stun://stun.l.google.com:19302" turn-servers="\<\"turns://user1:pass1@turn.serverone.com:7806\", \"turn://user2:pass2@turn.servertwo.com:7809\"\>" name=ws ! videoconvert ! autovideosink ws. ! audioconvert ! autoaudiosink
```
b. In the second tab start the `simple-whip-client` as shown in the below command
``` shell
./whip-client --url http://127.0.0.1:8190/whip/endpoint \
-A "audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay pt=100 ssrc=1 ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=100" \
-V "videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay pt=96 ssrc=2 ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=96" \
-S stun://stun.l.google.com:19302 \
-l 7 \
-n true
```
Terminating the client will close the session and the client should receive 200 (OK) as the response to the DELETE request
## Using the LiveKit Signaller
Testing the LiveKit signaller can be done by setting up [LiveKit] and creating a room.

View file

@ -357,6 +357,43 @@ pub fn set_ice_servers(
Ok(())
}
pub fn build_link_header(url_str: &str) -> Result<String, url::ParseError> {
let url = url::Url::parse(url_str)?;
let mut link_str: String = "<".to_owned() + url.scheme();
if let Some(host) = url.host_str() {
link_str = link_str + ":" + host;
}
if let Some(port) = url.port() {
link_str = link_str + ":" + port.to_string().as_str();
}
link_str += url.path();
if let Some(query) = url.query() {
link_str = link_str + "?" + query;
}
link_str += ">";
if let Some(password) = url.password() {
link_str = link_str
+ "; "
+ "rel=\"ice-server\""
+ "; "
+ "username=\""
+ url.username()
+ "\"; "
+ "credential:\""
+ password
+ "\"; "
+ "credential-type:\"password\";";
}
Ok(link_str)
}
/// Wrapper around `gst::ElementFactory::make` with a better error
/// message
pub fn make_element(element: &str, name: Option<&str>) -> Result<gst::Element, Error> {

View file

@ -5,6 +5,7 @@ use gst::prelude::*;
use crate::signaller::{prelude::*, Signallable, Signaller};
use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS};
use crate::webrtcsrc::WebRTCSrcPad;
use crate::whip_signaller::WhipServerSignaller;
use anyhow::{Context, Error};
use gst::glib;
use gst::glib::once_cell::sync::Lazy;
@ -552,6 +553,9 @@ impl BaseWebRTCSrc {
}),
);
self.signaller()
.emit_by_name::<()>("webrtcbin-ready", &[&"none", &webrtcbin]);
bin.add(&webrtcbin).unwrap();
self.obj().add(&bin).context("Could not add `webrtcbin`")?;
@ -995,6 +999,16 @@ impl BaseWebRTCSrc {
gst::info!(CAT, imp: self, "Stopped signaller");
}
}
pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> {
let sigobj = signaller.clone();
let mut settings = self.settings.lock().unwrap();
self.connect_signaller(&sigobj);
settings.signaller = signaller;
Ok(())
}
}
impl ElementImpl for BaseWebRTCSrc {
@ -1225,3 +1239,57 @@ impl ObjectSubclass for WebRTCSrc {
type ParentType = super::BaseWebRTCSrc;
type Interfaces = (gst::URIHandler,);
}
#[derive(Default)]
pub struct WhipServerSrc {}
impl ObjectImpl for WhipServerSrc {
fn constructed(&self) {
self.parent_constructed();
let element = self.obj();
let ws = element.upcast_ref::<super::BaseWebRTCSrc>().imp();
let _ = ws.set_signaller(WhipServerSignaller::default().upcast());
let obj = &*self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
let settings = ws.settings.lock().unwrap();
element
.bind_property("stun-server", &settings.signaller, "stun-server")
.build();
element
.bind_property("turn-servers", &settings.signaller, "turn-servers")
.build();
}
}
impl GstObjectImpl for WhipServerSrc {}
impl BinImpl for WhipServerSrc {}
impl ElementImpl for WhipServerSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"WhipServerSrc",
"Source/Network/WebRTC",
"WebRTC source element using WHIP Server as the signaller",
"Taruntej Kanakamalla <taruntej@asymptotic.io>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BaseWebRTCSrcImpl for WhipServerSrc {}
#[glib::object_subclass]
impl ObjectSubclass for WhipServerSrc {
const NAME: &'static str = "GstWhipServerSrc";
type Type = super::WhipServerSrc;
type ParentType = super::BaseWebRTCSrc;
}

View file

@ -49,6 +49,10 @@ glib::wrapper! {
pub struct WebRTCSrc(ObjectSubclass<imp::WebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct WhipServerSrc(ObjectSubclass<imp::WhipServerSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
@ -63,5 +67,14 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
"webrtcsrc",
gst::Rank::PRIMARY,
WebRTCSrc::static_type(),
)
)?;
gst::Element::register(
plugin,
"whipserversrc",
gst::Rank::PRIMARY,
WhipServerSrc::static_type(),
)?;
Ok(())
}

View file

@ -2,20 +2,35 @@
use crate::signaller::{Signallable, SignallableImpl};
use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
build_link_header, build_reqwest_client, parse_redirect_location, set_ice_servers, wait,
wait_async, WaitError,
};
use crate::RUNTIME;
use async_recursion::async_recursion;
use gst::glib;
use gst::glib::once_cell::sync::Lazy;
use gst::glib::{self, RustClosure};
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_sdp::SDPMessage;
use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::StatusCode;
use std::sync::Mutex;
use core::time::Duration;
use crossbeam_channel::unbounded;
use std::net::SocketAddr;
use url::Url;
use warp::{
http,
hyper::{
header::{CONTENT_TYPE, LINK},
Body,
},
Filter, Reply,
};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"webrtc-whip-signaller",
@ -27,6 +42,15 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15;
const ROOT: &str = "whip";
const ENDPOINT_PATH: &str = "endpoint";
const RESOURCE_PATH: &str = "resource";
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080";
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client");
const CONTENT_SDP: &str = "application/sdp";
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
#[derive(Debug)]
enum WhipClientState {
Stopped,
@ -588,3 +612,601 @@ impl ObjectImpl for WhipClient {
}
}
}
// WHIP server implementation
#[derive(Debug)]
enum WhipServerState {
Idle,
Negotiating,
Ready,
}
impl Default for WhipServerState {
fn default() -> Self {
Self::Idle
}
}
struct WhipServerSettings {
stun_server: Option<String>,
turn_servers: gst::Array,
host_addr: Url,
producer_peer_id: Option<String>,
timeout: u32,
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
server_handle: Option<tokio::task::JoinHandle<()>>,
sdp_answer: Option<crossbeam_channel::Sender<Option<SDPMessage>>>,
}
impl Default for WhipServerSettings {
fn default() -> Self {
Self {
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
stun_server: DEFAULT_STUN_SERVER.map(String::from),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from),
timeout: DEFAULT_TIMEOUT,
shutdown_signal: None,
server_handle: None,
sdp_answer: None,
}
}
}
pub struct WhipServer {
state: Mutex<WhipServerState>,
settings: Mutex<WhipServerSettings>,
}
impl Default for WhipServer {
fn default() -> Self {
Self {
settings: Mutex::new(WhipServerSettings::default()),
state: Mutex::new(WhipServerState::default()),
}
}
}
#[derive(Debug)]
struct InternalError;
impl warp::reject::Reject for InternalError {}
impl WhipServer {
pub fn on_webrtcbin_ready(&self) -> RustClosure {
glib::closure!(|signaller: &super::WhipServerSignaller,
_producer_identifier: &str,
webrtcbin: &gst::Element| {
let obj_weak = signaller.downgrade();
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
let obj = match obj_weak.upgrade() {
Some(obj) => obj,
None => return,
};
let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj: obj, "ICE gathering started");
}
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj: obj, "ICE gathering complete");
let ans: Option<gst_sdp::SDPMessage>;
let settings = obj.imp().settings.lock().unwrap();
if let Some(answer_sdp) = webrtcbin
.property::<Option<WebRTCSessionDescription>>("local-description")
{
ans = Some(answer_sdp.sdp());
} else {
ans = None;
}
if let Some(tx) = &settings.sdp_answer {
tx.send(ans).unwrap()
}
}
_ => (),
}
});
})
}
async fn patch_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
// FIXME: implement ICE Trickle and ICE restart
// emit signal `handle-ice` to for ICE trickle
let reply = warp::reply::reply();
let res = warp::reply::with_status(reply, http::StatusCode::NOT_IMPLEMENTED);
Ok(res.into_response())
//FIXME: add state checking once ICE trickle is implemented
}
async fn delete_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Ready => {
// FIXME: session-ended will make webrtcsrc send EOS
// and producer-removed is not handled
// Need to address the usecase where when the client terminates
// the webrtcsrc should be running without sending EOS and reset
// for next client connection like a usual server
self.obj().emit_by_name::<bool>("session-ended", &[&ROOT]);
gst::info!(CAT, imp:self, "Ending session");
*state = WhipServerState::Idle;
Ok(warp::reply::reply().into_response())
}
_ => {
gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session not Ready")))
.unwrap();
Ok(res)
}
}
}
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
let settings = self.settings.lock().unwrap();
let peer_id = settings.producer_peer_id.clone().unwrap();
drop(settings);
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Idle => {
self.obj()
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
*state = WhipServerState::Negotiating
}
WhipServerState::Ready => {
gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session active already")))
.unwrap();
return Ok(res);
}
_ => {}
};
drop(state);
let mut links = HeaderMap::new();
let settings = self.settings.lock().unwrap();
match &settings.stun_server {
Some(stun) => match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
}
},
None => {}
}
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
}
}
} else {
gst::debug!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
}
}
}
let mut res = http::Response::builder()
.header("Access-Post", "application/sdp")
.body(Body::empty())
.unwrap();
let headers = res.headers_mut();
headers.extend(links);
Ok(res)
}
async fn post_handler(
&self,
body: warp::hyper::body::Bytes,
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
let mut settings = self.settings.lock().unwrap();
let peer_id = settings.producer_peer_id.clone().unwrap();
let wait_timeout = settings.timeout;
let (tx, rx) = unbounded::<Option<SDPMessage>>();
settings.sdp_answer = Some(tx);
drop(settings);
let mut state = self.state.lock().unwrap();
match *state {
WhipServerState::Idle => {
self.obj()
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
*state = WhipServerState::Negotiating
}
WhipServerState::Ready => {
gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed");
let res = http::Response::builder()
.status(http::StatusCode::CONFLICT)
.body(Body::from(String::from("Session active already")))
.unwrap();
return Ok(res);
}
_ => {}
};
drop(state);
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
Ok(offer_sdp) => {
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
offer_sdp,
);
self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &offer]);
}
Err(err) => {
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
let reply = warp::reply::reply();
let res = warp::reply::with_status(reply, http::StatusCode::NOT_ACCEPTABLE);
return Ok(res.into_response());
}
}
// We don't want to wait infinitely for the ice gathering to complete.
let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) {
Ok(a) => a,
Err(e) => {
let reply = warp::reply::reply();
let res;
if e.is_timeout() {
res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT);
gst::error!(CAT, imp: self, "Timedout waiting for SDP answer");
} else {
res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR);
gst::error!(CAT, imp: self, "Channel got disconnected");
}
return Ok(res.into_response());
}
};
let settings = self.settings.lock().unwrap();
let mut links = HeaderMap::new();
match &settings.stun_server {
Some(stun) => match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.append(LINK, HeaderValue::from_str(stun_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {stun:?} : {e:?}");
}
},
None => {}
}
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp: self, "turn server: {}",turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.append(LINK, HeaderValue::from_str(turn_link.as_str()).unwrap());
}
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse {turn_server:?} : {e:?}");
}
}
} else {
gst::error!(CAT, imp: self, "Failed to get String value of {turn_server:?}");
}
}
}
// Note: including the ETag in the original "201 Created" response is only REQUIRED
// if the WHIP resource supports ICE restarts and OPTIONAL otherwise.
let ans_text: Result<String, String>;
if let Some(sdp) = answer {
match sdp.as_text() {
Ok(text) => {
ans_text = Ok(text);
gst::debug!(CAT, imp: self, "{ans_text:?}");
}
Err(e) => {
ans_text = Err(format!("Failed to get SDP answer: {e:?}"));
gst::error!(CAT, imp: self, "{e:?}");
}
}
} else {
let e = "SDP Answer is empty!".to_string();
gst::error!(CAT, imp: self, "{e:?}");
ans_text = Err(e);
}
// If ans_text is an error. Send error code and error string in the response
if let Err(e) = ans_text {
let res = http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(e))
.unwrap();
return Ok(res);
}
drop(settings);
// Got SDP answer, send answer in the response
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id;
let mut res = http::Response::builder()
.status(StatusCode::CREATED)
.header(CONTENT_TYPE, "application/sdp")
.header("location", resource_url)
.body(Body::from(ans_text.unwrap()))
.unwrap();
let headers = res.headers_mut();
headers.extend(links);
let mut state = self.state.lock().unwrap();
*state = WhipServerState::Ready;
drop(state);
Ok(res)
}
fn serve(&self) -> Option<tokio::task::JoinHandle<()>> {
let mut settings = self.settings.lock().unwrap();
let addr: SocketAddr;
match settings.host_addr.socket_addrs(|| None) {
Ok(v) => {
// pick the first vector item
addr = v[0];
gst::info!(CAT, imp:self, "using {addr:?} as address");
}
Err(e) => {
gst::error!(CAT, imp:self, "error getting addr from uri {e:?}");
self.obj()
.emit_by_name::<()>("error", &[&format!("Unable to start WHIP Server: {e:?}")]);
return None;
}
}
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
settings.shutdown_signal = Some(tx);
drop(settings);
let prefix = warp::path(ROOT);
let self_weak = self.downgrade();
// POST /endpoint
let post_filter = warp::post()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and(warp::header::exact(CONTENT_TYPE.as_str(), CONTENT_SDP))
.and(warp::body::bytes())
.and_then(move |body| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.post_handler(body).await
}
});
let self_weak = self.downgrade();
// OPTIONS /endpoint
let options_filter = warp::options()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and_then(move || {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.options_handler().await
}
});
let self_weak = self.downgrade();
// PATCH /resource/:id
let patch_filter = warp::patch()
.and(warp::path(RESOURCE_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and(warp::header::exact(
CONTENT_TYPE.as_str(),
CONTENT_TRICKLE_ICE,
))
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.patch_handler(id).await
}
});
let self_weak = self.downgrade();
// DELETE /resource/:id
let delete_filter = warp::delete()
.and(warp::path(RESOURCE_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and_then(move |id| {
let s = self_weak.upgrade();
async {
let self_ = s.expect("Need to have the ObjectRef");
self_.delete_handler(id).await
}
});
let api = prefix
.and(post_filter)
.or(prefix.and(options_filter))
.or(prefix.and(patch_filter))
.or(prefix.and(delete_filter));
let s = warp::serve(api);
let jh = RUNTIME.spawn(async move {
let (_, server) = s.bind_with_graceful_shutdown(addr, async move {
match rx.await {
Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
}
});
server.await;
gst::debug!(CAT, "Stopped the server task...");
});
gst::debug!(CAT, imp: self, "Started the server...");
Some(jh)
}
fn set_host_addr(&self, host_addr: &str) -> Result<(), url::ParseError> {
let mut settings = self.settings.lock().unwrap();
settings.host_addr = Url::parse(host_addr)?;
Ok(())
}
}
impl SignallableImpl for WhipServer {
fn start(&self) {
gst::info!(CAT, imp: self, "starting the WHIP server");
let jh = self.serve();
let mut settings = self.settings.lock().unwrap();
settings.server_handle = jh;
}
fn stop(&self) {
let mut settings = self.settings.lock().unwrap();
let handle = settings
.server_handle
.take()
.expect("Server handle should be set");
let tx = settings
.shutdown_signal
.take()
.expect("Shutdown signal Sender needs to be valid");
if tx.send(()).is_err() {
gst::error!(CAT, imp: self, "Failed to send shutdown signal. Receiver dropped");
}
gst::debug!(CAT, imp: self, "Await server handle to join");
RUNTIME.block_on(async {
if let Err(e) = handle.await {
gst::error!(CAT, imp:self, "Failed to join server handle: {e:?}");
};
});
gst::info!(CAT, imp: self, "stopped the WHIP server");
}
fn end_session(&self, _session_id: &str) {
//FIXME: send any events to the client
}
}
#[glib::object_subclass]
impl ObjectSubclass for WhipServer {
const NAME: &'static str = "GstWhipServerSignaller";
type Type = super::WhipServerSignaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl ObjectImpl for WhipServer {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("host-addr")
.nick("Host address")
.blurb("The the host address of the WHIP endpoint e.g., http://127.0.0.1:8080")
.default_value(DEFAULT_HOST_ADDR)
.flags(glib::ParamFlags::READWRITE)
.build(),
// needed by webrtcsrc in handle_webrtc_src_pad
glib::ParamSpecString::builder("producer-peer-id")
.default_value(DEFAULT_PRODUCER_PEER_ID)
.flags(glib::ParamFlags::READABLE)
.build(),
glib::ParamSpecString::builder("stun-server")
.nick("STUN Server")
.blurb("The STUN server of the form stun://hostname:port")
.default_value(DEFAULT_STUN_SERVER)
.build(),
gst::ParamSpecArray::builder("turn-servers")
.nick("List of TURN Servers to user")
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
.element_spec(&glib::ParamSpecString::builder("turn-server")
.nick("TURN Server")
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
.build()
)
.mutable_ready()
.build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"host-addr" => {
if let Err(e) =
self.set_host_addr(value.get::<&str>().expect("type checked upstream"))
{
gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}");
}
}
"stun-server" => {
let mut settings = self.settings.lock().unwrap();
settings.stun_server = value
.get::<Option<String>>()
.expect("type checked upstream")
}
"turn-servers" => {
let mut settings = self.settings.lock().unwrap();
settings.turn_servers = value.get::<gst::Array>().expect("type checked upstream")
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().unwrap();
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"host-addr" => settings.host_addr.to_string().to_value(),
"stun-server" => settings.stun_server.to_value(),
"turn-servers" => settings.turn_servers.to_value(),
"producer-peer-id" => settings.producer_peer_id.to_value(),
"timeout" => settings.timeout.to_value(),
_ => unimplemented!(),
}
}
}

View file

@ -1,7 +1,7 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::Signallable;
use gst::glib;
use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt};
mod imp;
@ -9,6 +9,10 @@ glib::wrapper! {
pub struct WhipClientSignaller(ObjectSubclass<imp::WhipClient>) @implements Signallable;
}
glib::wrapper! {
pub struct WhipServerSignaller(ObjectSubclass<imp::WhipServer>) @implements Signallable;
}
unsafe impl Send for WhipClientSignaller {}
unsafe impl Sync for WhipClientSignaller {}
@ -17,3 +21,14 @@ impl Default for WhipClientSignaller {
glib::Object::new()
}
}
unsafe impl Send for WhipServerSignaller {}
unsafe impl Sync for WhipServerSignaller {}
impl Default for WhipServerSignaller {
fn default() -> Self {
let sig: WhipServerSignaller = glib::Object::new();
sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready());
sig
}
}