Thibault Saunier 2022-07-12 17:13:38 -04:00
parent 0ae637f531
commit ce3bb2f1d4
12 changed files with 2611 additions and 5 deletions


@@ -99,9 +99,17 @@ foreach plugin_name: list_plugin_res.stdout().split(':')
gst_index: 'plugins/index.md',
include_paths: join_paths(meson.current_source_dir(), '..'),
gst_smart_index: true,
gst_c_source_filters: [
'../target/*/*.rs',
'../target/*/*/*.rs',
'../target/*/*/*/*.rs',
'../target/*/*/*/*/*.rs',
'../target/*/*/*/*/*/*.rs',
],
gst_c_sources: [
'../*/*/*/*.rs',
'../*/*/*/*/*.rs',
'../*/*/*/*/*/*.rs',
],
dependencies: [gst_dep],
gst_order_generated_subpages: true,


@@ -6150,11 +6150,240 @@
"when": "last"
}
}
},
"webrtcsrc": {
"author": "Thibault Saunier <tsaunier@igalia.com>",
"description": "WebRTC src",
"hierarchy": [
"GstWebRTCSrc",
"GstBin",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy",
"GstURIHandler"
],
"klass": "Source/Network/WebRTC",
"long-name": "WebRTCSrc",
"pad-templates": {
"audio_%%u": {
"caps": "audio/x-raw(ANY):\naudio/x-opus:\napplication/x-rtp:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
},
"video_%%u": {
"caps": "video/x-raw(ANY):\napplication/x-rtp:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
}
},
"properties": {
"audio-codecs": {
"blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS]",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "ready",
"readable": true,
"type": "GstValueArray",
"writable": true
},
"meta": {
"blurb": "Free form metadata about the consumer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "ready",
"readable": true,
"type": "GstStructure",
"writable": true
},
"signaller": {
"blurb": "The Signallable object to use to handle WebRTC Signalling",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "ready",
"readable": true,
"type": "GstRSWebRTCSignallableIface",
"writable": true
},
"stun-server": {
"blurb": "NULL",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "stun://stun.l.google.com:19302",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"video-codecs": {
"blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265]",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "ready",
"readable": true,
"type": "GstValueArray",
"writable": true
}
},
"rank": "primary"
}
},
"filename": "gstrswebrtc",
"license": "MPL-2.0",
"other-types": {
"GstRSWebRTCSignallableIface": {
"hierarchy": [
"GstRSWebRTCSignallableIface",
"GInterface"
],
"kind": "interface",
"signals": {
"error": {
"args": [
{
"name": "arg0",
"type": "gchararray"
}
],
"return-type": "void",
"when": "last"
},
"handle-ice": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "guint"
},
{
"name": "arg2",
"type": "gchararray"
},
{
"name": "arg3",
"type": "gchararray"
}
],
"return-type": "void",
"when": "last"
},
"producer-added": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "GstStructure"
}
],
"return-type": "void",
"when": "last"
},
"producer-removed": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "GstStructure"
}
],
"return-type": "void",
"when": "last"
},
"request-meta": {
"args": [],
"return-type": "GstStructure",
"when": "last"
},
"session-description": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "GstWebRTCSessionDescription"
}
],
"return-type": "void",
"when": "last"
},
"session-ended": {
"args": [
{
"name": "arg0",
"type": "gchararray"
}
],
"return-type": "void",
"when": "last"
},
"session-requested": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "gchararray"
}
],
"return-type": "void",
"when": "last"
},
"session-started": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "gchararray"
}
],
"return-type": "void",
"when": "last"
},
"start": {
"action": true,
"args": [],
"return-type": "void",
"when": "last"
},
"stop": {
"action": true,
"args": [],
"return-type": "void",
"when": "last"
}
}
},
"GstWebRTCSinkCongestionControl": { "GstWebRTCSinkCongestionControl": {
"kind": "enum", "kind": "enum",
"values": [ "values": [
@ -6174,6 +6403,18 @@
"value": "2" "value": "2"
} }
] ]
},
"GstWebRTCSrcPad": {
"hierarchy": [
"GstWebRTCSrcPad",
"GstGhostPad",
"GstProxyPad",
"GstPad",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"kind": "object"
}
},
"package": "gst-plugin-webrtc",


@@ -16,6 +16,8 @@ gst-webrtc = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", pack
gst-sdp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-sdp", features = ["v1_20"] }
gst-rtp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-rtp", features = ["v1_20"] }
gst-utils = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-utils" }
gst-base = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-base" }
once_cell = "1.0"
anyhow = "1"
thiserror = "1"
@@ -29,6 +31,7 @@ serde_json = "1"
fastrand = "1.0"
gst_plugin_webrtc_protocol = { path="protocol", package = "gst-plugin-webrtc-signalling-protocol" }
human_bytes = "0.4"
url = "2"
[dev-dependencies]
tracing = { version = "0.1", features = ["log"] }


@@ -277,10 +277,9 @@ impl Handler {
},
)?;
self.peers.get(consumer_id).map_or_else(
|| Err(anyhow!("No consumer with ID: '{consumer_id}'")),
Ok,
)?;
self.peers
.get(consumer_id)
.map_or_else(|| Err(anyhow!("No consumer with ID: '{consumer_id}'")), Ok)?;
let session_id = uuid::Uuid::new_v4().to_string();
self.sessions.insert(


@@ -7,14 +7,17 @@
* Since: plugins-rs-0.9
*/
use gst::glib;
use once_cell::sync::Lazy;
use tokio::runtime;
mod signaller;
pub mod utils;
pub mod webrtcsink;
pub mod webrtcsrc;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
webrtcsink::register(plugin)?;
webrtcsrc::register(Some(plugin))?;
Ok(())
}

net/webrtc/src/utils.rs (new file)

@@ -0,0 +1,87 @@
use std::collections::HashMap;
use gst::{glib, prelude::*};
pub fn gvalue_to_json(val: &gst::glib::Value) -> Option<serde_json::Value> {
match val.type_() {
glib::Type::STRING => Some(val.get::<String>().unwrap().into()),
glib::Type::BOOL => Some(val.get::<bool>().unwrap().into()),
glib::Type::I32 => Some(val.get::<i32>().unwrap().into()),
glib::Type::U32 => Some(val.get::<u32>().unwrap().into()),
glib::Type::I_LONG | glib::Type::I64 => Some(val.get::<i64>().unwrap().into()),
glib::Type::U_LONG | glib::Type::U64 => Some(val.get::<u64>().unwrap().into()),
glib::Type::F32 => Some(val.get::<f32>().unwrap().into()),
glib::Type::F64 => Some(val.get::<f64>().unwrap().into()),
_ => {
if let Ok(s) = val.get::<gst::Structure>() {
serde_json::to_value(
s.iter()
.filter_map(|(name, value)| {
gvalue_to_json(value).map(|value| (name.to_string(), value))
})
.collect::<HashMap<String, serde_json::Value>>(),
)
.ok()
} else if let Ok(a) = val.get::<gst::Array>() {
serde_json::to_value(
a.iter()
.filter_map(|value| gvalue_to_json(value))
.collect::<Vec<serde_json::Value>>(),
)
.ok()
} else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) {
Some(
values
.iter()
.map(|value| value.nick())
.collect::<Vec<&str>>()
.join("+")
.into(),
)
} else if let Ok(value) = val.serialize() {
Some(value.as_str().into())
} else {
None
}
}
}
}
fn json_to_gststructure(val: &serde_json::Value) -> Option<glib::SendValue> {
match val {
serde_json::Value::Bool(v) => Some(v.to_send_value()),
serde_json::Value::Number(n) => {
if n.is_u64() {
Some(n.as_u64().unwrap().to_send_value())
} else if n.is_i64() {
Some(n.as_i64().unwrap().to_send_value())
} else if n.is_f64() {
Some(n.as_f64().unwrap().to_send_value())
} else {
todo!("Unhandled case {n:?}");
}
}
serde_json::Value::String(v) => Some(v.to_send_value()),
serde_json::Value::Array(v) => {
let array = v
.iter()
.filter_map(json_to_gststructure)
.collect::<Vec<glib::SendValue>>();
Some(gst::Array::from_values(array).to_send_value())
}
serde_json::Value::Object(v) => Some(serialize_json_object(v).to_send_value()),
_ => None,
}
}
pub fn serialize_json_object(val: &serde_json::Map<String, serde_json::Value>) -> gst::Structure {
let mut res = gst::Structure::new_empty("v");
val.iter().for_each(|(k, v)| {
if let Some(gvalue) = json_to_gststructure(v) {
res.set_value(k, gvalue);
}
});
res
}
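
As a quick illustration of how these two helpers compose (a sketch only, not part of the commit; the test module is hypothetical), a `GstStructure` can be pushed through JSON and back. Note that integer fields come back widened, since JSON numbers carry no width and `json_to_gststructure` picks `u64`/`i64`/`f64`:

``` rust
#[cfg(test)]
mod tests {
    use super::*;
    use gst::prelude::*;
    use std::str::FromStr;

    #[test]
    fn structure_json_round_trip() {
        gst::init().unwrap();
        let s = gst::Structure::from_str("meta, name=(string)display, width=(int)1920").unwrap();
        // Structure -> serde_json::Value::Object
        let json = gvalue_to_json(&s.to_value()).expect("structures are serializable");
        let map = match json {
            serde_json::Value::Object(map) => map,
            _ => panic!("expected a JSON object"),
        };
        // JSON object -> Structure (named "v", see serialize_json_object above)
        let back = serialize_json_object(&map);
        assert_eq!(back.get::<String>("name").unwrap(), "display");
        // The (int)1920 field round-trips as u64 via json_to_gststructure.
        assert_eq!(back.get::<u64>("width").unwrap(), 1920);
    }
}
```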

File diff suppressed because it is too large


@@ -0,0 +1,65 @@
// SPDX-License-Identifier: MPL-2.0
use crate::webrtcsrc::signaller::WebRTCSignallerRole;
use gst::prelude::*;
use gst::{glib, prelude::StaticType};
/**
* element-webrtcsrc:
*
* `webrtcsrc` is the source counterpart of the #webrtcsink element and can be
* used to receive streams from it. It can also be used to easily play back WebRTC
* streams coming from a web browser.
*
* To try the element, run #webrtcsink as described in its documentation,
* find its `peer-id` (in the signalling server logs, for example) and then
* run:
*
* ``` bash
* gst-launch-1.0 webrtcsrc signaller::producer-peer-id=<webrtcsink-peer-id> ! videoconvert ! autovideosink
* ```
*
* or directly using `playbin`:
*
* ``` bash
* gst-launch-1.0 playbin3 uri="gstwebrtc://localhost:8443?peer-id=<webrtcsink-peer-id>"
* ```
*
* ## Decoding
*
* To be able to precisely negotiate the WebRTC SDP, `webrtcsrc` is able to decode streams.
* During SDP negotiation the pads are exposed based on the peer offer, and caps are queried
* right afterwards to see what downstream supports.
* In practice, in `uridecodebinX` or `playbinX` the decoding will happen
* in `decodebinX`, but when for instance a `videoconvert` is placed directly after a `video_XX` pad,
* decoding will happen inside `webrtcsrc` itself.
*
* Since: 0.10
*/
mod imp;
mod pad;
pub mod signaller;
pub use signaller::{SignallableImpl, SignallableImplExt};
use self::signaller::Signallable;
glib::wrapper! {
pub struct WebRTCSrc(ObjectSubclass<imp::WebRTCSrc>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
WebRTCSignallerRole::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
WebRTCSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
Signallable::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
gst::Element::register(
plugin,
"webrtcsrc",
gst::Rank::Primary,
WebRTCSrc::static_type(),
)
}
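
For completeness, the gst-launch lines from the documentation comment above translate to application code roughly like this (a sketch under the assumption that the plugin is installed and a #webrtcsink producer with peer id `some-peer-id` is reachable through the default signalling server):

``` rust
use gst::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;
    // Same pipeline as the first gst-launch-1.0 example above.
    let pipeline = gst::parse_launch(
        "webrtcsrc signaller::producer-peer-id=some-peer-id ! videoconvert ! autovideosink",
    )?;
    pipeline.set_state(gst::State::Playing)?;

    // Run until EOS or error.
    let bus = pipeline.bus().expect("pipelines have a bus");
    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        match msg.view() {
            gst::MessageView::Eos(_) => break,
            gst::MessageView::Error(err) => {
                eprintln!("Error: {}", err.error());
                break;
            }
            _ => (),
        }
    }
    pipeline.set_state(gst::State::Null)?;
    Ok(())
}
```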


@@ -0,0 +1,45 @@
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::subclass::prelude::*;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
#[derive(Default)]
pub struct WebRTCSrcPad {
needs_raw: AtomicBool,
stream_id: Mutex<Option<String>>,
}
impl WebRTCSrcPad {
pub fn set_needs_decoding(&self, raw_wanted: bool) {
self.needs_raw.store(raw_wanted, Ordering::SeqCst);
}
pub fn needs_decoding(&self) -> bool {
self.needs_raw.load(Ordering::SeqCst)
}
pub fn set_stream_id(&self, stream_id: &str) {
*self.stream_id.lock().unwrap() = Some(stream_id.to_string());
}
pub fn stream_id(&self) -> String {
let stream_id = self.stream_id.lock().unwrap();
stream_id.as_ref().unwrap().clone()
}
}
#[glib::object_subclass]
impl ObjectSubclass for WebRTCSrcPad {
const NAME: &'static str = "GstWebRTCSrcPad";
type Type = super::WebRTCSrcPad;
type ParentType = gst::GhostPad;
}
impl ObjectImpl for WebRTCSrcPad {}
impl GstObjectImpl for WebRTCSrcPad {}
impl PadImpl for WebRTCSrcPad {}
impl ProxyPadImpl for WebRTCSrcPad {}
impl GhostPadImpl for WebRTCSrcPad {}


@@ -0,0 +1,426 @@
use gst::glib;
use gst::glib::subclass::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
#[derive(Copy, Clone)]
pub struct Signallable {
_parent: glib::gobject_ffi::GTypeInterface,
pub start: fn(&super::Signallable),
pub stop: fn(&super::Signallable),
pub send_sdp: fn(&super::Signallable, &str, &gst_webrtc::WebRTCSessionDescription),
pub add_ice: fn(&super::Signallable, &str, &str, Option<u32>, Option<String>),
pub end_session: fn(&super::Signallable, &str),
}
impl Signallable {
fn request_meta(_iface: &super::Signallable) -> Option<gst::Structure> {
None
}
fn start(_iface: &super::Signallable) {}
fn stop(_iface: &super::Signallable) {}
fn send_sdp(
_iface: &super::Signallable,
_session_id: &str,
_sdp: &gst_webrtc::WebRTCSessionDescription,
) {
}
fn add_ice(
_iface: &super::Signallable,
_session_id: &str,
_candidate: &str,
_sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
) {
}
fn end_session(_iface: &super::Signallable, _session_id: &str) {}
}
#[glib::object_interface]
unsafe impl prelude::ObjectInterface for Signallable {
const NAME: &'static ::std::primitive::str = "GstRSWebRTCSignallableIface";
type Prerequisites = (glib::Object,);
fn interface_init(&mut self) {
self.start = Signallable::start;
self.stop = Signallable::stop;
self.send_sdp = Signallable::send_sdp;
self.add_ice = Signallable::add_ice;
self.end_session = Signallable::end_session;
}
fn signals() -> &'static [Signal] {
static SIGNALS: Lazy<Vec<Signal>> = Lazy::new(|| {
vec![
/**
* GstRSWebRTCSignallableIface::session-ended:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @session-id: The ID of the session that ended
*
* A WebRTC session was closed.
*/
Signal::builder("session-ended")
.param_types([str::static_type()])
.build(),
/**
* GstRSWebRTCSignallableIface::producer-added:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @producer_id: The ID of the producer that was added
* @meta: The metadata structure of the producer
*
* A new producing peer is ready to produce a WebRTC stream.
*/
Signal::builder("producer-added")
.param_types([str::static_type(), <Option<gst::Structure>>::static_type()])
.build(),
/**
* GstRSWebRTCSignallableIface::producer-removed:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @producer_id: The ID of the producer that was removed
* @meta: The metadata structure of the producer
*
* A producing peer stopped producing streams.
*/
Signal::builder("producer-removed")
.param_types([str::static_type(), <Option<gst::Structure>>::static_type()])
.build(),
/**
* GstRSWebRTCSignallableIface::session-started:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @session_id: The ID of the session that started
* @peer_id: The ID of the peer the session was started with
*
* A new session started.
*/
Signal::builder("session-started")
.param_types([str::static_type(), str::static_type()])
.build(),
/**
* GstRSWebRTCSignallableIface::session-requested:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @session_id: The ID of the session that is being requested
* @peer_id: The ID of the consumer peer who wants to initiate a
* session
*/
Signal::builder("session-requested")
.param_types([str::static_type(), str::static_type()])
.build(),
/**
* GstRSWebRTCSignallableIface::error:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @error: The error message as a string
*/
Signal::builder("error")
.param_types([str::static_type()])
.build(),
/**
* GstRSWebRTCSignallableIface::request-meta:
* @self: The object implementing #GstRSWebRTCSignallableIface
*
* The signaller requests metadata about the peer using it.
*
* Returns: The metadata about the peer represented by the signaller
*/
Signal::builder("request-meta")
.return_type::<Option<gst::Structure>>()
.class_handler(|_token, args| {
let arg0 = args[0usize]
.get::<&super::Signallable>()
.unwrap_or_else(|e| {
panic!("Wrong type for argument {}: {:?}", 0usize, e)
});
Some(Signallable::request_meta(arg0).to_value())
})
.build(),
/**
* GstRSWebRTCSignallableIface::handle-ice:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @session_id: ID of the session the ICE information is about
* @sdp_m_line_index: The m-line index of the ICE candidate
* @sdp_mid: Media ID of the ICE candidate
* @candidate: Information about the candidate
*/
Signal::builder("handle-ice")
.param_types([
str::static_type(),
u32::static_type(),
<Option<String>>::static_type(),
str::static_type(),
])
.build(),
/**
* GstRSWebRTCSignallableIface::session-description:
* @self: The object implementing #GstRSWebRTCSignallableIface
* @session_id: ID of the session being described
* @description: The WebRTC session description
*/
Signal::builder("session-description")
.param_types([
str::static_type(),
gst_webrtc::WebRTCSessionDescription::static_type(),
])
.build(),
/**
* GstRSWebRTCSignallableIface::start:
* @self: The object implementing #GstRSWebRTCSignallableIface
*
* Starts the signaller, connecting it to the signalling server.
*/
Signal::builder("start")
.flags(glib::SignalFlags::ACTION)
.class_handler(|_token, args| {
let arg0 = args[0usize]
.get::<&super::Signallable>()
.unwrap_or_else(|e| {
panic!("Wrong type for argument {}: {:?}", 0usize, e)
});
Signallable::start(arg0);
None
})
.build(),
/**
* GstRSWebRTCSignallableIface::stop:
* @self: The object implementing #GstRSWebRTCSignallableIface
*
* Stops the signaller, disconnecting it from the signalling server.
*/
Signal::builder("stop")
.flags(glib::SignalFlags::ACTION)
.class_handler(|_tokens, args| {
let arg0 = args[0usize]
.get::<&super::Signallable>()
.unwrap_or_else(|e| {
panic!("Wrong type for argument {}: {:?}", 0usize, e)
});
Signallable::stop(arg0);
None
})
.build(),
]
});
SIGNALS.as_ref()
}
}
unsafe impl<Obj: SignallableImpl> types::IsImplementable<Obj> for super::Signallable
where
<Obj as types::ObjectSubclass>::Type: glib::IsA<glib::Object>,
{
fn interface_init(iface: &mut glib::Interface<Self>) {
let iface = ::std::convert::AsMut::as_mut(iface);
fn vstart_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
obj: &super::Signallable,
) {
let this = obj
.dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
.unwrap()
.imp();
SignallableImpl::start(this)
}
iface.start = vstart_trampoline::<Obj>;
fn vstop_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
this: &super::Signallable,
) {
let this = this
.dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
.unwrap();
SignallableImpl::stop(this.imp())
}
iface.stop = vstop_trampoline::<Obj>;
fn send_sdp_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
this: &super::Signallable,
session_id: &str,
sdp: &gst_webrtc::WebRTCSessionDescription,
) {
let this = this
.dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
.unwrap();
SignallableImpl::send_sdp(this.imp(), session_id, sdp)
}
iface.send_sdp = send_sdp_trampoline::<Obj>;
fn add_ice_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
this: &super::Signallable,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) {
let this = this
.dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
.unwrap();
SignallableImpl::add_ice(this.imp(), session_id, candidate, sdp_m_line_index, sdp_mid)
}
iface.add_ice = add_ice_trampoline::<Obj>;
fn end_session_trampoline<Obj: types::ObjectSubclass + SignallableImpl>(
this: &super::Signallable,
session_id: &str,
) {
let this = this
.dynamic_cast_ref::<<Obj as types::ObjectSubclass>::Type>()
.unwrap();
SignallableImpl::end_session(this.imp(), session_id)
}
iface.end_session = end_session_trampoline::<Obj>;
}
}
pub trait SignallableImpl: object::ObjectImpl + 'static {
fn start(&self) {
SignallableImplExt::parent_vstart(self)
}
fn stop(&self) {
SignallableImplExt::parent_vstop(self)
}
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
SignallableImplExt::parent_send_sdp(self, session_id, sdp)
}
fn add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) {
SignallableImplExt::parent_add_ice(self, session_id, candidate, sdp_m_line_index, sdp_mid)
}
fn end_session(&self, session_id: &str) {
SignallableImplExt::parent_end_session(self, session_id)
}
}
pub trait SignallableImplExt: types::ObjectSubclass {
fn parent_vstart(&self);
fn parent_vstop(&self);
fn parent_send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription);
fn parent_add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
);
fn parent_end_session(&self, session_id: &str);
}
type ClassType = *mut <super::Signallable as glib::object::ObjectType>::GlibClassType;
impl<Obj: SignallableImpl> SignallableImplExt for Obj {
fn parent_vstart(&self) {
let obj = self.obj();
let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
let vtable = unsafe {
&*(Self::type_data()
.as_ref()
.parent_interface::<super::Signallable>() as ClassType)
};
(vtable.start)(obj)
}
fn parent_vstop(&self) {
let obj = self.obj();
let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
let vtable = unsafe {
&*(Self::type_data()
.as_ref()
.parent_interface::<super::Signallable>() as ClassType)
};
(vtable.stop)(obj)
}
fn parent_send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
let obj = self.obj();
let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
let vtable = unsafe {
&*(Self::type_data()
.as_ref()
.parent_interface::<super::Signallable>() as ClassType)
};
(vtable.send_sdp)(obj, session_id, sdp)
}
fn parent_add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) {
let obj = self.obj();
let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
let vtable = unsafe {
&*(Self::type_data()
.as_ref()
.parent_interface::<super::Signallable>() as ClassType)
};
(vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid)
}
fn parent_end_session(&self, session_id: &str) {
let obj = self.obj();
let obj = unsafe { obj.unsafe_cast_ref::<super::Signallable>() };
let vtable = unsafe {
&*(Self::type_data()
.as_ref()
.parent_interface::<super::Signallable>() as ClassType)
};
(vtable.end_session)(obj, session_id)
}
}
pub trait SignallableExt: 'static {
fn start(&self);
fn stop(&self);
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription);
fn add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
);
fn end_session(&self, session_id: &str);
}
impl<Obj: glib::IsA<super::Signallable>> SignallableExt for Obj {
fn start(&self) {
let obj = self.upcast_ref::<super::Signallable>();
let vtable = obj.interface::<super::Signallable>().unwrap();
let vtable = vtable.as_ref();
(vtable.start)(obj)
}
fn stop(&self) {
let obj = self.upcast_ref::<super::Signallable>();
let vtable = obj.interface::<super::Signallable>().unwrap();
let vtable = vtable.as_ref();
(vtable.stop)(obj)
}
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
let obj = self.upcast_ref::<super::Signallable>();
let vtable = obj.interface::<super::Signallable>().unwrap();
let vtable = vtable.as_ref();
(vtable.send_sdp)(obj, session_id, sdp)
}
fn add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
sdp_mid: Option<String>,
) {
let obj = self.upcast_ref::<super::Signallable>();
let vtable = obj.interface::<super::Signallable>().unwrap();
let vtable = vtable.as_ref();
(vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid)
}
fn end_session(&self, session_id: &str) {
let obj = self.upcast_ref::<super::Signallable>();
let vtable = obj.interface::<super::Signallable>().unwrap();
let vtable = vtable.as_ref();
(vtable.end_session)(obj, session_id)
}
}
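
To make the interface above a bit more tangible, here is a minimal sketch of a third-party signaller plugging into it via `SignallableImpl` (the `MySignaller` type, its GType name and the module layout are hypothetical; only `start`/`stop` are overridden and error handling is omitted):

``` rust
use gst::glib;
use gst::subclass::prelude::*;

use crate::webrtcsrc::signaller::{Signallable, SignallableImpl};

mod imp_custom {
    use super::*;

    #[derive(Default)]
    pub struct MySignaller;

    #[glib::object_subclass]
    impl ObjectSubclass for MySignaller {
        const NAME: &'static str = "MyCustomSignaller";
        type Type = super::MySignaller;
        type ParentType = glib::Object;
        type Interfaces = (Signallable,);
    }

    impl ObjectImpl for MySignaller {}

    impl SignallableImpl for MySignaller {
        fn start(&self) {
            // Connect to your own signalling service here; once negotiation
            // messages arrive, emit "session-description" and "handle-ice"
            // on self.obj() so webrtcsrc can react to them.
        }

        fn stop(&self) {
            // Tear the connection down.
        }
    }
}

glib::wrapper! {
    pub struct MySignaller(ObjectSubclass<imp_custom::MySignaller>) @implements Signallable;
}
```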


@@ -0,0 +1,584 @@
use crate::utils::{gvalue_to_json, serialize_json_object};
use crate::webrtcsrc::signaller::{prelude::*, Signallable};
use crate::RUNTIME;
use anyhow::{anyhow, Error};
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use gst::glib;
use gst::glib::prelude::*;
use gst::subclass::prelude::*;
use gst_plugin_webrtc_protocol as p;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::ops::ControlFlow;
use std::str::FromStr;
use std::sync::Mutex;
use std::time::Duration;
use tokio::{task, time::timeout};
use url::Url;
use super::CAT;
#[derive(Debug, Eq, PartialEq, Clone, Copy, glib::Enum, Default)]
#[repr(u32)]
#[enum_type(name = "GstRSWebRTCSignallerRole")]
pub enum WebRTCSignallerRole {
#[default]
Consumer,
Producer,
Listener,
}
pub struct Settings {
uri: Url,
producer_peer_id: Option<String>,
cafile: Option<String>,
role: WebRTCSignallerRole,
}
impl Default for Settings {
fn default() -> Self {
Self {
uri: Url::from_str("ws://127.0.0.1:8443").unwrap(),
producer_peer_id: None,
cafile: Default::default(),
role: Default::default(),
}
}
}
#[derive(Default)]
pub struct Signaller {
state: Mutex<State>,
settings: Mutex<Settings>,
}
#[derive(Default)]
struct State {
/// Sender for the websocket messages
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
receive_task_handle: Option<task::JoinHandle<()>>,
producers: HashSet<String>,
}
impl Signaller {
fn uri(&self) -> Url {
self.settings.lock().unwrap().uri.clone()
}
fn set_uri(&self, uri: &str) -> Result<(), Error> {
let mut settings = self.settings.lock().unwrap();
let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?;
if let Some(peer_id) = uri
.query_pairs()
.find(|(k, _)| k == "peer-id")
.map(|v| v.1.to_string())
{
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"Setting peer-id doesn't make sense for {:?}",
settings.role
);
} else {
settings.producer_peer_id = Some(peer_id);
}
}
if let Some(peer_id) = &settings.producer_peer_id {
uri.query_pairs_mut()
.clear()
.append_pair("peer-id", peer_id);
}
settings.uri = uri;
Ok(())
}
async fn connect(&self) -> Result<(), Error> {
let obj = self.obj();
let role = self.settings.lock().unwrap().role;
if let super::WebRTCSignallerRole::Consumer = role {
self.producer_peer_id()
.ok_or_else(|| anyhow!("No target producer peer id set"))?;
}
let connector = if let Some(path) = obj.property::<Option<String>>("cafile") {
let cert = tokio::fs::read_to_string(&path).await?;
let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?;
let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder();
let connector = connector_builder.add_root_certificate(cert).build()?;
Some(tokio_native_tls::TlsConnector::from(connector))
} else {
None
};
let mut uri = self.uri();
uri.set_query(None);
let (ws, _) = timeout(
// FIXME: Make the timeout configurable
Duration::from_secs(20),
async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector),
)
.await??;
gst::info!(CAT, imp: self, "connected");
// Channel for asynchronously sending out websocket messages
let (mut ws_sink, mut ws_stream) = ws.split();
// 1000 is completely arbitrary; we simply don't want infinite piling
// up of messages as with an unbounded channel
let (websocket_sender, mut websocket_receiver) = mpsc::channel::<p::IncomingMessage>(1000);
let send_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
while let Some(msg) = websocket_receiver.next().await {
gst::log!(CAT, "Sending websocket message {:?}", msg);
ws_sink
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
.await?;
}
let msg = "Done sending";
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|this| gst::info!(CAT, imp: this, "{msg}")
);
ws_sink.send(WsMessage::Close(None)).await?;
ws_sink.close().await?;
Ok::<(), Error>(())
}));
let obj = self.obj();
let meta =
if let Some(meta) = obj.emit_by_name::<Option<gst::Structure>>("request-meta", &[]) {
gvalue_to_json(&meta.to_value())
} else {
None
};
let receive_task_handle =
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
if let Some(ref this) = this {
if let ControlFlow::Break(_) = this.handle_message(msg, &meta) {
break;
}
} else {
break;
}
}
let msg = "Stopped websocket receiving";
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|this| gst::info!(CAT, imp: this, "{msg}")
);
}));
let mut state = self.state.lock().unwrap();
state.websocket_sender = Some(websocket_sender);
state.send_task_handle = Some(send_task_handle);
state.receive_task_handle = Some(receive_task_handle);
Ok(())
}
fn set_status(&self, meta: &Option<serde_json::Value>, peer_id: &str) {
let role = self.settings.lock().unwrap().role;
self.send(p::IncomingMessage::SetPeerStatus(match role {
super::WebRTCSignallerRole::Consumer => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![],
},
super::WebRTCSignallerRole::Producer => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![p::PeerRole::Producer],
},
super::WebRTCSignallerRole::Listener => p::PeerStatus {
meta: meta.clone(),
peer_id: Some(peer_id.to_string()),
roles: vec![p::PeerRole::Listener],
},
}));
}
fn producer_peer_id(&self) -> Option<String> {
let settings = self.settings.lock().unwrap();
settings.producer_peer_id.clone()
}
fn send(&self, msg: p::IncomingMessage) {
let state = self.state.lock().unwrap();
if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender.send(msg).await {
this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
}
}));
}
}
pub fn start_session(&self) {
let role = self.settings.lock().unwrap().role;
if matches!(role, super::WebRTCSignallerRole::Consumer) {
let target_producer = self.producer_peer_id().unwrap();
self.send(p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: target_producer.clone(),
}));
gst::info!(
CAT,
imp: self,
"Started session with producer peer id {target_producer}",
);
}
}
fn handle_message(
&self,
msg: Result<WsMessage, async_tungstenite::tungstenite::Error>,
meta: &Option<serde_json::Value>,
) -> ControlFlow<()> {
match msg {
Ok(WsMessage::Text(msg)) => {
gst::trace!(CAT, imp: self, "Received message {}", msg);
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
match msg {
p::OutgoingMessage::Welcome { peer_id } => {
self.set_status(meta, &peer_id);
self.start_session();
}
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
meta,
roles,
peer_id,
}) => {
let meta = meta.and_then(|m| match m {
serde_json::Value::Object(v) => Some(serialize_json_object(&v)),
_ => {
gst::error!(CAT, imp: self, "Invalid json value: {m:?}");
None
}
});
let peer_id =
peer_id.expect("Status changed should always contain a peer ID");
let mut state = self.state.lock().unwrap();
if roles.iter().any(|r| matches!(r, p::PeerRole::Producer)) {
if !state.producers.contains(&peer_id) {
state.producers.insert(peer_id.clone());
drop(state);
self.obj()
.emit_by_name::<()>("producer-added", &[&peer_id, &meta]);
}
} else if state.producers.remove(&peer_id) {
drop(state);
self.obj()
.emit_by_name::<()>("producer-removed", &[&peer_id, &meta]);
}
}
p::OutgoingMessage::SessionStarted {
peer_id,
session_id,
} => {
self.obj()
.emit_by_name::<()>("session-started", &[&session_id, &peer_id]);
}
p::OutgoingMessage::StartSession {
session_id,
peer_id,
} => {
assert!(matches!(
self.obj().property::<WebRTCSignallerRole>("role"),
super::WebRTCSignallerRole::Producer
));
self.obj()
.emit_by_name::<()>("session-requested", &[&session_id, &peer_id]);
}
p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) => {
gst::info!(CAT, imp: self, "Session {session_id} ended");
self.obj()
.emit_by_name::<()>("session-ended", &[&session_id]);
}
p::OutgoingMessage::Peer(p::PeerMessage {
session_id,
peer_message,
}) => match peer_message {
p::PeerMessageInner::Sdp(reply) => {
let (sdp, desc_type) = match reply {
p::SdpMessage::Answer { sdp } => {
(sdp, gst_webrtc::WebRTCSDPType::Answer)
}
p::SdpMessage::Offer { sdp } => {
(sdp, gst_webrtc::WebRTCSDPType::Offer)
}
};
let sdp = match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
Ok(sdp) => sdp,
Err(err) => {
self.obj().emit_by_name::<()>(
"error",
&[&format!("Error parsing SDP: {sdp} {err:?}")],
);
return ControlFlow::Break(());
}
};
let desc =
gst_webrtc::WebRTCSessionDescription::new(desc_type, sdp);
self.obj().emit_by_name::<()>(
"session-description",
&[&session_id, &desc],
);
}
p::PeerMessageInner::Ice {
candidate,
sdp_m_line_index,
} => {
let sdp_mid: Option<String> = None;
self.obj().emit_by_name::<()>(
"handle-ice",
&[&session_id, &sdp_m_line_index, &sdp_mid, &candidate],
);
}
},
p::OutgoingMessage::Error { details } => {
self.obj().emit_by_name::<()>(
"error",
&[&format!("Error message from server: {details}")],
);
}
_ => {
gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg);
}
}
} else {
gst::error!(CAT, imp: self, "Unknown message from server: {}", msg);
self.obj().emit_by_name::<()>(
"error",
&[&format!("Unknown message from server: {}", msg)],
);
}
}
Ok(WsMessage::Close(reason)) => {
gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason);
return ControlFlow::Break(());
}
Ok(_) => (),
Err(err) => {
self.obj()
.emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
return ControlFlow::Break(());
}
}
ControlFlow::Continue(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for Signaller {
const NAME: &'static str = "GstWebRTCSignaller";
type Type = super::Signaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl ObjectImpl for Signaller {
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("uri")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("producer-peer-id")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("cafile")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::Consumer)
.flags(glib::ParamFlags::READWRITE)
.build(),
]
});
PROPS.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"uri" => {
if let Err(e) = self.set_uri(value.get::<&str>().expect("type checked upstream")) {
gst::error!(CAT, "Couldn't set URI: {e:?}");
}
}
"producer-peer-id" => {
let mut settings = self.settings.lock().unwrap();
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"Setting `producer-peer-id` doesn't make sense for {:?}",
settings.role
);
} else {
settings.producer_peer_id = value
.get::<Option<String>>()
.expect("type checked upstream");
}
}
"cafile" => {
self.settings.lock().unwrap().cafile = value
.get::<Option<String>>()
.expect("type checked upstream")
}
"role" => {
self.settings.lock().unwrap().role = value
.get::<WebRTCSignallerRole>()
.expect("type checked upstream")
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"uri" => settings.uri.to_string().to_value(),
"producer-peer-id" => {
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"`producer-peer-id` doesn't make sense for {:?}",
settings.role
);
}
settings.producer_peer_id.to_value()
}
"cafile" => settings.cafile.to_value(),
"role" => settings.role.to_value(),
_ => unimplemented!(),
}
}
}
impl SignallableImpl for Signaller {
fn start(&self) {
gst::info!(CAT, imp: self, "Starting");
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = this.connect().await {
this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]);
}
}));
}
fn stop(&self) {
gst::info!(CAT, imp: self, "Stopping now");
let mut state = self.state.lock().unwrap();
let send_task_handle = state.send_task_handle.take();
let receive_task_handle = state.receive_task_handle.take();
if let Some(mut sender) = state.websocket_sender.take() {
RUNTIME.block_on(async move {
sender.close_channel();
if let Some(handle) = send_task_handle {
if let Err(err) = handle.await {
gst::warning!(CAT, imp: self, "Error while joining send task: {}", err);
}
}
if let Some(handle) = receive_task_handle {
if let Err(err) = handle.await {
gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err);
}
}
});
}
}
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
gst::debug!(CAT, imp: self, "Sending SDP {sdp:#?}");
let role = self.settings.lock().unwrap().role;
let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer);
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_owned(),
peer_message: p::PeerMessageInner::Sdp(if is_consumer {
p::SdpMessage::Answer {
sdp: sdp.sdp().as_text().unwrap(),
}
} else {
p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
}
}),
});
self.send(msg);
}
fn add_ice(
&self,
session_id: &str,
candidate: &str,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
) {
gst::debug!(
CAT,
imp: self,
"Adding ice candidate {candidate:?} for {sdp_m_line_index:?} on session {session_id}"
);
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_string(),
peer_message: p::PeerMessageInner::Ice {
candidate: candidate.to_string(),
sdp_m_line_index: sdp_m_line_index.unwrap(),
},
});
self.send(msg);
}
fn end_session(&self, session_id: &str) {
gst::debug!(CAT, imp: self, "Signalling session done {}", session_id);
let state = self.state.lock().unwrap();
let session_id = session_id.to_string();
if let Some(mut sender) = state.websocket_sender.clone() {
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
if let Err(err) = sender
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
session_id,
}))
.await
{
this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]);
}
}));
}
}
}
impl GstObjectImpl for Signaller {}
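
A small sketch of the URI handling performed by `set_uri()` above (the peer id value is a placeholder): in the Consumer role, a `peer-id` query parameter on the websocket URI is what ends up as `producer_peer_id`.

``` rust
use std::str::FromStr;
use url::Url;

fn main() {
    // Same extraction as in Signaller::set_uri(): look for a `peer-id`
    // query parameter on the signalling URI.
    let uri = Url::from_str("ws://127.0.0.1:8443?peer-id=f1e2d3").unwrap();
    let peer_id = uri
        .query_pairs()
        .find(|(k, _)| k == "peer-id")
        .map(|(_, v)| v.to_string());
    assert_eq!(peer_id.as_deref(), Some("f1e2d3"));
}
```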


@@ -0,0 +1,46 @@
mod iface;
mod imp;
use gst::glib;
use once_cell::sync::Lazy;
// Expose traits and objects from the module itself so it exactly looks like
// generated bindings
pub use imp::WebRTCSignallerRole;
pub mod prelude {
pub use {super::SignallableExt, super::SignallableImpl};
}
pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"webrtcsrc-signaller",
gst::DebugColorFlags::empty(),
Some("WebRTC src signaller"),
)
});
glib::wrapper! {
pub struct Signallable(ObjectInterface<iface::Signallable>);
}
glib::wrapper! {
pub struct Signaller(ObjectSubclass <imp::Signaller>) @implements Signallable;
}
impl Default for Signaller {
fn default() -> Self {
glib::Object::builder().build()
}
}
impl Signaller {
pub fn new(mode: WebRTCSignallerRole) -> Self {
glib::Object::builder().property("role", &mode).build()
}
}
pub use iface::SignallableExt;
pub use iface::SignallableImpl;
pub use iface::SignallableImplExt;
unsafe impl Send for Signallable {}
unsafe impl Sync for Signallable {}
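
As a usage sketch (not part of the commit; the server address is a placeholder and the returned `SignalHandlerId` is ignored), the default `Signaller` can also be driven on its own, for instance in the Listener role to watch producers come and go:

``` rust
use gst::prelude::*;

use crate::webrtcsrc::signaller::{prelude::*, Signaller, WebRTCSignallerRole};

fn watch_producers() -> Signaller {
    let signaller = Signaller::new(WebRTCSignallerRole::Listener);
    signaller.set_property("uri", "ws://127.0.0.1:8443");

    // "producer-added" is emitted with (peer_id: str, meta: GstStructure).
    let _handler = signaller.connect("producer-added", false, |values| {
        let peer_id = values[1].get::<&str>().unwrap();
        println!("producer available: {peer_id}");
        None
    });

    // `start` comes from SignallableExt and connects to the signalling server.
    signaller.start();
    signaller
}
```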