webrtcsink: Add a mechanism for SDP munging

Unfortunately, server implementations might have odd SDP-related quirks,
so let's allow clients a way to work around these oddities themselves.
For now, this means that a client can fix up the H.264 profile-level-id
as required by Twitch (whose media pipeline is more permissive than the
WHIP implementation).

Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/516
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1525>
This commit is contained in:
Arun Raghavan 2024-04-03 11:31:55 -04:00
parent 83f76280f5
commit 1c54c77840
8 changed files with 148 additions and 5 deletions

View file

@ -10015,6 +10015,20 @@
"return-type": "void", "return-type": "void",
"when": "last" "when": "last"
}, },
"munge-session-description": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "GstWebRTCSessionDescription"
}
],
"return-type": "GstWebRTCSessionDescription",
"when": "last"
},
"producer-added": { "producer-added": {
"args": [ "args": [
{ {

View file

@ -651,6 +651,12 @@ impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![ vec![
glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build(),
glib::ParamSpecString::builder("address") glib::ParamSpecString::builder("address")
.nick("Address") .nick("Address")
.blurb("Address of the signalling server") .blurb("Address of the signalling server")
@ -721,6 +727,7 @@ impl ObjectImpl for Signaller {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() { match pspec.name() {
"manual-sdp-munging" => false.to_value(),
"address" => self.settings.lock().unwrap().address.to_value(), "address" => self.settings.lock().unwrap().address.to_value(),
"cafile" => { "cafile" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();

View file

@ -295,6 +295,7 @@ impl Default for Settings {
#[properties(wrapper_type = super::JanusVRSignaller)] #[properties(wrapper_type = super::JanusVRSignaller)]
pub struct Signaller { pub struct Signaller {
state: Mutex<State>, state: Mutex<State>,
#[property(name="manual-sdp-munging", default = false, get = |_| false, type = bool, blurb = "Whether the signaller manages SDP munging itself")]
#[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")] #[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")]
#[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")] #[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")]
#[property(name="secret-key", get, set, type = String, member = secret_key, blurb = "The secret API key to communicate with Janus server")] #[property(name="secret-key", get, set, type = String, member = secret_key, blurb = "The secret API key to communicate with Janus server")]

View file

@ -777,6 +777,12 @@ impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![ vec![
glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build(),
glib::ParamSpecString::builder("ws-url") glib::ParamSpecString::builder("ws-url")
.nick("WebSocket URL") .nick("WebSocket URL")
.blurb("The URL of the websocket of the LiveKit server") .blurb("The URL of the websocket of the LiveKit server")
@ -897,6 +903,7 @@ impl ObjectImpl for Signaller {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"manual-sdp-munging" => false.to_value(),
"ws-url" => settings.wsurl.to_value(), "ws-url" => settings.wsurl.to_value(),
"api-key" => settings.api_key.to_value(), "api-key" => settings.api_key.to_value(),
"secret-key" => settings.secret_key.to_value(), "secret-key" => settings.secret_key.to_value(),

View file

@ -58,6 +58,19 @@ impl prelude::ObjectInterface for Signallable {
iface.end_session = Signallable::end_session; iface.end_session = Signallable::end_session;
} }
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build()]
});
PROPERTIES.as_ref()
}
fn signals() -> &'static [Signal] { fn signals() -> &'static [Signal] {
static SIGNALS: Lazy<Vec<Signal>> = Lazy::new(|| { static SIGNALS: Lazy<Vec<Signal>> = Lazy::new(|| {
vec![ vec![
@ -309,6 +322,47 @@ impl prelude::ObjectInterface for Signallable {
Signal::builder("consumer-removed") Signal::builder("consumer-removed")
.param_types([String::static_type(), gst::Element::static_type()]) .param_types([String::static_type(), gst::Element::static_type()])
.build(), .build(),
/**
* GstRSWebRTCSignallableIface::munge-session-description:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @session_id: Id of the session being described
* @description: The WebRTC session description to modify
*
* For special-case handling, a callback can be registered to modify the session
* description before the signaller sends it to the peer. Only the first connected
* handler has any effect.
*
* Return: A modified session description
*/
Signal::builder("munge-session-description")
.run_last()
.param_types([
str::static_type(),
gst_webrtc::WebRTCSessionDescription::static_type(),
])
.return_type::<gst_webrtc::WebRTCSessionDescription>()
.class_handler(|_tokens, args| {
let _ = args[0usize]
.get::<&super::Signallable>()
.unwrap_or_else(|e| {
panic!("Wrong type for argument {}: {:?}", 0usize, e)
});
let _ = args[1usize].get::<&str>().unwrap_or_else(|e| {
panic!("Wrong type for argument {}: {:?}", 1usize, e)
});
let desc = args[2usize]
.get::<&gst_webrtc::WebRTCSessionDescription>()
.unwrap_or_else(|e| {
panic!("Wrong type for argument {}: {:?}", 2usize, e)
});
Some(desc.clone().into())
})
.accumulator(move |_hint, output, input| {
*output = input.clone();
false
})
.build(),
/** /**
* GstRSWebRTCSignallableIface::send-session-description: * GstRSWebRTCSignallableIface::send-session-description:
* @self: The object implementing #GstRSWebRTCSignallableIface * @self: The object implementing #GstRSWebRTCSignallableIface
@ -499,6 +553,11 @@ pub trait SignallableImpl: object::ObjectImpl + Send + Sync + 'static {
pub trait SignallableExt: 'static { pub trait SignallableExt: 'static {
fn start(&self); fn start(&self);
fn stop(&self); fn stop(&self);
fn munge_sdp(
&self,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> gst_webrtc::WebRTCSessionDescription;
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription); fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription);
fn add_ice( fn add_ice(
&self, &self,
@ -519,6 +578,17 @@ impl<Obj: IsA<super::Signallable>> SignallableExt for Obj {
self.emit_by_name::<bool>("stop", &[]); self.emit_by_name::<bool>("stop", &[]);
} }
fn munge_sdp(
&self,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) -> gst_webrtc::WebRTCSessionDescription {
self.emit_by_name::<gst_webrtc::WebRTCSessionDescription>(
"munge-session-description",
&[&session_id, sdp],
)
}
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
self.emit_by_name::<bool>("send-session-description", &[&session_id, sdp]); self.emit_by_name::<bool>("send-session-description", &[&session_id, sdp]);
} }

View file

@ -509,6 +509,12 @@ impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![ vec![
glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build(),
glib::ParamSpecString::builder("uri") glib::ParamSpecString::builder("uri")
.nick("Signaller URI") .nick("Signaller URI")
.blurb("URI for connecting to the signaller server") .blurb("URI for connecting to the signaller server")
@ -604,6 +610,7 @@ impl ObjectImpl for Signaller {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"manual-sdp-munging" => false.to_value(),
"uri" => settings.uri.to_string().to_value(), "uri" => settings.uri.to_string().to_value(),
"producer-peer-id" => { "producer-peer-id" => {
if !matches!(settings.role, WebRTCSignallerRole::Consumer) { if !matches!(settings.role, WebRTCSignallerRole::Consumer) {

View file

@ -2169,7 +2169,17 @@ impl BaseWebRTCSink {
.emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]); .emit_by_name::<()>("set-local-description", &[&offer, &None::<gst::Promise>]);
drop(state); drop(state);
signaller.send_sdp(session_id, &offer); let maybe_munged_offer = if signaller
.has_property("manual-sdp-munging", Some(bool::static_type()))
&& signaller.property("manual-sdp-munging")
{
// Don't munge, signaller will manage this
offer
} else {
// Use the default munging mechanism (signal registered by user)
signaller.munge_sdp(session_id, &offer)
};
signaller.send_sdp(session_id, &maybe_munged_offer);
} }
} }
@ -2213,7 +2223,19 @@ impl BaseWebRTCSink {
} }
drop(state); drop(state);
signaller.send_sdp(&session_id, &answer);
let maybe_munged_answer = if signaller
.has_property("manual-sdp-munging", Some(bool::static_type()))
&& signaller.property("manual-sdp-munging")
{
// Don't munge, signaller will manage this
answer
} else {
// Use the default munging mechanism (signal registered by user)
signaller.munge_sdp(session_id.as_str(), &answer)
};
signaller.send_sdp(&session_id, &maybe_munged_answer);
self.on_remote_description_set(element, session_id) self.on_remote_description_set(element, session_id)
} }

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::signaller::{Signallable, SignallableImpl}; use crate::signaller::{Signallable, SignallableExt, SignallableImpl};
use crate::utils::{ use crate::utils::{
build_link_header, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, build_link_header, build_reqwest_client, parse_redirect_location, set_ice_servers, wait,
wait_async, WaitError, wait_async, WaitError,
@ -118,7 +118,7 @@ impl WhipClient {
self.raise_error("Local description is not set".to_string()); self.raise_error("Local description is not set".to_string());
return; return;
} }
Some(offer) => offer, Some(offer) => self.obj().munge_sdp("unique", &offer),
}; };
gst::debug!( gst::debug!(
@ -533,7 +533,14 @@ impl ObjectSubclass for WhipClient {
impl ObjectImpl for WhipClient { impl ObjectImpl for WhipClient {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecString::builder("whip-endpoint") vec![
glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build(),
glib::ParamSpecString::builder("whip-endpoint")
.nick("WHIP Endpoint") .nick("WHIP Endpoint")
.blurb("The WHIP server endpoint to POST SDP offer to. .blurb("The WHIP server endpoint to POST SDP offer to.
e.g.: https://example.com/whip/endpoint/room1234") e.g.: https://example.com/whip/endpoint/room1234")
@ -590,6 +597,7 @@ impl ObjectImpl for WhipClient {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() { match pspec.name() {
"manual-sdp-munging" => true.to_value(),
"whip-endpoint" => { "whip-endpoint" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.whip_endpoint.to_value() settings.whip_endpoint.to_value()
@ -1079,6 +1087,12 @@ impl ObjectImpl for WhipServer {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![ vec![
glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build(),
glib::ParamSpecString::builder("host-addr") glib::ParamSpecString::builder("host-addr")
.nick("Host address") .nick("Host address")
.blurb("The the host address of the WHIP endpoint e.g., http://127.0.0.1:8080") .blurb("The the host address of the WHIP endpoint e.g., http://127.0.0.1:8080")
@ -1141,6 +1155,7 @@ impl ObjectImpl for WhipServer {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"manual-sdp-munging" => false.to_value(),
"host-addr" => settings.host_addr.to_string().to_value(), "host-addr" => settings.host_addr.to_string().to_value(),
"stun-server" => settings.stun_server.to_value(), "stun-server" => settings.stun_server.to_value(),
"turn-servers" => settings.turn_servers.to_value(), "turn-servers" => settings.turn_servers.to_value(),