mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-06-02 06:50:41 +00:00
Compare commits
8 commits
ed325c334c
...
e8b563a314
Author | SHA1 | Date | |
---|---|---|---|
e8b563a314 | |||
a87eaa4b79 | |||
88cbc93338 | |||
23e8b46cee | |||
f2783fda93 | |||
ac9ef0a8d2 | |||
4404cb42b8 | |||
6e1aac0d0b |
35
Cargo.lock
generated
35
Cargo.lock
generated
|
@ -1263,15 +1263,6 @@ dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "crossbeam-channel"
|
|
||||||
version = "0.5.12"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
|
|
||||||
dependencies = [
|
|
||||||
"crossbeam-utils",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam-deque"
|
name = "crossbeam-deque"
|
||||||
version = "0.8.5"
|
version = "0.8.5"
|
||||||
|
@ -1370,6 +1361,16 @@ dependencies = [
|
||||||
"cipher",
|
"cipher",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ctrlc"
|
||||||
|
version = "3.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf"
|
||||||
|
dependencies = [
|
||||||
|
"nix 0.27.1",
|
||||||
|
"windows-sys 0.48.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "darling"
|
name = "darling"
|
||||||
version = "0.20.8"
|
version = "0.20.8"
|
||||||
|
@ -2944,9 +2945,10 @@ dependencies = [
|
||||||
"aws-smithy-http",
|
"aws-smithy-http",
|
||||||
"aws-smithy-types",
|
"aws-smithy-types",
|
||||||
"aws-types",
|
"aws-types",
|
||||||
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"crossbeam-channel",
|
"ctrlc",
|
||||||
"data-encoding",
|
"data-encoding",
|
||||||
"fastrand",
|
"fastrand",
|
||||||
"futures",
|
"futures",
|
||||||
|
@ -4183,7 +4185,7 @@ dependencies = [
|
||||||
"if-addrs",
|
"if-addrs",
|
||||||
"log",
|
"log",
|
||||||
"multimap 0.8.3",
|
"multimap 0.8.3",
|
||||||
"nix",
|
"nix 0.23.2",
|
||||||
"rand",
|
"rand",
|
||||||
"socket2 0.4.10",
|
"socket2 0.4.10",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
@ -4674,6 +4676,17 @@ dependencies = [
|
||||||
"memoffset 0.6.5",
|
"memoffset 0.6.5",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nix"
|
||||||
|
version = "0.27.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.5.0",
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nnnoiseless"
|
name = "nnnoiseless"
|
||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
|
|
|
@ -649,7 +649,7 @@ impl BaseTransformImpl for HrtfRender {
|
||||||
|
|
||||||
if direction == gst::PadDirection::Sink {
|
if direction == gst::PadDirection::Sink {
|
||||||
s.set("channels", 2);
|
s.set("channels", 2);
|
||||||
s.set("channel-mask", 0x3);
|
s.set("channel-mask", gst::Bitmask(0x3));
|
||||||
} else {
|
} else {
|
||||||
let settings = self.settings.lock().unwrap();
|
let settings = self.settings.lock().unwrap();
|
||||||
if let Some(objs) = &settings.spatial_objects {
|
if let Some(objs) = &settings.spatial_objects {
|
||||||
|
|
|
@ -58,7 +58,10 @@ livekit-protocol = { version = "0.3", optional = true }
|
||||||
livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true }
|
livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true }
|
||||||
|
|
||||||
warp = {version = "0.3", optional = true }
|
warp = {version = "0.3", optional = true }
|
||||||
crossbeam-channel = { version = "0.5", optional = true }
|
ctrlc = {version = "3.4.0", optional = true }
|
||||||
|
|
||||||
|
|
||||||
|
bytes = "1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
gst-plugin-rtp = { path = "../rtp" }
|
gst-plugin-rtp = { path = "../rtp" }
|
||||||
|
@ -78,7 +81,7 @@ path = "src/lib.rs"
|
||||||
gst-plugin-version-helper.workspace = true
|
gst-plugin-version-helper.workspace = true
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["v1_22", "aws", "janus", "livekit", "whip"]
|
default = ["v1_22", "aws", "janus", "livekit", "whip", "whep"]
|
||||||
static = []
|
static = []
|
||||||
capi = []
|
capi = []
|
||||||
v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
|
v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
|
||||||
|
@ -89,7 +92,8 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
|
||||||
"dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"]
|
"dep:aws-sdk-kinesisvideosignaling", "dep:data-encoding", "dep:http", "dep:url-escape"]
|
||||||
janus = ["dep:http"]
|
janus = ["dep:http"]
|
||||||
livekit = ["dep:livekit-protocol", "dep:livekit-api"]
|
livekit = ["dep:livekit-protocol", "dep:livekit-api"]
|
||||||
whip = ["dep:async-recursion", "dep:crossbeam-channel", "dep:reqwest", "dep:warp"]
|
whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"]
|
||||||
|
whep = ["dep:async-recursion", "dep:reqwest", "dep:warp"]
|
||||||
|
|
||||||
[package.metadata.capi]
|
[package.metadata.capi]
|
||||||
min_version = "0.9.21"
|
min_version = "0.9.21"
|
||||||
|
@ -119,3 +123,6 @@ name = "webrtc-precise-sync-send"
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "webrtc-precise-sync-recv"
|
name = "webrtc-precise-sync-recv"
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "whipserver"
|
||||||
|
|
123
net/webrtc/examples/whipserver.rs
Normal file
123
net/webrtc/examples/whipserver.rs
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
use std::process::exit;
|
||||||
|
|
||||||
|
use anyhow::Error;
|
||||||
|
use clap::Parser;
|
||||||
|
use gst::prelude::*;
|
||||||
|
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
struct Args {
|
||||||
|
host_addr: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn link_video(pad: &gst::Pad, pipeline: &gst::Pipeline) {
|
||||||
|
let q = gst::ElementFactory::make_with_name(
|
||||||
|
"queue",
|
||||||
|
Some(format!("queue_{}", pad.name()).as_str()),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
// let vconv = gst::ElementFactory::make_with_name("videoconvert", Some(format!("vconv_{}",pad.name()).as_str())).unwrap();
|
||||||
|
let vsink = gst::ElementFactory::make_with_name(
|
||||||
|
"autovideosink",
|
||||||
|
Some(format!("vsink_{}", pad.name()).as_str()),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
pipeline.add_many([&q, &vsink]).unwrap();
|
||||||
|
gst::Element::link_many([&q, &vsink]).unwrap();
|
||||||
|
let qsinkpad = q.static_pad("sink").unwrap();
|
||||||
|
pad.link(&qsinkpad).expect("linking should work");
|
||||||
|
|
||||||
|
q.sync_state_with_parent().unwrap();
|
||||||
|
// vconv.sync_state_with_parent().unwrap();
|
||||||
|
vsink.sync_state_with_parent().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unlink_video(pad: &gst::Pad, pipeline: &gst::Pipeline) {
|
||||||
|
let q = pipeline
|
||||||
|
.by_name(format!("queue_{}", pad.name()).as_str())
|
||||||
|
.unwrap();
|
||||||
|
// let vconv = pipeline.by_name(format!("vconv_{}",pad.name()).as_str()).unwrap();
|
||||||
|
let vsink = pipeline
|
||||||
|
.by_name(format!("vsink_{}", pad.name()).as_str())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
q.set_state(gst::State::Null).unwrap();
|
||||||
|
// vconv.set_state(gst::State::Null).unwrap();
|
||||||
|
vsink.set_state(gst::State::Null).unwrap();
|
||||||
|
|
||||||
|
pipeline.remove_many([&q, &vsink]).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn link_audio(_pad: &gst::Pad) {}
|
||||||
|
|
||||||
|
fn main() -> Result<(), Error> {
|
||||||
|
gst::init()?;
|
||||||
|
|
||||||
|
let args = Args::parse();
|
||||||
|
|
||||||
|
let pipeline = gst::Pipeline::builder().build();
|
||||||
|
let ws = gst::ElementFactory::make("whipserversrc").build()?;
|
||||||
|
ws.dynamic_cast_ref::<gst::ChildProxy>()
|
||||||
|
.unwrap()
|
||||||
|
.set_child_property("signaller::host-addr", &args.host_addr);
|
||||||
|
|
||||||
|
ws.set_property("enable-data-channel-navigation", true);
|
||||||
|
|
||||||
|
let pipe = pipeline.clone();
|
||||||
|
ws.connect_pad_added(move |_ws, pad| {
|
||||||
|
if pad.name().contains("video_") {
|
||||||
|
link_video(pad, &pipe);
|
||||||
|
} else if pad.name().contains("audio_") {
|
||||||
|
} else {
|
||||||
|
println!("unknown pad type {}", pad.name());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let pipe = pipeline.clone();
|
||||||
|
ws.connect_pad_removed(move |_ws, pad| {
|
||||||
|
if pad.name().contains("video_") {
|
||||||
|
unlink_video(pad, &pipe);
|
||||||
|
} else if pad.name().contains("audio_") {
|
||||||
|
} else {
|
||||||
|
println!("unknown pad type {}", pad.name());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
pipeline.add(&ws)?;
|
||||||
|
pipeline.set_state(gst::State::Playing)?;
|
||||||
|
|
||||||
|
let p = pipeline.clone();
|
||||||
|
ctrlc::set_handler(move || {
|
||||||
|
p.set_state(gst::State::Null).unwrap();
|
||||||
|
exit(0);
|
||||||
|
})
|
||||||
|
.expect("Error setting Ctrl-C handler");
|
||||||
|
|
||||||
|
let bus = pipeline.bus().expect("Pipeline should have a bus");
|
||||||
|
for msg in bus.iter_timed(gst::ClockTime::NONE) {
|
||||||
|
use gst::MessageView;
|
||||||
|
|
||||||
|
match msg.view() {
|
||||||
|
MessageView::Eos(..) => {
|
||||||
|
println!("EOS");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
MessageView::Error(err) => {
|
||||||
|
pipeline.set_state(gst::State::Null)?;
|
||||||
|
eprintln!(
|
||||||
|
"Got error from {}: {} ({})",
|
||||||
|
msg.src()
|
||||||
|
.map(|s| String::from(s.path_string()))
|
||||||
|
.unwrap_or_else(|| "None".into()),
|
||||||
|
err.error(),
|
||||||
|
err.debug().unwrap_or_else(|| "".into()),
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pipeline.set_state(gst::State::Null)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -24,6 +24,8 @@ pub mod signaller;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
pub mod webrtcsink;
|
pub mod webrtcsink;
|
||||||
pub mod webrtcsrc;
|
pub mod webrtcsrc;
|
||||||
|
#[cfg(feature = "whep")]
|
||||||
|
mod whep_signaller;
|
||||||
#[cfg(feature = "whip")]
|
#[cfg(feature = "whip")]
|
||||||
mod whip_signaller;
|
mod whip_signaller;
|
||||||
|
|
||||||
|
|
|
@ -104,9 +104,9 @@ use crate::RUNTIME;
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use gst::ErrorMessage;
|
use gst::ErrorMessage;
|
||||||
#[cfg(feature = "whip")]
|
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||||
use reqwest::header::HeaderMap;
|
use reqwest::header::HeaderMap;
|
||||||
#[cfg(feature = "whip")]
|
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||||
use reqwest::redirect::Policy;
|
use reqwest::redirect::Policy;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -239,7 +239,7 @@ where
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "whip")]
|
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||||
pub fn parse_redirect_location(
|
pub fn parse_redirect_location(
|
||||||
headermap: &HeaderMap,
|
headermap: &HeaderMap,
|
||||||
old_url: &reqwest::Url,
|
old_url: &reqwest::Url,
|
||||||
|
@ -280,13 +280,13 @@ pub fn parse_redirect_location(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "whip")]
|
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||||
pub fn build_reqwest_client(pol: Policy) -> reqwest::Client {
|
pub fn build_reqwest_client(pol: Policy) -> reqwest::Client {
|
||||||
let client_builder = reqwest::Client::builder();
|
let client_builder = reqwest::Client::builder();
|
||||||
client_builder.redirect(pol).build().unwrap()
|
client_builder.redirect(pol).build().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "whip")]
|
#[cfg(any(feature = "whip", feature = "whep"))]
|
||||||
pub fn set_ice_servers(
|
pub fn set_ice_servers(
|
||||||
webrtcbin: &gst::Element,
|
webrtcbin: &gst::Element,
|
||||||
headermap: &HeaderMap,
|
headermap: &HeaderMap,
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -59,6 +59,11 @@ glib::wrapper! {
|
||||||
pub struct LiveKitWebRTCSrc(ObjectSubclass<imp::livekit::LiveKitWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy;
|
pub struct LiveKitWebRTCSrc(ObjectSubclass<imp::livekit::LiveKitWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "whep")]
|
||||||
|
glib::wrapper! {
|
||||||
|
pub struct WhepClientSrc(ObjectSubclass<imp::whep::WhepClientSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
|
||||||
|
}
|
||||||
|
|
||||||
glib::wrapper! {
|
glib::wrapper! {
|
||||||
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
|
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
|
||||||
}
|
}
|
||||||
|
@ -139,5 +144,13 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
|
||||||
LiveKitWebRTCSrc::static_type(),
|
LiveKitWebRTCSrc::static_type(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
#[cfg(feature = "whep")]
|
||||||
|
gst::Element::register(
|
||||||
|
plugin,
|
||||||
|
"whepclientsrc",
|
||||||
|
gst::Rank::PRIMARY,
|
||||||
|
WhepClientSrc::static_type(),
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
600
net/webrtc/src/whep_signaller/client.rs
Normal file
600
net/webrtc/src/whep_signaller/client.rs
Normal file
|
@ -0,0 +1,600 @@
|
||||||
|
// Copyright (C) 2022, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use crate::signaller::{Signallable, SignallableImpl};
|
||||||
|
use crate::utils::{
|
||||||
|
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
|
||||||
|
};
|
||||||
|
use crate::RUNTIME;
|
||||||
|
use async_recursion::async_recursion;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::future;
|
||||||
|
use gst::glib::RustClosure;
|
||||||
|
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||||
|
use gst_sdp::*;
|
||||||
|
use gst_webrtc::*;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use reqwest::header::{HeaderMap, HeaderValue};
|
||||||
|
use reqwest::StatusCode;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
|
gst::DebugCategory::new(
|
||||||
|
"whep-client-signaller",
|
||||||
|
gst::DebugColorFlags::empty(),
|
||||||
|
Some("WHEP Client Signaller"),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
const MAX_REDIRECTS: u8 = 10;
|
||||||
|
const DEFAULT_TIMEOUT: u32 = 15;
|
||||||
|
const SESSION_ID: &str = "whep-client";
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct Settings {
|
||||||
|
whep_endpoint: Option<String>,
|
||||||
|
auth_token: Option<String>,
|
||||||
|
use_link_headers: bool,
|
||||||
|
timeout: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::derivable_impls)]
|
||||||
|
impl Default for Settings {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
whep_endpoint: None,
|
||||||
|
auth_token: None,
|
||||||
|
use_link_headers: false,
|
||||||
|
timeout: DEFAULT_TIMEOUT,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum State {
|
||||||
|
Stopped,
|
||||||
|
Post { redirects: u8 },
|
||||||
|
Running { whep_resource: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for State {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Stopped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WhepClient {
|
||||||
|
settings: Mutex<Settings>,
|
||||||
|
state: Mutex<State>,
|
||||||
|
canceller: Mutex<Option<future::AbortHandle>>,
|
||||||
|
client: reqwest::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for WhepClient {
|
||||||
|
fn default() -> Self {
|
||||||
|
// We'll handle redirects manually since the default redirect handler does not
|
||||||
|
// reuse the authentication token on the redirected server
|
||||||
|
let pol = reqwest::redirect::Policy::none();
|
||||||
|
let client = build_reqwest_client(pol);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
settings: Mutex::new(Settings::default()),
|
||||||
|
state: Mutex::new(State::default()),
|
||||||
|
canceller: Mutex::new(None),
|
||||||
|
client,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ObjectImpl for WhepClient {
|
||||||
|
fn properties() -> &'static [glib::ParamSpec] {
|
||||||
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||||
|
vec![
|
||||||
|
glib::ParamSpecString::builder("whep-endpoint")
|
||||||
|
.nick("WHEP Endpoint")
|
||||||
|
.blurb("The WHEP server endpoint to POST SDP offer to.")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecBoolean::builder("use-link-headers")
|
||||||
|
.nick("Use Link Headers")
|
||||||
|
.blurb("Use link headers to configure STUN/TURN servers if present in WHEP endpoint response.")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("auth-token")
|
||||||
|
.nick("Authorization Token")
|
||||||
|
.blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer <auth-token>'")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt::builder("timeout")
|
||||||
|
.nick("Timeout")
|
||||||
|
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
|
||||||
|
.maximum(3600)
|
||||||
|
.default_value(DEFAULT_TIMEOUT)
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
|
]
|
||||||
|
});
|
||||||
|
PROPERTIES.as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
||||||
|
match pspec.name() {
|
||||||
|
"whep-endpoint" => {
|
||||||
|
let mut settings = self.settings.lock().unwrap();
|
||||||
|
settings.whep_endpoint = value.get().expect("WHEP endpoint should be a string");
|
||||||
|
}
|
||||||
|
"use-link-headers" => {
|
||||||
|
let mut settings = self.settings.lock().unwrap();
|
||||||
|
settings.use_link_headers = value
|
||||||
|
.get()
|
||||||
|
.expect("use-link-headers should be a boolean value");
|
||||||
|
}
|
||||||
|
"auth-token" => {
|
||||||
|
let mut settings = self.settings.lock().unwrap();
|
||||||
|
settings.auth_token = value.get().expect("Auth token should be a string");
|
||||||
|
}
|
||||||
|
"timeout" => {
|
||||||
|
let mut settings = self.settings.lock().unwrap();
|
||||||
|
settings.timeout = value.get().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||||
|
match pspec.name() {
|
||||||
|
"whep-endpoint" => {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
settings.whep_endpoint.to_value()
|
||||||
|
}
|
||||||
|
"use-link-headers" => {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
settings.use_link_headers.to_value()
|
||||||
|
}
|
||||||
|
"auth-token" => {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
settings.auth_token.to_value()
|
||||||
|
}
|
||||||
|
"timeout" => {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
settings.timeout.to_value()
|
||||||
|
}
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[glib::object_subclass]
|
||||||
|
impl ObjectSubclass for WhepClient {
|
||||||
|
const NAME: &'static str = "GstWhepClientSignaller";
|
||||||
|
type Type = super::WhepClientSignaller;
|
||||||
|
type ParentType = glib::Object;
|
||||||
|
type Interfaces = (Signallable,);
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WhepClient {
|
||||||
|
fn raise_error(&self, msg: String) {
|
||||||
|
self.obj()
|
||||||
|
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_future_error(&self, err: WaitError) {
|
||||||
|
match err {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
gst::warning!(CAT, imp: self, "Future aborted")
|
||||||
|
}
|
||||||
|
WaitError::FutureError(err) => self.raise_error(err.to_string()),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sdp_message_parse(&self, sdp_bytes: Bytes, _webrtcbin: &gst::Element) {
|
||||||
|
let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) {
|
||||||
|
Ok(sdp) => sdp,
|
||||||
|
Err(_) => {
|
||||||
|
self.raise_error("Could not parse answer SDP".to_string());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp);
|
||||||
|
|
||||||
|
self.obj()
|
||||||
|
.emit_by_name::<()>("session-description", &[&SESSION_ID, &remote_sdp]);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn parse_endpoint_response(
|
||||||
|
&self,
|
||||||
|
sess_desc: WebRTCSessionDescription,
|
||||||
|
resp: reqwest::Response,
|
||||||
|
redirects: u8,
|
||||||
|
webrtcbin: gst::Element,
|
||||||
|
) {
|
||||||
|
let endpoint;
|
||||||
|
let use_link_headers;
|
||||||
|
|
||||||
|
{
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
endpoint =
|
||||||
|
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
|
||||||
|
use_link_headers = settings.use_link_headers;
|
||||||
|
drop(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
match resp.status() {
|
||||||
|
StatusCode::OK | StatusCode::NO_CONTENT => {
|
||||||
|
gst::info!(CAT, imp: self, "SDP offer successfully send");
|
||||||
|
}
|
||||||
|
|
||||||
|
StatusCode::CREATED => {
|
||||||
|
gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers());
|
||||||
|
|
||||||
|
if use_link_headers {
|
||||||
|
if let Err(e) = set_ice_servers(&webrtcbin, resp.headers()) {
|
||||||
|
self.raise_error(e.to_string());
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/* See section 4.2 of the WHEP specification */
|
||||||
|
let location = match resp.headers().get(reqwest::header::LOCATION) {
|
||||||
|
Some(location) => location,
|
||||||
|
None => {
|
||||||
|
self.raise_error(
|
||||||
|
"Location header field should be present for WHEP resource URL"
|
||||||
|
.to_string(),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let location = match location.to_str() {
|
||||||
|
Ok(loc) => loc,
|
||||||
|
Err(e) => {
|
||||||
|
self.raise_error(format!("Failed to convert location to string: {e}"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let url = reqwest::Url::parse(endpoint.as_str()).unwrap();
|
||||||
|
|
||||||
|
gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location);
|
||||||
|
|
||||||
|
let url = match url.join(location) {
|
||||||
|
Ok(joined_url) => joined_url,
|
||||||
|
Err(err) => {
|
||||||
|
self.raise_error(format!("URL join operation failed: {err:?}"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match resp.bytes().await {
|
||||||
|
Ok(ans_bytes) => {
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
*state = match *state {
|
||||||
|
State::Post { redirects: _r } => State::Running {
|
||||||
|
whep_resource: url.to_string(),
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
self.raise_error("Expected to be in POST state".to_string());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
self.sdp_message_parse(ans_bytes, &webrtcbin)
|
||||||
|
}
|
||||||
|
Err(err) => self.raise_error(err.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
status if status.is_redirection() => {
|
||||||
|
if redirects < MAX_REDIRECTS {
|
||||||
|
match parse_redirect_location(resp.headers(), &endpoint) {
|
||||||
|
Ok(redirect_url) => {
|
||||||
|
{
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
*state = match *state {
|
||||||
|
State::Post { redirects: _r } => State::Post {
|
||||||
|
redirects: redirects + 1,
|
||||||
|
},
|
||||||
|
/*
|
||||||
|
* As per section 4.6 of the specification, redirection is
|
||||||
|
* not required to be supported for the PATCH and DELETE
|
||||||
|
* requests to the final WHEP resource URL. Only the initial
|
||||||
|
* POST request may support redirection.
|
||||||
|
*/
|
||||||
|
State::Running { .. } => {
|
||||||
|
self.raise_error(
|
||||||
|
"Unexpected redirection in RUNNING state".to_string(),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
State::Stopped => unreachable!(),
|
||||||
|
};
|
||||||
|
drop(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Redirecting endpoint to {}",
|
||||||
|
redirect_url.as_str()
|
||||||
|
);
|
||||||
|
|
||||||
|
self.do_post(sess_desc, webrtcbin, redirect_url).await
|
||||||
|
}
|
||||||
|
Err(e) => self.raise_error(e.to_string()),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.raise_error("Too many redirects. Unable to connect.".to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s => {
|
||||||
|
match resp.bytes().await {
|
||||||
|
Ok(r) => {
|
||||||
|
let res = r.escape_ascii().to_string();
|
||||||
|
|
||||||
|
// FIXME: Check and handle 'Retry-After' header in case of server error
|
||||||
|
self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res));
|
||||||
|
}
|
||||||
|
Err(err) => self.raise_error(err.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn whep_offer(&self, webrtcbin: gst::Element) {
|
||||||
|
let local_desc =
|
||||||
|
webrtcbin.property::<Option<WebRTCSessionDescription>>("local-description");
|
||||||
|
|
||||||
|
let sess_desc = match local_desc {
|
||||||
|
None => {
|
||||||
|
self.raise_error("Local description is not set".to_string());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Some(mut local_desc) => {
|
||||||
|
local_desc.set_type(WebRTCSDPType::Offer);
|
||||||
|
local_desc
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Sending offer SDP: {:?}",
|
||||||
|
sess_desc.sdp().as_text()
|
||||||
|
);
|
||||||
|
|
||||||
|
let timeout;
|
||||||
|
let endpoint;
|
||||||
|
|
||||||
|
{
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
timeout = settings.timeout;
|
||||||
|
endpoint =
|
||||||
|
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
|
||||||
|
drop(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = wait_async(
|
||||||
|
&self.canceller,
|
||||||
|
self.do_post(sess_desc, webrtcbin, endpoint),
|
||||||
|
timeout,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
self.handle_future_error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_recursion]
|
||||||
|
async fn do_post(
|
||||||
|
&self,
|
||||||
|
offer: WebRTCSessionDescription,
|
||||||
|
webrtcbin: gst::Element,
|
||||||
|
endpoint: reqwest::Url,
|
||||||
|
) {
|
||||||
|
let auth_token;
|
||||||
|
|
||||||
|
{
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
auth_token = settings.auth_token.clone();
|
||||||
|
drop(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
let sdp = offer.sdp();
|
||||||
|
let body = sdp.as_text().unwrap();
|
||||||
|
|
||||||
|
gst::info!(CAT, imp: self, "Using endpoint {}", endpoint.as_str());
|
||||||
|
|
||||||
|
let mut headermap = HeaderMap::new();
|
||||||
|
headermap.insert(
|
||||||
|
reqwest::header::CONTENT_TYPE,
|
||||||
|
HeaderValue::from_static("application/sdp"),
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(token) = auth_token.as_ref() {
|
||||||
|
let bearer_token = "Bearer ".to_owned() + token.as_str();
|
||||||
|
headermap.insert(
|
||||||
|
reqwest::header::AUTHORIZATION,
|
||||||
|
HeaderValue::from_str(bearer_token.as_str())
|
||||||
|
.expect("Failed to set auth token to header"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Url for HTTP POST request: {}",
|
||||||
|
endpoint.as_str()
|
||||||
|
);
|
||||||
|
|
||||||
|
let resp = self
|
||||||
|
.client
|
||||||
|
.request(reqwest::Method::POST, endpoint.clone())
|
||||||
|
.headers(headermap)
|
||||||
|
.body(body)
|
||||||
|
.send()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match resp {
|
||||||
|
Ok(r) => {
|
||||||
|
#[allow(unused_mut)]
|
||||||
|
let mut redirects;
|
||||||
|
|
||||||
|
{
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
redirects = match *state {
|
||||||
|
State::Post { redirects } => redirects,
|
||||||
|
_ => {
|
||||||
|
self.raise_error("Trying to do POST in unexpected state".to_string());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
drop(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.parse_endpoint_response(offer, r, redirects, webrtcbin)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
Err(err) => self.raise_error(err.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn terminate_session(&self) {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
|
||||||
|
let resource_url = match *state {
|
||||||
|
State::Running {
|
||||||
|
whep_resource: ref whep_resource_url,
|
||||||
|
} => whep_resource_url.clone(),
|
||||||
|
_ => {
|
||||||
|
self.raise_error("Terminated in unexpected state".to_string());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
let mut headermap = HeaderMap::new();
|
||||||
|
if let Some(token) = &settings.auth_token {
|
||||||
|
let bearer_token = "Bearer ".to_owned() + token.as_str();
|
||||||
|
headermap.insert(
|
||||||
|
reqwest::header::AUTHORIZATION,
|
||||||
|
HeaderValue::from_str(bearer_token.as_str())
|
||||||
|
.expect("Failed to set auth token to header"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url);
|
||||||
|
|
||||||
|
/* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */
|
||||||
|
let client = build_reqwest_client(reqwest::redirect::Policy::default());
|
||||||
|
let future = async {
|
||||||
|
client
|
||||||
|
.delete(resource_url.clone())
|
||||||
|
.headers(headermap)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|err| {
|
||||||
|
gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["DELETE request failed {}: {:?}", resource_url, err]
|
||||||
|
)
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = wait(&self.canceller, future, timeout);
|
||||||
|
match res {
|
||||||
|
Ok(r) => {
|
||||||
|
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
gst::warning!(CAT, imp: self, "DELETE request aborted")
|
||||||
|
}
|
||||||
|
WaitError::FutureError(e) => {
|
||||||
|
gst::error!(CAT, imp: self, "Error on DELETE request : {}", e)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn on_webrtcbin_ready(&self) -> RustClosure {
|
||||||
|
glib::closure!(|signaller: &super::WhepClientSignaller,
|
||||||
|
_consumer_identifier: &str,
|
||||||
|
webrtcbin: &gst::Element| {
|
||||||
|
let obj_weak = signaller.downgrade();
|
||||||
|
|
||||||
|
webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| {
|
||||||
|
let Some(obj) = obj_weak.upgrade() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let state = webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
|
||||||
|
|
||||||
|
match state {
|
||||||
|
WebRTCICEGatheringState::Gathering => {
|
||||||
|
gst::info!(CAT, obj: obj, "ICE gathering started");
|
||||||
|
}
|
||||||
|
WebRTCICEGatheringState::Complete => {
|
||||||
|
gst::info!(CAT, obj: obj, "ICE gathering complete");
|
||||||
|
|
||||||
|
let webrtcbin = webrtcbin.clone();
|
||||||
|
|
||||||
|
RUNTIME.spawn(async move { obj.imp().whep_offer(webrtcbin).await });
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SignallableImpl for WhepClient {
|
||||||
|
fn start(&self) {
|
||||||
|
if self.settings.lock().unwrap().whep_endpoint.is_none() {
|
||||||
|
self.raise_error("WHEP endpoint URL must be set".to_string());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
*state = State::Post { redirects: 0 };
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
self.obj()
|
||||||
|
.emit_by_name::<()>("session-started", &[&SESSION_ID, &SESSION_ID]);
|
||||||
|
self.obj().emit_by_name::<()>(
|
||||||
|
"session-requested",
|
||||||
|
&[
|
||||||
|
&SESSION_ID,
|
||||||
|
&SESSION_ID,
|
||||||
|
&None::<gst_webrtc::WebRTCSessionDescription>,
|
||||||
|
],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop(&self) {}
|
||||||
|
|
||||||
|
fn end_session(&self, _session_id: &str) {
|
||||||
|
// Interrupt requests in progress, if any
|
||||||
|
if let Some(canceller) = &*self.canceller.lock().unwrap() {
|
||||||
|
canceller.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
if let State::Running { .. } = *state {
|
||||||
|
// Release server-side resources
|
||||||
|
drop(state);
|
||||||
|
self.terminate_session();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
21
net/webrtc/src/whep_signaller/mod.rs
Normal file
21
net/webrtc/src/whep_signaller/mod.rs
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use crate::signaller::Signallable;
|
||||||
|
use gst::{glib, prelude::ObjectExt, subclass::prelude::ObjectSubclassIsExt};
|
||||||
|
|
||||||
|
mod client;
|
||||||
|
|
||||||
|
glib::wrapper! {
|
||||||
|
pub struct WhepClientSignaller(ObjectSubclass<client::WhepClient>) @implements Signallable;
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for WhepClientSignaller {}
|
||||||
|
unsafe impl Sync for WhepClientSignaller {}
|
||||||
|
|
||||||
|
impl Default for WhepClientSignaller {
|
||||||
|
fn default() -> Self {
|
||||||
|
let sig: WhepClientSignaller = glib::Object::new();
|
||||||
|
sig.connect_closure("webrtcbin-ready", false, sig.imp().on_webrtcbin_ready());
|
||||||
|
sig
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,9 +18,8 @@ use reqwest::header::HeaderValue;
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use core::time::Duration;
|
|
||||||
use crossbeam_channel::unbounded;
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
use warp::{
|
use warp::{
|
||||||
http,
|
http,
|
||||||
|
@ -47,7 +46,6 @@ const ENDPOINT_PATH: &str = "endpoint";
|
||||||
const RESOURCE_PATH: &str = "resource";
|
const RESOURCE_PATH: &str = "resource";
|
||||||
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080";
|
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:8080";
|
||||||
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
|
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19303");
|
||||||
const DEFAULT_PRODUCER_PEER_ID: Option<&str> = Some("whip-client");
|
|
||||||
const CONTENT_SDP: &str = "application/sdp";
|
const CONTENT_SDP: &str = "application/sdp";
|
||||||
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
|
const CONTENT_TRICKLE_ICE: &str = "application/trickle-ice-sdpfrag";
|
||||||
|
|
||||||
|
@ -193,7 +191,7 @@ impl WhipClient {
|
||||||
let mut headermap = HeaderMap::new();
|
let mut headermap = HeaderMap::new();
|
||||||
headermap.insert(
|
headermap.insert(
|
||||||
reqwest::header::CONTENT_TYPE,
|
reqwest::header::CONTENT_TYPE,
|
||||||
HeaderValue::from_static("application/sdp"),
|
HeaderValue::from_static(CONTENT_SDP),
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(token) = auth_token.as_ref() {
|
if let Some(token) = auth_token.as_ref() {
|
||||||
|
@ -616,27 +614,14 @@ impl ObjectImpl for WhipClient {
|
||||||
// WHIP server implementation
|
// WHIP server implementation
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum WhipServerState {
|
|
||||||
Idle,
|
|
||||||
Negotiating,
|
|
||||||
Ready,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for WhipServerState {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::Idle
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct WhipServerSettings {
|
struct WhipServerSettings {
|
||||||
stun_server: Option<String>,
|
stun_server: Option<String>,
|
||||||
turn_servers: gst::Array,
|
turn_servers: gst::Array,
|
||||||
host_addr: Url,
|
host_addr: Url,
|
||||||
producer_peer_id: Option<String>,
|
|
||||||
timeout: u32,
|
timeout: u32,
|
||||||
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
|
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
|
||||||
server_handle: Option<tokio::task::JoinHandle<()>>,
|
server_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
sdp_answer: Option<crossbeam_channel::Sender<Option<SDPMessage>>>,
|
sdp_answer: Option<mpsc::Sender<Option<SDPMessage>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for WhipServerSettings {
|
impl Default for WhipServerSettings {
|
||||||
|
@ -645,7 +630,6 @@ impl Default for WhipServerSettings {
|
||||||
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
|
host_addr: Url::parse(DEFAULT_HOST_ADDR).unwrap(),
|
||||||
stun_server: DEFAULT_STUN_SERVER.map(String::from),
|
stun_server: DEFAULT_STUN_SERVER.map(String::from),
|
||||||
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
|
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
|
||||||
producer_peer_id: DEFAULT_PRODUCER_PEER_ID.map(String::from),
|
|
||||||
timeout: DEFAULT_TIMEOUT,
|
timeout: DEFAULT_TIMEOUT,
|
||||||
shutdown_signal: None,
|
shutdown_signal: None,
|
||||||
server_handle: None,
|
server_handle: None,
|
||||||
|
@ -654,18 +638,10 @@ impl Default for WhipServerSettings {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
pub struct WhipServer {
|
pub struct WhipServer {
|
||||||
state: Mutex<WhipServerState>,
|
|
||||||
settings: Mutex<WhipServerSettings>,
|
settings: Mutex<WhipServerSettings>,
|
||||||
}
|
canceller: Mutex<Option<futures::future::AbortHandle>>,
|
||||||
|
|
||||||
impl Default for WhipServer {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
settings: Mutex::new(WhipServerSettings::default()),
|
|
||||||
state: Mutex::new(WhipServerState::default()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -694,7 +670,7 @@ impl WhipServer {
|
||||||
WebRTCICEGatheringState::Complete => {
|
WebRTCICEGatheringState::Complete => {
|
||||||
gst::info!(CAT, obj: obj, "ICE gathering complete");
|
gst::info!(CAT, obj: obj, "ICE gathering complete");
|
||||||
let ans: Option<gst_sdp::SDPMessage>;
|
let ans: Option<gst_sdp::SDPMessage>;
|
||||||
let settings = obj.imp().settings.lock().unwrap();
|
let mut settings = obj.imp().settings.lock().unwrap();
|
||||||
if let Some(answer_desc) = webrtcbin
|
if let Some(answer_desc) = webrtcbin
|
||||||
.property::<Option<WebRTCSessionDescription>>("local-description")
|
.property::<Option<WebRTCSessionDescription>>("local-description")
|
||||||
{
|
{
|
||||||
|
@ -702,9 +678,22 @@ impl WhipServer {
|
||||||
} else {
|
} else {
|
||||||
ans = None;
|
ans = None;
|
||||||
}
|
}
|
||||||
if let Some(tx) = &settings.sdp_answer {
|
let tx = settings
|
||||||
tx.send(ans).unwrap()
|
.sdp_answer
|
||||||
}
|
.take()
|
||||||
|
.expect("SDP answer Sender needs to be valid");
|
||||||
|
|
||||||
|
let obj_weak = obj.downgrade();
|
||||||
|
RUNTIME.spawn(async move {
|
||||||
|
let obj = match obj_weak.upgrade() {
|
||||||
|
Some(obj) => obj,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = tx.send(ans).await {
|
||||||
|
gst::error!(CAT, obj: obj, "Failed to send SDP {e}");
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
@ -722,57 +711,23 @@ impl WhipServer {
|
||||||
//FIXME: add state checking once ICE trickle is implemented
|
//FIXME: add state checking once ICE trickle is implemented
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_handler(&self, _id: String) -> Result<impl warp::Reply, warp::Rejection> {
|
async fn delete_handler(&self, id: String) -> Result<impl warp::Reply, warp::Rejection> {
|
||||||
let mut state = self.state.lock().unwrap();
|
if self
|
||||||
match *state {
|
.obj()
|
||||||
WhipServerState::Ready => {
|
.emit_by_name::<bool>("session-ended", &[&id.as_str()])
|
||||||
// FIXME: session-ended will make webrtcsrc send EOS
|
{
|
||||||
// and producer-removed is not handled
|
gst::info!(CAT, imp:self, "Ended session {id}");
|
||||||
// Need to address the usecase where when the client terminates
|
} else {
|
||||||
// the webrtcsrc should be running without sending EOS and reset
|
gst::info!(CAT, imp:self, "Failed to End session {id}");
|
||||||
// for next client connection like a usual server
|
// FIXME: Do we send a different response
|
||||||
|
|
||||||
self.obj().emit_by_name::<bool>("session-ended", &[&ROOT]);
|
|
||||||
|
|
||||||
gst::info!(CAT, imp:self, "Ending session");
|
|
||||||
*state = WhipServerState::Idle;
|
|
||||||
Ok(warp::reply::reply().into_response())
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
gst::error!(CAT, imp: self, "DELETE requested in {state:?} state. Can't Proceed");
|
|
||||||
let res = http::Response::builder()
|
|
||||||
.status(http::StatusCode::CONFLICT)
|
|
||||||
.body(Body::from(String::from("Session not Ready")))
|
|
||||||
.unwrap();
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Ok(warp::reply::reply().into_response())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
|
async fn options_handler(&self) -> Result<impl warp::Reply, warp::Rejection> {
|
||||||
let settings = self.settings.lock().unwrap();
|
let settings = self.settings.lock().unwrap();
|
||||||
let peer_id = settings.producer_peer_id.clone().unwrap();
|
|
||||||
drop(settings);
|
drop(settings);
|
||||||
|
|
||||||
let mut state = self.state.lock().unwrap();
|
|
||||||
match *state {
|
|
||||||
WhipServerState::Idle => {
|
|
||||||
self.obj()
|
|
||||||
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
|
|
||||||
*state = WhipServerState::Negotiating
|
|
||||||
}
|
|
||||||
WhipServerState::Ready => {
|
|
||||||
gst::error!(CAT, imp: self, "OPTIONS requested in {state:?} state. Can't proceed");
|
|
||||||
let res = http::Response::builder()
|
|
||||||
.status(http::StatusCode::CONFLICT)
|
|
||||||
.body(Body::from(String::from("Session active already")))
|
|
||||||
.unwrap();
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
};
|
|
||||||
drop(state);
|
|
||||||
|
|
||||||
let mut links = HeaderMap::new();
|
let mut links = HeaderMap::new();
|
||||||
let settings = self.settings.lock().unwrap();
|
let settings = self.settings.lock().unwrap();
|
||||||
match &settings.stun_server {
|
match &settings.stun_server {
|
||||||
|
@ -806,7 +761,7 @@ impl WhipServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut res = http::Response::builder()
|
let mut res = http::Response::builder()
|
||||||
.header("Access-Post", "application/sdp")
|
.header("Access-Post", CONTENT_SDP)
|
||||||
.body(Body::empty())
|
.body(Body::empty())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -820,31 +775,15 @@ impl WhipServer {
|
||||||
&self,
|
&self,
|
||||||
body: warp::hyper::body::Bytes,
|
body: warp::hyper::body::Bytes,
|
||||||
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
|
) -> Result<http::Response<warp::hyper::Body>, warp::Rejection> {
|
||||||
let mut settings = self.settings.lock().unwrap();
|
let session_id = uuid::Uuid::new_v4().to_string();
|
||||||
let peer_id = settings.producer_peer_id.clone().unwrap();
|
let (tx, mut rx) = mpsc::channel::<Option<SDPMessage>>(1);
|
||||||
let wait_timeout = settings.timeout;
|
let wait_timeout = {
|
||||||
let (tx, rx) = unbounded::<Option<SDPMessage>>();
|
let mut settings = self.settings.lock().unwrap();
|
||||||
settings.sdp_answer = Some(tx);
|
let wait_timeout = settings.timeout;
|
||||||
drop(settings);
|
settings.sdp_answer = Some(tx);
|
||||||
|
drop(settings);
|
||||||
let mut state = self.state.lock().unwrap();
|
wait_timeout
|
||||||
match *state {
|
|
||||||
WhipServerState::Idle => {
|
|
||||||
self.obj()
|
|
||||||
.emit_by_name::<()>("session-started", &[&ROOT, &peer_id]);
|
|
||||||
*state = WhipServerState::Negotiating
|
|
||||||
}
|
|
||||||
WhipServerState::Ready => {
|
|
||||||
gst::error!(CAT, imp: self, "POST requested in {state:?} state. Can't Proceed");
|
|
||||||
let res = http::Response::builder()
|
|
||||||
.status(http::StatusCode::CONFLICT)
|
|
||||||
.body(Body::from(String::from("Session active already")))
|
|
||||||
.unwrap();
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
};
|
};
|
||||||
drop(state);
|
|
||||||
|
|
||||||
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
|
match gst_sdp::SDPMessage::parse_buffer(body.as_ref()) {
|
||||||
Ok(offer_sdp) => {
|
Ok(offer_sdp) => {
|
||||||
|
@ -854,7 +793,9 @@ impl WhipServer {
|
||||||
);
|
);
|
||||||
|
|
||||||
self.obj()
|
self.obj()
|
||||||
.emit_by_name::<()>("session-description", &[&"unique", &offer]);
|
.emit_by_name::<()>("session-started", &[&session_id, &session_id]);
|
||||||
|
self.obj()
|
||||||
|
.emit_by_name::<()>("session-description", &[&session_id, &offer]);
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
|
gst::error!(CAT, imp: self, "Could not parse offer SDP: {err}");
|
||||||
|
@ -864,20 +805,32 @@ impl WhipServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't want to wait infinitely for the ice gathering to complete.
|
let result = wait_async(&self.canceller, rx.recv(), wait_timeout).await;
|
||||||
let answer = match rx.recv_timeout(Duration::from_secs(wait_timeout as u64)) {
|
|
||||||
Ok(a) => a,
|
let answer = match result {
|
||||||
Err(e) => {
|
Ok(ans) => match ans {
|
||||||
let reply = warp::reply::reply();
|
Some(a) => a,
|
||||||
let res;
|
None => {
|
||||||
if e.is_timeout() {
|
let err = "Channel closed, can't receive SDP".to_owned();
|
||||||
res = warp::reply::with_status(reply, http::StatusCode::REQUEST_TIMEOUT);
|
let res = http::Response::builder()
|
||||||
gst::error!(CAT, imp: self, "Timedout waiting for SDP answer");
|
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
} else {
|
.body(Body::from(err))
|
||||||
res = warp::reply::with_status(reply, http::StatusCode::INTERNAL_SERVER_ERROR);
|
.unwrap();
|
||||||
gst::error!(CAT, imp: self, "Channel got disconnected");
|
|
||||||
|
return Ok(res);
|
||||||
}
|
}
|
||||||
return Ok(res.into_response());
|
},
|
||||||
|
Err(e) => {
|
||||||
|
let err = match e {
|
||||||
|
WaitError::FutureAborted => "Aborted".to_owned(),
|
||||||
|
WaitError::FutureError(err) => err.to_string(),
|
||||||
|
};
|
||||||
|
let res = http::Response::builder()
|
||||||
|
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
.body(Body::from(err))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
return Ok(res);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -947,10 +900,10 @@ impl WhipServer {
|
||||||
drop(settings);
|
drop(settings);
|
||||||
|
|
||||||
// Got SDP answer, send answer in the response
|
// Got SDP answer, send answer in the response
|
||||||
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &peer_id;
|
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id;
|
||||||
let mut res = http::Response::builder()
|
let mut res = http::Response::builder()
|
||||||
.status(StatusCode::CREATED)
|
.status(StatusCode::CREATED)
|
||||||
.header(CONTENT_TYPE, "application/sdp")
|
.header(CONTENT_TYPE, CONTENT_SDP)
|
||||||
.header("location", resource_url)
|
.header("location", resource_url)
|
||||||
.body(Body::from(ans_text.unwrap()))
|
.body(Body::from(ans_text.unwrap()))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -958,10 +911,6 @@ impl WhipServer {
|
||||||
let headers = res.headers_mut();
|
let headers = res.headers_mut();
|
||||||
headers.extend(links);
|
headers.extend(links);
|
||||||
|
|
||||||
let mut state = self.state.lock().unwrap();
|
|
||||||
*state = WhipServerState::Ready;
|
|
||||||
drop(state);
|
|
||||||
|
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1117,7 +1066,8 @@ impl SignallableImpl for WhipServer {
|
||||||
gst::info!(CAT, imp: self, "stopped the WHIP server");
|
gst::info!(CAT, imp: self, "stopped the WHIP server");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn end_session(&self, _session_id: &str) {
|
fn end_session(&self, session_id: &str) {
|
||||||
|
gst::info!(CAT, imp: self, "Session {session_id} ended");
|
||||||
//FIXME: send any events to the client
|
//FIXME: send any events to the client
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1140,11 +1090,6 @@ impl ObjectImpl for WhipServer {
|
||||||
.default_value(DEFAULT_HOST_ADDR)
|
.default_value(DEFAULT_HOST_ADDR)
|
||||||
.flags(glib::ParamFlags::READWRITE)
|
.flags(glib::ParamFlags::READWRITE)
|
||||||
.build(),
|
.build(),
|
||||||
// needed by webrtcsrc in handle_webrtc_src_pad
|
|
||||||
glib::ParamSpecString::builder("producer-peer-id")
|
|
||||||
.default_value(DEFAULT_PRODUCER_PEER_ID)
|
|
||||||
.flags(glib::ParamFlags::READABLE)
|
|
||||||
.build(),
|
|
||||||
glib::ParamSpecString::builder("stun-server")
|
glib::ParamSpecString::builder("stun-server")
|
||||||
.nick("STUN Server")
|
.nick("STUN Server")
|
||||||
.blurb("The STUN server of the form stun://hostname:port")
|
.blurb("The STUN server of the form stun://hostname:port")
|
||||||
|
@ -1204,7 +1149,6 @@ impl ObjectImpl for WhipServer {
|
||||||
"host-addr" => settings.host_addr.to_string().to_value(),
|
"host-addr" => settings.host_addr.to_string().to_value(),
|
||||||
"stun-server" => settings.stun_server.to_value(),
|
"stun-server" => settings.stun_server.to_value(),
|
||||||
"turn-servers" => settings.turn_servers.to_value(),
|
"turn-servers" => settings.turn_servers.to_value(),
|
||||||
"producer-peer-id" => settings.producer_peer_id.to_value(),
|
|
||||||
"timeout" => settings.timeout.to_value(),
|
"timeout" => settings.timeout.to_value(),
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,7 +134,7 @@ impl Dav1dDec {
|
||||||
let matrix = match pic.matrix_coefficients() {
|
let matrix = match pic.matrix_coefficients() {
|
||||||
pixel::MatrixCoefficients::Identity => gst_video::VideoColorMatrix::Rgb,
|
pixel::MatrixCoefficients::Identity => gst_video::VideoColorMatrix::Rgb,
|
||||||
pixel::MatrixCoefficients::BT709 => gst_video::VideoColorMatrix::Bt709,
|
pixel::MatrixCoefficients::BT709 => gst_video::VideoColorMatrix::Bt709,
|
||||||
pixel::MatrixCoefficients::Unspecified => gst_video::VideoColorMatrix::Unknown,
|
pixel::MatrixCoefficients::Unspecified => gst_video::VideoColorMatrix::Bt709,
|
||||||
pixel::MatrixCoefficients::BT470M => gst_video::VideoColorMatrix::Fcc,
|
pixel::MatrixCoefficients::BT470M => gst_video::VideoColorMatrix::Fcc,
|
||||||
pixel::MatrixCoefficients::BT470BG => gst_video::VideoColorMatrix::Bt601,
|
pixel::MatrixCoefficients::BT470BG => gst_video::VideoColorMatrix::Bt601,
|
||||||
pixel::MatrixCoefficients::ST240M => gst_video::VideoColorMatrix::Smpte240m,
|
pixel::MatrixCoefficients::ST240M => gst_video::VideoColorMatrix::Smpte240m,
|
||||||
|
@ -149,7 +149,7 @@ impl Dav1dDec {
|
||||||
|
|
||||||
let transfer = match pic.transfer_characteristic() {
|
let transfer = match pic.transfer_characteristic() {
|
||||||
pixel::TransferCharacteristic::BT1886 => gst_video::VideoTransferFunction::Bt709,
|
pixel::TransferCharacteristic::BT1886 => gst_video::VideoTransferFunction::Bt709,
|
||||||
pixel::TransferCharacteristic::Unspecified => gst_video::VideoTransferFunction::Unknown,
|
pixel::TransferCharacteristic::Unspecified => gst_video::VideoTransferFunction::Bt709,
|
||||||
pixel::TransferCharacteristic::BT470M => gst_video::VideoTransferFunction::Bt709,
|
pixel::TransferCharacteristic::BT470M => gst_video::VideoTransferFunction::Bt709,
|
||||||
pixel::TransferCharacteristic::BT470BG => gst_video::VideoTransferFunction::Gamma28,
|
pixel::TransferCharacteristic::BT470BG => gst_video::VideoTransferFunction::Gamma28,
|
||||||
pixel::TransferCharacteristic::ST170M => gst_video::VideoTransferFunction::Bt601,
|
pixel::TransferCharacteristic::ST170M => gst_video::VideoTransferFunction::Bt601,
|
||||||
|
@ -180,7 +180,7 @@ impl Dav1dDec {
|
||||||
|
|
||||||
let primaries = match pic.color_primaries() {
|
let primaries = match pic.color_primaries() {
|
||||||
pixel::ColorPrimaries::BT709 => gst_video::VideoColorPrimaries::Bt709,
|
pixel::ColorPrimaries::BT709 => gst_video::VideoColorPrimaries::Bt709,
|
||||||
pixel::ColorPrimaries::Unspecified => gst_video::VideoColorPrimaries::Unknown,
|
pixel::ColorPrimaries::Unspecified => gst_video::VideoColorPrimaries::Bt709,
|
||||||
pixel::ColorPrimaries::BT470M => gst_video::VideoColorPrimaries::Bt470m,
|
pixel::ColorPrimaries::BT470M => gst_video::VideoColorPrimaries::Bt470m,
|
||||||
pixel::ColorPrimaries::BT470BG => gst_video::VideoColorPrimaries::Bt470bg,
|
pixel::ColorPrimaries::BT470BG => gst_video::VideoColorPrimaries::Bt470bg,
|
||||||
pixel::ColorPrimaries::ST240M => gst_video::VideoColorPrimaries::Smpte240m,
|
pixel::ColorPrimaries::ST240M => gst_video::VideoColorPrimaries::Smpte240m,
|
||||||
|
|
Loading…
Reference in a new issue