mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-25 13:01:07 +00:00
webrtcsink: bring in signalling code from whipsink as a signaller
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1168>
This commit is contained in:
parent
f00a169081
commit
84a33ca7b9
10 changed files with 1000 additions and 1 deletions
|
@ -6259,6 +6259,37 @@
|
|||
"when": "last"
|
||||
}
|
||||
}
|
||||
},
|
||||
"whipwebrtcsink": {
|
||||
"author": "Taruntej Kanakamalla <taruntej@asymptotic.io>",
|
||||
"description": "WebRTC sink with WHIP signaller",
|
||||
"hierarchy": [
|
||||
"GstWhipWebRTCSink",
|
||||
"GstBaseWebRTCSink",
|
||||
"GstBin",
|
||||
"GstElement",
|
||||
"GstObject",
|
||||
"GInitiallyUnowned",
|
||||
"GObject"
|
||||
],
|
||||
"interfaces": [
|
||||
"GstChildProxy",
|
||||
"GstNavigation"
|
||||
],
|
||||
"klass": "Sink/Network/WebRTC",
|
||||
"pad-templates": {
|
||||
"audio_%%u": {
|
||||
"caps": "audio/x-raw:\naudio/x-opus:\n",
|
||||
"direction": "sink",
|
||||
"presence": "request"
|
||||
},
|
||||
"video_%%u": {
|
||||
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
|
||||
"direction": "sink",
|
||||
"presence": "request"
|
||||
}
|
||||
},
|
||||
"rank": "none"
|
||||
}
|
||||
},
|
||||
"filename": "gstrswebrtc",
|
||||
|
|
|
@ -47,6 +47,10 @@ chrono = "0.4"
|
|||
data-encoding = "2.3.3"
|
||||
url-escape = "0.1.1"
|
||||
|
||||
reqwest = { version = "0.11", features = ["default-tls"] }
|
||||
parse_link_header = {version = "0.3", features = ["url"]}
|
||||
async-recursion = "1.0.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
|
||||
|
|
|
@ -216,6 +216,8 @@ All the rust code in this repository is licensed under the
|
|||
Code in [gstwebrtc-api](gstwebrtc-api) is also licensed under the
|
||||
[Mozilla Public License Version 2.0].
|
||||
|
||||
[Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0
|
||||
|
||||
## Using the AWS KVS signaller
|
||||
|
||||
* Setup AWS Kinesis Video Streams
|
||||
|
@ -230,4 +232,31 @@ AWS_ACCESS_KEY_ID="XXX" AWS_SECRET_ACCESS_KEY="XXX" gst-launch-1.0 videotestsrc
|
|||
|
||||
* Connect a viewer @ <https://awslabs.github.io/amazon-kinesis-video-streams-webrtc-sdk-js/examples/index.html>
|
||||
|
||||
[Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0
|
||||
## Using the WHIP Signaller
|
||||
|
||||
Testing the whip signaller can be done by setting up janus and
|
||||
<https://github.com/meetecho/simple-whip-server/>.
|
||||
|
||||
* Set up a [janus] instance with the videoroom plugin configured
|
||||
to expose a room with ID 1234 (configuration in `janus.plugin.videoroom.jcfg`)
|
||||
|
||||
* Open the <janus/share/janus/demos/videoroomtest.html> web page, click start
|
||||
and join the room
|
||||
|
||||
* Set up the [simple whip server] as explained in its README
|
||||
|
||||
* Navigate to <http://localhost:7080/>, create an endpoint named room1234
|
||||
pointing to the Janus room with ID 1234
|
||||
|
||||
* Finally, send a stream to the endpoint with:
|
||||
|
||||
``` shell
|
||||
gst-launch-1.0 -e uridecodebin uri=file:///home/meh/path/to/video/file ! \
|
||||
videoconvert ! video/x-raw ! queue ! \
|
||||
whipwebrtcsink name=ws signaller::whip-endpoint="http://127.0.0.1:7080/whip/endpoint/room1234"
|
||||
```
|
||||
|
||||
You should see a second video displayed in the videoroomtest web page.
|
||||
|
||||
[janus]: https://github.com/meetecho/janus-gateway
|
||||
[simple whip server]: https://github.com/meetecho/simple-whip-server/
|
||||
|
|
|
@ -19,6 +19,7 @@ pub mod signaller;
|
|||
pub mod utils;
|
||||
pub mod webrtcsink;
|
||||
pub mod webrtcsrc;
|
||||
mod whip_signaller;
|
||||
|
||||
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||
webrtcsink::register(plugin)?;
|
||||
|
|
|
@ -100,6 +100,256 @@ pub fn serialize_json_object(val: &serde_json::Map<String, serde_json::Value>) -
|
|||
res
|
||||
}
|
||||
|
||||
use crate::RUNTIME;
|
||||
use futures::future;
|
||||
use futures::prelude::*;
|
||||
use gst::ErrorMessage;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::redirect::Policy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum WaitError {
|
||||
FutureAborted,
|
||||
FutureError(ErrorMessage),
|
||||
}
|
||||
|
||||
pub async fn wait_async<F, T>(
|
||||
canceller: &Mutex<Option<future::AbortHandle>>,
|
||||
future: F,
|
||||
timeout: u32,
|
||||
) -> Result<T, WaitError>
|
||||
where
|
||||
F: Send + Future<Output = T>,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
||||
{
|
||||
let mut canceller_guard = canceller.lock().unwrap();
|
||||
canceller_guard.replace(abort_handle);
|
||||
drop(canceller_guard);
|
||||
}
|
||||
|
||||
let future = async {
|
||||
if timeout == 0 {
|
||||
Ok(future.await)
|
||||
} else {
|
||||
let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
|
||||
|
||||
match res {
|
||||
Ok(r) => Ok(r),
|
||||
Err(e) => Err(WaitError::FutureError(gst::error_msg!(
|
||||
gst::ResourceError::Read,
|
||||
["Request timeout, elapsed: {}", e]
|
||||
))),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let future = async {
|
||||
match future::Abortable::new(future, abort_registration).await {
|
||||
Ok(Ok(r)) => Ok(r),
|
||||
|
||||
Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Future resolved with an error {:?}", err]
|
||||
))),
|
||||
|
||||
Err(future::Aborted) => Err(WaitError::FutureAborted),
|
||||
}
|
||||
};
|
||||
|
||||
let res = future.await;
|
||||
|
||||
let mut canceller_guard = canceller.lock().unwrap();
|
||||
*canceller_guard = None;
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn wait<F, T>(
|
||||
canceller: &Mutex<Option<future::AbortHandle>>,
|
||||
future: F,
|
||||
timeout: u32,
|
||||
) -> Result<T, WaitError>
|
||||
where
|
||||
F: Send + Future<Output = Result<T, ErrorMessage>>,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let mut canceller_guard = canceller.lock().unwrap();
|
||||
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
||||
|
||||
if canceller_guard.is_some() {
|
||||
return Err(WaitError::FutureError(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Old Canceller should not exist"]
|
||||
)));
|
||||
}
|
||||
|
||||
canceller_guard.replace(abort_handle);
|
||||
drop(canceller_guard);
|
||||
|
||||
let future = async {
|
||||
if timeout == 0 {
|
||||
future.await
|
||||
} else {
|
||||
let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
|
||||
|
||||
match res {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(gst::error_msg!(
|
||||
gst::ResourceError::Read,
|
||||
["Request timeout, elapsed: {}", e.to_string()]
|
||||
)),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let future = async {
|
||||
match future::Abortable::new(future, abort_registration).await {
|
||||
Ok(Ok(res)) => Ok(res),
|
||||
|
||||
Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Future resolved with an error {:?}", err]
|
||||
))),
|
||||
|
||||
Err(future::Aborted) => Err(WaitError::FutureAborted),
|
||||
}
|
||||
};
|
||||
|
||||
let res = {
|
||||
let _enter = RUNTIME.enter();
|
||||
futures::executor::block_on(future)
|
||||
};
|
||||
|
||||
canceller_guard = canceller.lock().unwrap();
|
||||
*canceller_guard = None;
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn parse_redirect_location(
|
||||
headermap: &HeaderMap,
|
||||
old_url: &reqwest::Url,
|
||||
) -> Result<reqwest::Url, ErrorMessage> {
|
||||
let location = match headermap.get(reqwest::header::LOCATION) {
|
||||
Some(location) => location,
|
||||
None => {
|
||||
return Err(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Location header field should be present for WHIP/WHEP resource URL"]
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let location = match location.to_str() {
|
||||
Ok(loc) => loc,
|
||||
Err(e) => {
|
||||
return Err(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Failed to convert location to string {}", e]
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
match reqwest::Url::parse(location) {
|
||||
Ok(url) => Ok(url), // Location URL is an absolute path
|
||||
Err(_) => {
|
||||
// Location URL is a relative path
|
||||
let new_url = old_url.clone().join(location).map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["URL join operation failed: {:?}", err]
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(new_url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_reqwest_client(pol: Policy) -> reqwest::Client {
|
||||
let client_builder = reqwest::Client::builder();
|
||||
client_builder.redirect(pol).build().unwrap()
|
||||
}
|
||||
|
||||
pub fn set_ice_servers(
|
||||
webrtcbin: &gst::Element,
|
||||
headermap: &HeaderMap,
|
||||
) -> Result<(), ErrorMessage> {
|
||||
for link in headermap.get_all("link").iter() {
|
||||
let link = link.to_str().map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
[
|
||||
"Header value should contain only visible ASCII strings: {}",
|
||||
err
|
||||
]
|
||||
)
|
||||
})?;
|
||||
|
||||
let item_map = match parse_link_header::parse_with_rel(link) {
|
||||
Ok(map) => map,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let link = match item_map.contains_key("ice-server") {
|
||||
true => item_map.get("ice-server").unwrap(),
|
||||
false => continue, // Not a link header we care about
|
||||
};
|
||||
|
||||
// Note: webrtcbin needs ice servers to be in the below format
|
||||
// <scheme>://<user:pass>@<url>
|
||||
// and the ice-servers (link headers) received from the whip server might be
|
||||
// in the format <scheme>:<host> with username and password as separate params.
|
||||
// Constructing these with 'url' crate also require a format/parse
|
||||
// for changing <scheme>:<host> to <scheme>://<user>:<password>@<host>.
|
||||
// So preferred to use the String rather
|
||||
|
||||
// check if uri has ://
|
||||
let ice_server_url = if link.uri.has_authority() {
|
||||
// use raw_uri as is
|
||||
// username and password in the link.uri.params ignored
|
||||
link.uri.clone()
|
||||
} else {
|
||||
// No builder pattern is provided by reqwest::Url. Use string operation.
|
||||
// construct url as '<scheme>://<user:pass>@<url>'
|
||||
let url = format!("{}://{}", link.uri.scheme(), link.uri.path());
|
||||
|
||||
let mut new_url = match reqwest::Url::parse(url.as_str()) {
|
||||
Ok(url) => url,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if let Some(user) = link.params.get("username") {
|
||||
new_url.set_username(user.as_str()).unwrap();
|
||||
if let Some(pass) = link.params.get("credential") {
|
||||
new_url.set_password(Some(pass.as_str())).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
new_url
|
||||
};
|
||||
|
||||
// It's nicer to not collapse the `else if` and its inner `if`
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if link.uri.scheme() == "stun" {
|
||||
webrtcbin.set_property_from_str("stun-server", ice_server_url.as_str());
|
||||
} else if link.uri.scheme().starts_with("turn") {
|
||||
if !webrtcbin.emit_by_name::<bool>("add-turn-server", &[&ice_server_url.as_str()]) {
|
||||
return Err(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Failed to set turn server {}", ice_server_url]
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Wrapper around `gst::ElementFactory::make` with a better error
|
||||
/// message
|
||||
pub fn make_element(element: &str, name: Option<&str>) -> Result<gst::Element, Error> {
|
||||
|
|
|
@ -23,6 +23,7 @@ use super::homegrown_cc::CongestionController;
|
|||
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
|
||||
use crate::aws_kvs_signaller::AwsKvsSignaller;
|
||||
use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
|
||||
use crate::whip_signaller::WhipSignaller;
|
||||
use crate::RUNTIME;
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
|
||||
|
@ -3810,3 +3811,43 @@ impl ObjectSubclass for AwsKvsWebRTCSink {
|
|||
type Type = super::AwsKvsWebRTCSink;
|
||||
type ParentType = super::BaseWebRTCSink;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct WhipWebRTCSink {}
|
||||
|
||||
impl ObjectImpl for WhipWebRTCSink {
|
||||
fn constructed(&self) {
|
||||
let element = self.obj();
|
||||
let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp();
|
||||
|
||||
let _ = ws.set_signaller(WhipSignaller::default().upcast());
|
||||
}
|
||||
}
|
||||
|
||||
impl GstObjectImpl for WhipWebRTCSink {}
|
||||
|
||||
impl ElementImpl for WhipWebRTCSink {
|
||||
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||
gst::subclass::ElementMetadata::new(
|
||||
"WhipWebRTCSink",
|
||||
"Sink/Network/WebRTC",
|
||||
"WebRTC sink with WHIP signaller",
|
||||
"Taruntej Kanakamalla <taruntej@asymptotic.io>",
|
||||
)
|
||||
});
|
||||
|
||||
Some(&*ELEMENT_METADATA)
|
||||
}
|
||||
}
|
||||
|
||||
impl BinImpl for WhipWebRTCSink {}
|
||||
|
||||
impl BaseWebRTCSinkImpl for WhipWebRTCSink {}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for WhipWebRTCSink {
|
||||
const NAME: &'static str = "GstWhipWebRTCSink";
|
||||
type Type = super::WhipWebRTCSink;
|
||||
type ParentType = super::BaseWebRTCSink;
|
||||
}
|
||||
|
|
|
@ -52,6 +52,10 @@ glib::wrapper! {
|
|||
pub struct AwsKvsWebRTCSink(ObjectSubclass<imp::AwsKvsWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
|
||||
}
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct WhipWebRTCSink(ObjectSubclass<imp::WhipWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum WebRTCSinkError {
|
||||
#[error("no session with id")]
|
||||
|
@ -126,6 +130,12 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
|||
gst::Rank::None,
|
||||
AwsKvsWebRTCSink::static_type(),
|
||||
)?;
|
||||
gst::Element::register(
|
||||
Some(plugin),
|
||||
"whipwebrtcsink",
|
||||
gst::Rank::None,
|
||||
WhipWebRTCSink::static_type(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
612
net/webrtc/src/whip_signaller/imp.rs
Normal file
612
net/webrtc/src/whip_signaller/imp.rs
Normal file
|
@ -0,0 +1,612 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::{Signallable, SignallableImpl};
|
||||
use crate::utils::{
|
||||
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
|
||||
};
|
||||
use crate::RUNTIME;
|
||||
use async_recursion::async_recursion;
|
||||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
|
||||
use once_cell::sync::Lazy;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderValue;
|
||||
use reqwest::StatusCode;
|
||||
use std::sync::Mutex;
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"webrtc-whip-signaller",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("WebRTC WHIP signaller"),
|
||||
)
|
||||
});
|
||||
|
||||
const MAX_REDIRECTS: u8 = 10;
|
||||
const DEFAULT_TIMEOUT: u32 = 15;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum State {
|
||||
Stopped,
|
||||
Post { redirects: u8 },
|
||||
Running { whip_resource_url: String },
|
||||
}
|
||||
|
||||
impl Default for State {
|
||||
fn default() -> Self {
|
||||
Self::Stopped
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Settings {
|
||||
whip_endpoint: Option<String>,
|
||||
use_link_headers: bool,
|
||||
auth_token: Option<String>,
|
||||
timeout: u32,
|
||||
}
|
||||
|
||||
impl Default for Settings {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
whip_endpoint: None,
|
||||
use_link_headers: false,
|
||||
auth_token: None,
|
||||
timeout: DEFAULT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Signaller {
|
||||
state: Mutex<State>,
|
||||
settings: Mutex<Settings>,
|
||||
canceller: Mutex<Option<futures::future::AbortHandle>>,
|
||||
}
|
||||
|
||||
impl Signaller {
|
||||
fn raise_error(&self, msg: String) {
|
||||
self.obj()
|
||||
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
|
||||
}
|
||||
|
||||
fn handle_future_error(&self, err: WaitError) {
|
||||
match err {
|
||||
WaitError::FutureAborted => {
|
||||
gst::warning!(CAT, imp: self, "Future aborted")
|
||||
}
|
||||
WaitError::FutureError(err) => self.raise_error(err.to_string()),
|
||||
};
|
||||
}
|
||||
|
||||
async fn send_offer(&self, webrtcbin: &gst::Element) {
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = State::Post { redirects: 0 };
|
||||
drop(state);
|
||||
}
|
||||
|
||||
let local_desc =
|
||||
webrtcbin.property::<Option<WebRTCSessionDescription>>("local-description");
|
||||
|
||||
let offer_sdp = match local_desc {
|
||||
None => {
|
||||
self.raise_error("Local description is not set".to_string());
|
||||
return;
|
||||
}
|
||||
Some(offer) => offer,
|
||||
};
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Sending offer SDP: {:?}",
|
||||
offer_sdp.sdp().as_text()
|
||||
);
|
||||
|
||||
let timeout;
|
||||
{
|
||||
let settings = self.settings.lock().unwrap();
|
||||
timeout = settings.timeout;
|
||||
drop(settings);
|
||||
}
|
||||
|
||||
if let Err(e) =
|
||||
wait_async(&self.canceller, self.do_post(offer_sdp, webrtcbin), timeout).await
|
||||
{
|
||||
self.handle_future_error(e);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_recursion]
|
||||
async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription, webrtcbin: &gst::Element) {
|
||||
let auth_token;
|
||||
let endpoint;
|
||||
let timeout;
|
||||
|
||||
{
|
||||
let settings = self.settings.lock().unwrap();
|
||||
endpoint =
|
||||
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
|
||||
auth_token = settings.auth_token.clone();
|
||||
timeout = settings.timeout;
|
||||
drop(settings);
|
||||
}
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut redirects;
|
||||
|
||||
{
|
||||
let state = self.state.lock().unwrap();
|
||||
redirects = match *state {
|
||||
State::Post { redirects } => redirects,
|
||||
_ => {
|
||||
self.raise_error("Trying to do POST in unexpected state".to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
drop(state);
|
||||
}
|
||||
|
||||
// Default policy for redirect does not share the auth token to new location
|
||||
// So disable inbuilt redirecting and do a recursive call upon 3xx response code
|
||||
let pol = reqwest::redirect::Policy::none();
|
||||
let client = build_reqwest_client(pol);
|
||||
|
||||
let sdp = offer.sdp();
|
||||
let body = sdp.as_text().unwrap();
|
||||
|
||||
gst::debug!(CAT, imp: self, "Using endpoint {}", endpoint.as_str());
|
||||
let mut headermap = HeaderMap::new();
|
||||
headermap.insert(
|
||||
reqwest::header::CONTENT_TYPE,
|
||||
HeaderValue::from_static("application/sdp"),
|
||||
);
|
||||
|
||||
if let Some(token) = auth_token.as_ref() {
|
||||
let bearer_token = "Bearer ".to_owned() + token;
|
||||
headermap.insert(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
HeaderValue::from_str(bearer_token.as_str())
|
||||
.expect("Failed to set auth token to header"),
|
||||
);
|
||||
}
|
||||
|
||||
let res = wait_async(
|
||||
&self.canceller,
|
||||
client
|
||||
.request(reqwest::Method::POST, endpoint.clone())
|
||||
.headers(headermap)
|
||||
.body(body)
|
||||
.send(),
|
||||
timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(r) => match r {
|
||||
Ok(resp) => {
|
||||
if let Err(e) = wait_async(
|
||||
&self.canceller,
|
||||
self.parse_endpoint_response(offer, resp, redirects, webrtcbin),
|
||||
timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
self.handle_future_error(e);
|
||||
}
|
||||
}
|
||||
Err(err) => self.raise_error(err.to_string()),
|
||||
},
|
||||
Err(err) => self.handle_future_error(err),
|
||||
}
|
||||
}
|
||||
|
||||
async fn parse_endpoint_response(
|
||||
&self,
|
||||
offer: gst_webrtc::WebRTCSessionDescription,
|
||||
resp: reqwest::Response,
|
||||
redirects: u8,
|
||||
webrtcbin: &gst::Element,
|
||||
) {
|
||||
gst::debug!(CAT, imp: self, "Parsing endpoint response");
|
||||
|
||||
let endpoint;
|
||||
let timeout;
|
||||
let use_link_headers;
|
||||
|
||||
{
|
||||
let settings = self.settings.lock().unwrap();
|
||||
endpoint =
|
||||
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
|
||||
use_link_headers = settings.use_link_headers;
|
||||
timeout = settings.timeout;
|
||||
drop(settings);
|
||||
}
|
||||
|
||||
gst::debug!(CAT, "response status: {}", resp.status());
|
||||
|
||||
match resp.status() {
|
||||
StatusCode::OK | StatusCode::CREATED => {
|
||||
if use_link_headers {
|
||||
if let Err(e) = set_ice_servers(webrtcbin, resp.headers()) {
|
||||
self.raise_error(e.to_string());
|
||||
return;
|
||||
};
|
||||
}
|
||||
|
||||
// Get the url of the resource from 'location' header.
|
||||
// The resource created is expected be a relative path
|
||||
// and not an absolute path
|
||||
// So we want to construct the full url of the resource
|
||||
// using the endpoint url i.e., replace the end point path with
|
||||
// resource path
|
||||
let location = match resp.headers().get(reqwest::header::LOCATION) {
|
||||
Some(location) => location,
|
||||
None => {
|
||||
self.raise_error(
|
||||
"Location header field should be present for WHIP resource URL"
|
||||
.to_string(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let location = match location.to_str() {
|
||||
Ok(loc) => loc,
|
||||
Err(e) => {
|
||||
self.raise_error(format!("Failed to convert location to string: {e}"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let url = reqwest::Url::parse(endpoint.as_str()).unwrap();
|
||||
|
||||
gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location);
|
||||
|
||||
let url = match url.join(location) {
|
||||
Ok(joined_url) => joined_url,
|
||||
Err(err) => {
|
||||
self.raise_error(format!("URL join operation failed: {err:?}"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = match *state {
|
||||
State::Post { redirects: _r } => State::Running {
|
||||
whip_resource_url: url.to_string(),
|
||||
},
|
||||
_ => {
|
||||
self.raise_error("Expected to be in POST state".to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
drop(state);
|
||||
}
|
||||
|
||||
match wait_async(&self.canceller, resp.bytes(), timeout).await {
|
||||
Ok(res) => match res {
|
||||
Ok(ans_bytes) => match gst_sdp::SDPMessage::parse_buffer(&ans_bytes) {
|
||||
Ok(ans_sdp) => {
|
||||
let answer = gst_webrtc::WebRTCSessionDescription::new(
|
||||
gst_webrtc::WebRTCSDPType::Answer,
|
||||
ans_sdp,
|
||||
);
|
||||
self.obj().emit_by_name::<()>(
|
||||
"session-description",
|
||||
&[&"unique", &answer],
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
self.raise_error(format!("Could not parse answer SDP: {err}"));
|
||||
}
|
||||
},
|
||||
Err(err) => self.raise_error(err.to_string()),
|
||||
},
|
||||
Err(err) => self.handle_future_error(err),
|
||||
}
|
||||
}
|
||||
|
||||
s if s.is_redirection() => {
|
||||
gst::debug!(CAT, "redirected");
|
||||
|
||||
if redirects < MAX_REDIRECTS {
|
||||
match parse_redirect_location(resp.headers(), &endpoint) {
|
||||
Ok(redirect_url) => {
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = match *state {
|
||||
State::Post { redirects: _r } => State::Post {
|
||||
redirects: redirects + 1,
|
||||
},
|
||||
/*
|
||||
* As per section 4.6 of the specification, redirection is
|
||||
* not required to be supported for the PATCH and DELETE
|
||||
* requests to the final WHEP resource URL. Only the initial
|
||||
* POST request may support redirection.
|
||||
*/
|
||||
State::Running { .. } => {
|
||||
self.raise_error(
|
||||
"Unexpected redirection in RUNNING state".to_string(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
State::Stopped => unreachable!(),
|
||||
};
|
||||
drop(state);
|
||||
}
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Redirecting endpoint to {}",
|
||||
redirect_url.as_str()
|
||||
);
|
||||
|
||||
if let Err(err) =
|
||||
wait_async(&self.canceller, self.do_post(offer, webrtcbin), timeout)
|
||||
.await
|
||||
{
|
||||
self.handle_future_error(err);
|
||||
}
|
||||
}
|
||||
Err(e) => self.raise_error(e.to_string()),
|
||||
}
|
||||
} else {
|
||||
self.raise_error("Too many redirects. Unable to connect.".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
s => {
|
||||
match wait_async(&self.canceller, resp.bytes(), timeout).await {
|
||||
Ok(r) => {
|
||||
let res = r
|
||||
.map(|x| x.escape_ascii().to_string())
|
||||
.unwrap_or_else(|_| "(no further details)".to_string());
|
||||
|
||||
// FIXME: Check and handle 'Retry-After' header in case of server error
|
||||
self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res));
|
||||
}
|
||||
Err(err) => self.handle_future_error(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn terminate_session(&self) {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let state = self.state.lock().unwrap();
|
||||
let timeout = settings.timeout;
|
||||
|
||||
let resource_url = match *state {
|
||||
State::Running {
|
||||
whip_resource_url: ref resource_url,
|
||||
} => resource_url.clone(),
|
||||
_ => {
|
||||
self.raise_error("Terminated in unexpected state".to_string());
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
drop(state);
|
||||
|
||||
let mut headermap = HeaderMap::new();
|
||||
if let Some(token) = &settings.auth_token {
|
||||
let bearer_token = "Bearer ".to_owned() + token.as_str();
|
||||
headermap.insert(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
HeaderValue::from_str(bearer_token.as_str())
|
||||
.expect("Failed to set auth token to header"),
|
||||
);
|
||||
}
|
||||
|
||||
gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url);
|
||||
let client = build_reqwest_client(reqwest::redirect::Policy::default());
|
||||
let future = async {
|
||||
client
|
||||
.delete(resource_url.clone())
|
||||
.headers(headermap)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["DELETE request failed {}: {:?}", resource_url, err]
|
||||
)
|
||||
})
|
||||
};
|
||||
|
||||
let res = wait(&self.canceller, future, timeout);
|
||||
match res {
|
||||
Ok(r) => {
|
||||
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
|
||||
}
|
||||
Err(e) => match e {
|
||||
WaitError::FutureAborted => {
|
||||
gst::warning!(CAT, imp: self, "DELETE request aborted")
|
||||
}
|
||||
WaitError::FutureError(e) => {
|
||||
gst::error!(CAT, imp: self, "Error on DELETE request : {}", e)
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl SignallableImpl for Signaller {
|
||||
fn start(&self) {
|
||||
if self.settings.lock().unwrap().whip_endpoint.is_none() {
|
||||
self.raise_error("WHIP endpoint URL must be set".to_string());
|
||||
return;
|
||||
}
|
||||
|
||||
self.obj().connect_closure(
|
||||
"consumer-added",
|
||||
false,
|
||||
glib::closure!(|signaller: &super::WhipSignaller,
|
||||
_consumer_identifier: &str,
|
||||
webrtcbin: &gst::Element| {
|
||||
let obj_weak = signaller.downgrade();
|
||||
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
|
||||
let obj = match obj_weak.upgrade() {
|
||||
Some(obj) => obj,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let state =
|
||||
webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
|
||||
|
||||
match state {
|
||||
WebRTCICEGatheringState::Gathering => {
|
||||
gst::info!(CAT, obj: obj, "ICE gathering started");
|
||||
}
|
||||
WebRTCICEGatheringState::Complete => {
|
||||
gst::info!(CAT, obj: obj, "ICE gathering complete");
|
||||
|
||||
let webrtcbin = webrtcbin.clone();
|
||||
|
||||
RUNTIME.spawn(async move {
|
||||
/* Note that we check for a valid WHIP endpoint in change_state */
|
||||
obj.imp().send_offer(&webrtcbin).await
|
||||
});
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
self.obj().emit_by_name::<()>(
|
||||
"session-requested",
|
||||
&[
|
||||
&"unique",
|
||||
&"unique",
|
||||
&None::<gst_webrtc::WebRTCSessionDescription>,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
// Interrupt requests in progress, if any
|
||||
if let Some(canceller) = &*self.canceller.lock().unwrap() {
|
||||
canceller.abort();
|
||||
}
|
||||
|
||||
let state = self.state.lock().unwrap();
|
||||
if let State::Running { .. } = *state {
|
||||
// Release server-side resources
|
||||
drop(state);
|
||||
}
|
||||
}
|
||||
|
||||
fn end_session(&self, session_id: &str) {
|
||||
assert_eq!(session_id, "unique");
|
||||
|
||||
// Interrupt requests in progress, if any
|
||||
if let Some(canceller) = &*self.canceller.lock().unwrap() {
|
||||
canceller.abort();
|
||||
}
|
||||
|
||||
let state = self.state.lock().unwrap();
|
||||
if let State::Running { .. } = *state {
|
||||
// Release server-side resources
|
||||
drop(state);
|
||||
self.terminate_session();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for Signaller {
|
||||
const NAME: &'static str = "GstWHIPWebRTCSinkSignaller";
|
||||
type Type = super::WhipSignaller;
|
||||
type ParentType = glib::Object;
|
||||
type Interfaces = (Signallable,);
|
||||
}
|
||||
|
||||
impl ObjectImpl for Signaller {
|
||||
fn properties() -> &'static [glib::ParamSpec] {
|
||||
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||
vec![glib::ParamSpecString::builder("whip-endpoint")
|
||||
.nick("WHIP Endpoint")
|
||||
.blurb("The WHIP server endpoint to POST SDP offer to.
|
||||
e.g.: https://example.com/whip/endpoint/room1234")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
|
||||
glib::ParamSpecBoolean::builder("use-link-headers")
|
||||
.nick("Use Link Headers")
|
||||
.blurb("Use link headers to configure ice-servers from the WHIP server response to the POST request.
|
||||
If set to TRUE and the WHIP server returns valid ice-servers,
|
||||
this property overrides the ice-servers values set using the stun-server and turn-server properties.")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
|
||||
glib::ParamSpecString::builder("auth-token")
|
||||
.nick("Authorization Token")
|
||||
.blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer <auth-token>'")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
|
||||
glib::ParamSpecUInt::builder("timeout")
|
||||
.nick("Timeout")
|
||||
.blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).")
|
||||
.maximum(3600)
|
||||
.default_value(DEFAULT_TIMEOUT)
|
||||
.build(),
|
||||
]
|
||||
});
|
||||
|
||||
PROPERTIES.as_ref()
|
||||
}
|
||||
|
||||
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
||||
match pspec.name() {
|
||||
"whip-endpoint" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.whip_endpoint = value.get().expect("WHIP endpoint should be a string");
|
||||
}
|
||||
"use-link-headers" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.use_link_headers = value
|
||||
.get()
|
||||
.expect("use-link-headers should be a boolean value");
|
||||
}
|
||||
"auth-token" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.auth_token = value.get().expect("Auth token should be a string");
|
||||
}
|
||||
"timeout" => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.timeout = value.get().expect("type checked upstream");
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||
match pspec.name() {
|
||||
"whip-endpoint" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.whip_endpoint.to_value()
|
||||
}
|
||||
"use-link-headers" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.use_link_headers.to_value()
|
||||
}
|
||||
"auth-token" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.auth_token.to_value()
|
||||
}
|
||||
"timeout" => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
settings.timeout.to_value()
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
19
net/webrtc/src/whip_signaller/mod.rs
Normal file
19
net/webrtc/src/whip_signaller/mod.rs
Normal file
|
@ -0,0 +1,19 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::Signallable;
|
||||
use gst::glib;
|
||||
|
||||
mod imp;
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct WhipSignaller(ObjectSubclass<imp::Signaller>) @implements Signallable;
|
||||
}
|
||||
|
||||
unsafe impl Send for WhipSignaller {}
|
||||
unsafe impl Sync for WhipSignaller {}
|
||||
|
||||
impl Default for WhipSignaller {
|
||||
fn default() -> Self {
|
||||
glib::Object::new()
|
||||
}
|
||||
}
|
|
@ -366,6 +366,8 @@ impl ObjectImpl for WhipSink {
|
|||
|
||||
let self_ref = self_.ref_counted();
|
||||
|
||||
gst::info!(CAT, imp: self_, "ICE gathering complete");
|
||||
|
||||
// With tokio's spawn one does not have to .await the
|
||||
// returned JoinHandle to make the provided future start
|
||||
// execution. It will start running in the background
|
||||
|
|
Loading…
Reference in a new issue