From ce3bb2f1d42cbed5f9ad49cf264b8a6c0f7be737 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Tue, 12 Jul 2022 17:13:38 -0400 Subject: [PATCH] Add a webrtcsrc element Updating the docker image to include: https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3236 Part-of: --- docs/meson.build | 8 + docs/plugins/gst_plugins_cache.json | 241 ++++ net/webrtc/Cargo.toml | 3 + net/webrtc/signalling/src/handlers/mod.rs | 7 +- net/webrtc/src/lib.rs | 5 +- net/webrtc/src/utils.rs | 87 ++ net/webrtc/src/webrtcsrc/imp.rs | 1099 +++++++++++++++++++ net/webrtc/src/webrtcsrc/mod.rs | 65 ++ net/webrtc/src/webrtcsrc/pad.rs | 45 + net/webrtc/src/webrtcsrc/signaller/iface.rs | 426 +++++++ net/webrtc/src/webrtcsrc/signaller/imp.rs | 584 ++++++++++ net/webrtc/src/webrtcsrc/signaller/mod.rs | 46 + 12 files changed, 2611 insertions(+), 5 deletions(-) create mode 100644 net/webrtc/src/utils.rs create mode 100644 net/webrtc/src/webrtcsrc/imp.rs create mode 100644 net/webrtc/src/webrtcsrc/mod.rs create mode 100644 net/webrtc/src/webrtcsrc/pad.rs create mode 100644 net/webrtc/src/webrtcsrc/signaller/iface.rs create mode 100644 net/webrtc/src/webrtcsrc/signaller/imp.rs create mode 100644 net/webrtc/src/webrtcsrc/signaller/mod.rs diff --git a/docs/meson.build b/docs/meson.build index 69d5cbf7..a6697ee7 100644 --- a/docs/meson.build +++ b/docs/meson.build @@ -99,9 +99,17 @@ foreach plugin_name: list_plugin_res.stdout().split(':') gst_index: 'plugins/index.md', include_paths: join_paths(meson.current_source_dir(), '..'), gst_smart_index: true, + gst_c_source_filters: [ + '../target/*/*.rs', + '../target/*/*/*.rs', + '../target/*/*/*/*.rs', + '../target/*/*/*/*/*.rs', + '../target/*/*/*/*/*/*.rs', + ], gst_c_sources: [ '../*/*/*/*.rs', '../*/*/*/*/*.rs', + '../*/*/*/*/*/*.rs', ], dependencies: [gst_dep], gst_order_generated_subpages: true, diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 31c38301..2ac55be0 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6150,11 +6150,240 @@ "when": "last" } } + }, + "webrtcsrc": { + "author": "Thibault Saunier ", + "description": "WebRTC src", + "hierarchy": [ + "GstWebRTCSrc", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy", + "GstURIHandler" + ], + "klass": "Source/Network/WebRTC", + "long-name": "WebRTCSrc", + "pad-templates": { + "audio_%%u": { + "caps": "audio/x-raw(ANY):\naudio/x-opus:\napplication/x-rtp:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, + "video_%%u": { + "caps": "video/x-raw(ANY):\napplication/x-rtp:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + } + }, + "properties": { + "audio-codecs": { + "blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS]", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "ready", + "readable": true, + "type": "GstValueArray", + "writable": true + }, + "meta": { + "blurb": "Free form metadata about the consumer", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "ready", + "readable": true, + "type": "GstStructure", + "writable": true + }, + "signaller": { + "blurb": "The Signallable object to use to handle WebRTC Signalling", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "ready", + "readable": true, + "type": "GstRSWebRTCSignallableIface", + "writable": true + }, + "stun-server": { + "blurb": "NULL", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "stun://stun.l.google.com:19302", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "video-codecs": { + "blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265]", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "ready", + "readable": true, + "type": "GstValueArray", + "writable": true + } + }, + "rank": "primary" } }, "filename": "gstrswebrtc", "license": "MPL-2.0", "other-types": { + "GstRSWebRTCSignallableIface": { + "hierarchy": [ + "GstRSWebRTCSignallableIface", + "GInterface" + ], + "kind": "interface", + "signals": { + "error": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + } + ], + "return-type": "void", + "when": "last" + }, + "handle-ice": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + }, + { + "name": "arg1", + "type": "guint" + }, + { + "name": "arg2", + "type": "gchararray" + }, + { + "name": "arg3", + "type": "gchararray" + } + ], + "return-type": "void", + "when": "last" + }, + "producer-added": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + }, + { + "name": "arg1", + "type": "GstStructure" + } + ], + "return-type": "void", + "when": "last" + }, + "producer-removed": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + }, + { + "name": "arg1", + "type": "GstStructure" + } + ], + "return-type": "void", + "when": "last" + }, + "request-meta": { + "args": [], + "return-type": "GstStructure", + "when": "last" + }, + "session-description": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + }, + { + "name": "arg1", + "type": "GstWebRTCSessionDescription" + } + ], + "return-type": "void", + "when": "last" + }, + "session-ended": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + } + ], + "return-type": "void", + "when": "last" + }, + "session-requested": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + }, + { + "name": "arg1", + "type": "gchararray" + } + ], + "return-type": "void", + "when": "last" + }, + "session-started": { + "args": [ + { + "name": "arg0", + "type": "gchararray" + }, + { + "name": "arg1", + "type": "gchararray" + } + ], + "return-type": "void", + "when": "last" + }, + "start": { + "action": true, + "args": [], + "return-type": "void", + "when": "last" + }, + "stop": { + "action": true, + "args": [], + "return-type": "void", + "when": "last" + } + } + }, "GstWebRTCSinkCongestionControl": { "kind": "enum", "values": [ @@ -6174,6 +6403,18 @@ "value": "2" } ] + }, + "GstWebRTCSrcPad": { + "hierarchy": [ + "GstWebRTCSrcPad", + "GstGhostPad", + "GstProxyPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object" } }, "package": "gst-plugin-webrtc", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 4565b2cf..f52e48a4 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -16,6 +16,8 @@ gst-webrtc = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", pack gst-sdp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-sdp", features = ["v1_20"] } gst-rtp = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-rtp", features = ["v1_20"] } gst-utils = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-utils" } +gst-base = { git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", package = "gstreamer-base" } + once_cell = "1.0" anyhow = "1" thiserror = "1" @@ -29,6 +31,7 @@ serde_json = "1" fastrand = "1.0" gst_plugin_webrtc_protocol = { path="protocol", package = "gst-plugin-webrtc-signalling-protocol" } human_bytes = "0.4" +url = "2" [dev-dependencies] tracing = { version = "0.1", features = ["log"] } diff --git a/net/webrtc/signalling/src/handlers/mod.rs b/net/webrtc/signalling/src/handlers/mod.rs index a9793709..892b52fc 100644 --- a/net/webrtc/signalling/src/handlers/mod.rs +++ b/net/webrtc/signalling/src/handlers/mod.rs @@ -277,10 +277,9 @@ impl Handler { }, )?; - self.peers.get(consumer_id).map_or_else( - || Err(anyhow!("No consumer with ID: '{consumer_id}'")), - Ok, - )?; + self.peers + .get(consumer_id) + .map_or_else(|| Err(anyhow!("No consumer with ID: '{consumer_id}'")), Ok)?; let session_id = uuid::Uuid::new_v4().to_string(); self.sessions.insert( diff --git a/net/webrtc/src/lib.rs b/net/webrtc/src/lib.rs index bf1432b5..dc2cc963 100644 --- a/net/webrtc/src/lib.rs +++ b/net/webrtc/src/lib.rs @@ -7,14 +7,17 @@ * Since: plugins-rs-0.9 */ use gst::glib; -use tokio::runtime; use once_cell::sync::Lazy; +use tokio::runtime; mod signaller; +pub mod utils; pub mod webrtcsink; +pub mod webrtcsrc; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { webrtcsink::register(plugin)?; + webrtcsrc::register(Some(plugin))?; Ok(()) } diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs new file mode 100644 index 00000000..b9026506 --- /dev/null +++ b/net/webrtc/src/utils.rs @@ -0,0 +1,87 @@ +use std::collections::HashMap; + +use gst::{glib, prelude::*}; + +pub fn gvalue_to_json(val: &gst::glib::Value) -> Option { + match val.type_() { + glib::Type::STRING => Some(val.get::().unwrap().into()), + glib::Type::BOOL => Some(val.get::().unwrap().into()), + glib::Type::I32 => Some(val.get::().unwrap().into()), + glib::Type::U32 => Some(val.get::().unwrap().into()), + glib::Type::I_LONG | glib::Type::I64 => Some(val.get::().unwrap().into()), + glib::Type::U_LONG | glib::Type::U64 => Some(val.get::().unwrap().into()), + glib::Type::F32 => Some(val.get::().unwrap().into()), + glib::Type::F64 => Some(val.get::().unwrap().into()), + _ => { + if let Ok(s) = val.get::() { + serde_json::to_value( + s.iter() + .filter_map(|(name, value)| { + gvalue_to_json(value).map(|value| (name.to_string(), value)) + }) + .collect::>(), + ) + .ok() + } else if let Ok(a) = val.get::() { + serde_json::to_value( + a.iter() + .filter_map(|value| gvalue_to_json(value)) + .collect::>(), + ) + .ok() + } else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) { + Some( + values + .iter() + .map(|value| value.nick()) + .collect::>() + .join("+") + .into(), + ) + } else if let Ok(value) = val.serialize() { + Some(value.as_str().into()) + } else { + None + } + } + } +} + +fn json_to_gststructure(val: &serde_json::Value) -> Option { + match val { + serde_json::Value::Bool(v) => Some(v.to_send_value()), + serde_json::Value::Number(n) => { + if n.is_u64() { + Some(n.as_u64().unwrap().to_send_value()) + } else if n.is_i64() { + Some(n.as_i64().unwrap().to_send_value()) + } else if n.is_f64() { + Some(n.as_f64().unwrap().to_send_value()) + } else { + todo!("Unhandled case {n:?}"); + } + } + serde_json::Value::String(v) => Some(v.to_send_value()), + serde_json::Value::Array(v) => { + let array = v + .iter() + .filter_map(json_to_gststructure) + .collect::>(); + Some(gst::Array::from_values(array).to_send_value()) + } + serde_json::Value::Object(v) => Some(serialize_json_object(v).to_send_value()), + _ => None, + } +} + +pub fn serialize_json_object(val: &serde_json::Map) -> gst::Structure { + let mut res = gst::Structure::new_empty("v"); + + val.iter().for_each(|(k, v)| { + if let Some(gvalue) = json_to_gststructure(v) { + res.set_value(k, gvalue); + } + }); + + res +} diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs new file mode 100644 index 00000000..0330b57e --- /dev/null +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -0,0 +1,1099 @@ +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::*; + +use crate::webrtcsrc::signaller::{prelude::*, Signallable, Signaller}; +use crate::webrtcsrc::WebRTCSrcPad; +use anyhow::{Context, Error}; +use core::ops::Deref; +use gst::glib; +use gst::subclass::prelude::*; +use once_cell::sync::Lazy; +use std::str::FromStr; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; +use std::sync::Mutex; +use url::Url; + +const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302"); + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "webrtcsrc", + gst::DebugColorFlags::empty(), + Some("WebRTC src"), + ) +}); + +struct Codec { + name: String, + caps: gst::Caps, + has_decoder: AtomicBool, + stream_type: gst::StreamType, +} + +impl Clone for Codec { + fn clone(&self) -> Self { + Self { + name: self.name.clone(), + caps: self.caps.clone(), + has_decoder: AtomicBool::new(self.has_decoder.load(Ordering::SeqCst)), + stream_type: self.stream_type, + } + } +} + +impl Codec { + fn new( + name: &str, + stream_type: gst::StreamType, + caps: &gst::Caps, + decoders: &glib::List, + ) -> Self { + let has_decoder = Self::has_decoder_for_caps(caps, decoders); + + Self { + caps: caps.clone(), + stream_type, + name: name.into(), + has_decoder: AtomicBool::new(has_decoder), + } + } + + fn has_decoder(&self) -> bool { + if self.has_decoder.load(Ordering::SeqCst) { + true + } else if Self::has_decoder_for_caps( + &self.caps, + // Replicating decodebin logic + &gst::ElementFactory::factories_with_type( + gst::ElementFactoryType::DECODER, + gst::Rank::Marginal, + ), + ) { + // Check if new decoders have been installed meanwhile + self.has_decoder.store(true, Ordering::SeqCst); + true + } else { + false + } + } + + fn has_decoder_for_caps(caps: &gst::Caps, decoders: &glib::List) -> bool { + decoders.iter().any(|factory| { + factory.static_pad_templates().iter().any(|template| { + let template_caps = template.caps(); + template.direction() == gst::PadDirection::Sink + && !template_caps.is_any() + && caps.can_intersect(&template_caps) + }) + }) + } +} + +static AUDIO_CAPS: Lazy = Lazy::new(|| gst::Caps::new_empty_simple("audio/x-raw")); +static OPUS_CAPS: Lazy = Lazy::new(|| gst::Caps::new_empty_simple("audio/x-opus")); + +static VIDEO_CAPS: Lazy = Lazy::new(|| { + gst::Caps::builder_full_with_any_features() + .structure(gst::Structure::new_empty("video/x-raw")) + .build() +}); +static VP8_CAPS: Lazy = Lazy::new(|| gst::Caps::new_empty_simple("video/x-vp8")); +static VP9_CAPS: Lazy = Lazy::new(|| gst::Caps::new_empty_simple("video/x-vp9")); +static H264_CAPS: Lazy = Lazy::new(|| gst::Caps::new_empty_simple("video/x-h264")); +static H265_CAPS: Lazy = Lazy::new(|| gst::Caps::new_empty_simple("video/x-h265")); + +static RTP_CAPS: Lazy = Lazy::new(|| gst::Caps::new_empty_simple("application/x-rtp")); + +struct Codecs(Vec); + +impl Deref for Codecs { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +static CODECS: Lazy = Lazy::new(|| { + let decoders = gst::ElementFactory::factories_with_type( + gst::ElementFactoryType::DECODER, + gst::Rank::Marginal, + ); + + Codecs(vec![ + Codec::new("OPUS", gst::StreamType::AUDIO, &OPUS_CAPS, &decoders), + Codec::new("VP8", gst::StreamType::VIDEO, &VP8_CAPS, &decoders), + Codec::new("H264", gst::StreamType::VIDEO, &H264_CAPS, &decoders), + Codec::new("VP9", gst::StreamType::VIDEO, &VP9_CAPS, &decoders), + Codec::new("H265", gst::StreamType::VIDEO, &H265_CAPS, &decoders), + ]) +}); + +struct Settings { + stun_server: Option, + signaller: Signallable, + meta: Option, + video_codecs: Vec, + audio_codecs: Vec, +} + +#[derive(Default)] +pub struct WebRTCSrc { + settings: Mutex, + n_video_pads: AtomicU16, + n_audio_pads: AtomicU16, + state: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for WebRTCSrc { + const NAME: &'static str = "GstWebRTCSrc"; + type Type = super::WebRTCSrc; + type ParentType = gst::Bin; + type Interfaces = (gst::URIHandler, gst::ChildProxy); +} + +impl ObjectImpl for WebRTCSrc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPS: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("stun-server") + .flags(glib::ParamFlags::READWRITE) + .default_value(DEFAULT_STUN_SERVER) + .build(), + glib::ParamSpecObject::builder::("signaller") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb("The Signallable object to use to handle WebRTC Signalling") + .build(), + glib::ParamSpecBoxed::builder::("meta") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb("Free form metadata about the consumer") + .build(), + gst::ParamSpecArray::builder("video-codecs") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]", CODECS.iter().filter_map(|c| + if matches!(c.stream_type, gst::StreamType::VIDEO) { + Some(c.name.to_owned()) + } else { + None + } + ).collect::>().join(", ") + )) + .element_spec(&glib::ParamSpecString::builder("video-codec-name").build()) + .build(), + gst::ParamSpecArray::builder("audio-codecs") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]", CODECS.iter().filter_map(|c| + if matches!(c.stream_type, gst::StreamType::AUDIO) { + Some(c.name.to_owned()) + } else { + None + } + ).collect::>().join(", ") + )) + .element_spec(&glib::ParamSpecString::builder("audio-codec-name").build()) + .build(), + ] + }); + + PROPS.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "signaller" => { + self.settings.lock().unwrap().signaller = + value.get::().expect("type checked upstream"); + } + "video-codecs" => { + self.settings.lock().unwrap().video_codecs = value + .get::() + .expect("Type checked upstream") + .as_slice() + .iter() + .filter_map(|codec_name| { + CODECS + .iter() + .find(|codec| { + codec.stream_type == gst::StreamType::VIDEO + && codec.name + == codec_name.get::<&str>().expect("Type checked upstream") + }) + .cloned() + }) + .collect::>() + } + "audio-codecs" => { + self.settings.lock().unwrap().audio_codecs = value + .get::() + .expect("Type checked upstream") + .as_slice() + .iter() + .filter_map(|codec_name| { + CODECS + .iter() + .find(|codec| { + codec.stream_type == gst::StreamType::AUDIO + && codec.name + == codec_name.get::<&str>().expect("Type checked upstream") + }) + .cloned() + }) + .collect::>() + } + "stun-server" => { + self.settings.lock().unwrap().stun_server = value + .get::>() + .expect("type checked upstream") + } + "meta" => { + self.settings.lock().unwrap().meta = value + .get::>() + .expect("type checked upstream") + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "signaller" => self.settings.lock().unwrap().signaller.to_value(), + "video-codecs" => gst::Array::new( + self.settings + .lock() + .unwrap() + .video_codecs + .iter() + .map(|v| &v.name), + ) + .to_value(), + "audio-codecs" => gst::Array::new( + self.settings + .lock() + .unwrap() + .audio_codecs + .iter() + .map(|v| &v.name), + ) + .to_value(), + "stun-server" => self.settings.lock().unwrap().stun_server.to_value(), + "meta" => self.settings.lock().unwrap().meta.to_value(), + name => panic!("{} getter not implemented", name), + } + } + + fn constructed(&self) { + self.parent_constructed(); + let signaller = self.settings.lock().unwrap().signaller.clone(); + + self.connect_signaller(&signaller); + + let obj = &*self.obj(); + + obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); + obj.set_element_flags(gst::ElementFlags::SOURCE); + } +} + +impl Default for Settings { + fn default() -> Self { + let signaller = Signaller::default(); + + Self { + stun_server: DEFAULT_STUN_SERVER.map(|v| v.to_string()), + signaller: signaller.upcast(), + meta: Default::default(), + audio_codecs: CODECS + .iter() + .filter(|codec| { + matches!(codec.stream_type, gst::StreamType::AUDIO) && codec.has_decoder() + }) + .cloned() + .collect(), + video_codecs: CODECS + .iter() + .filter(|codec| { + matches!(codec.stream_type, gst::StreamType::VIDEO) && codec.has_decoder() + }) + .cloned() + .collect(), + } + } +} + +#[allow(dead_code)] +struct SignallerSignals { + error: glib::SignalHandlerId, + session_started: glib::SignalHandlerId, + session_ended: glib::SignalHandlerId, + request_meta: glib::SignalHandlerId, + session_description: glib::SignalHandlerId, + handle_ice: glib::SignalHandlerId, +} + +impl WebRTCSrc { + fn webrtcbin(&self) -> gst::Bin { + let state = self.state.lock().unwrap(); + let webrtcbin = state + .webrtcbin + .as_ref() + .expect("We should never call `.webrtcbin()` when state not > Ready") + .clone() + .downcast::() + .unwrap(); + + webrtcbin + } + + fn signaller(&self) -> Signallable { + self.settings.lock().unwrap().signaller.clone() + } + + // Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID. + fn get_src_pad_from_webrtcbin_pad(&self, webrtcbin_src: &gst::Pad) -> Option { + self.get_stream_id( + Some(webrtcbin_src.property::("transceiver")), + None, + ) + .and_then(|stream_id| { + self.obj().iterate_src_pads().into_iter().find_map(|s| { + let pad = s.ok()?.downcast::().unwrap(); + if pad.imp().stream_id() == stream_id { + Some(pad) + } else { + None + } + }) + }) + } + + fn handle_webrtc_src_pad(&self, bin: &gst::Bin, pad: &gst::Pad) { + let srcpad = self.get_src_pad_from_webrtcbin_pad(pad); + if let Some(ref srcpad) = srcpad { + let stream_id = srcpad.imp().stream_id(); + let mut builder = gst::event::StreamStart::builder(&stream_id); + if let Some(stream_start) = pad.sticky_event::(0) { + builder = builder + .seqnum(stream_start.seqnum()) + .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next)); + } + + gst::debug!(CAT, imp: self, "Storing id {stream_id} on {pad:?}"); + pad.store_sticky_event(&builder.build()).ok(); + } + + let ghostpad = gst::GhostPad::builder(None, gst::PadDirection::Src) + .proxy_pad_chain_function(glib::clone!(@weak self as this => @default-panic, move + |pad, parent, buffer| { + let padret = gst::ProxyPad::chain_default(pad, parent, buffer); + let ret = this.state.lock().unwrap().flow_combiner.update_flow(padret); + + ret + } + )) + .proxy_pad_event_function(glib::clone!(@weak self as this => @default-panic, move |pad, parent, event| { + let event = if let gst::EventView::StreamStart(stream_start) = event.view() { + let webrtcpad = pad.peer().unwrap(); + + this.get_src_pad_from_webrtcbin_pad(&webrtcpad) + .map(|srcpad| { + gst::event::StreamStart::builder(&srcpad.imp().stream_id()) + .seqnum(stream_start.seqnum()) + .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next)) + .build() + }).unwrap_or(event) + } else { + event + }; + + gst::Pad::event_default(pad, parent, event) + })) + .build_with_target(pad) + .unwrap(); + + bin.add_pad(&ghostpad) + .expect("Adding ghostpad to the bin should always work"); + + if let Some(srcpad) = srcpad { + if srcpad.imp().needs_decoding() { + let decodebin = gst::ElementFactory::make("decodebin3") + .build() + .expect("decodebin3 needs to be present!"); + self.obj().add(&decodebin).unwrap(); + decodebin.sync_state_with_parent().unwrap(); + decodebin.connect_pad_added( + glib::clone!(@weak self as this, @weak srcpad => move |_webrtcbin, pad| { + if pad.direction() == gst::PadDirection::Sink { + return; + } + + srcpad.set_target(Some(pad)).unwrap(); + }), + ); + + gst::debug!(CAT, imp: self, "Decoding for {}", srcpad.imp().stream_id()); + let sinkpad = decodebin + .static_pad("sink") + .expect("decodebin has a sink pad"); + ghostpad + .link(&sinkpad) + .expect("webrtcbin ! decodebin3 linking failed"); + } else { + gst::debug!( + CAT, + imp: self, + "NO decoding for {}", + srcpad.imp().stream_id() + ); + srcpad.set_target(Some(&ghostpad)).unwrap(); + } + } else { + gst::debug!(CAT, imp: self, "Unused webrtcbin pad {pad:?}"); + } + } + + fn prepare(&self) -> Result<(), Error> { + let webrtcbin = gst::ElementFactory::make("webrtcbin") + .property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle) + .property( + "stun-server", + &self.settings.lock().unwrap().stun_server.to_value(), + ) + .build() + .with_context(|| "Failed to make element webrtcbin".to_string())?; + + let bin = gst::Bin::new(None); + bin.connect_pad_removed(glib::clone!(@weak self as this => move |_, pad| + this.state.lock().unwrap().flow_combiner.remove_pad(pad); + )); + bin.connect_pad_added(glib::clone!(@weak self as this => move |_, pad| + this.state.lock().unwrap().flow_combiner.add_pad(pad); + )); + webrtcbin.connect_pad_added( + glib::clone!(@weak self as this, @weak bin, => move |_webrtcbin, pad| { + if pad.direction() == gst::PadDirection::Sink { + return; + } + + this.handle_webrtc_src_pad(&bin, pad); + }), + ); + + webrtcbin.connect_closure( + "on-ice-candidate", + false, + glib::closure!(@weak-allow-none self as this => move | + _webrtcbin: gst::Bin, + sdp_m_line_index: u32, + candidate: String| { + this.unwrap().on_ice_candidate(sdp_m_line_index, candidate); + }), + ); + + bin.add(&webrtcbin).unwrap(); + self.obj().add(&bin).context("Could not add `webrtcbin`")?; + + let mut state = self.state.lock().unwrap(); + state.webrtcbin.replace(webrtcbin); + + Ok(()) + } + + fn get_stream_id( + &self, + transceiver: Option, + mline: Option, + ) -> Option { + let mline = transceiver.map_or(mline, |t| Some(t.mlineindex())); + + // Same logic as gst_pad_create_stream_id and friends, making a hash of + // the URI and adding `:`, here the ID is the mline of the + // stream in the SDP. + mline.map(|mline| { + let mut cs = glib::Checksum::new(glib::ChecksumType::Sha256).unwrap(); + cs.update( + self.uri() + .expect("get_stream_id should never be called if no URI has been set") + .as_bytes(), + ); + + format!("{}:{mline}", cs.string().unwrap()) + }) + } + + fn unprepare(&self) -> Result<(), Error> { + gst::info!(CAT, imp: self, "unpreparing"); + + let obj = self.obj(); + self.maybe_stop_signaller(); + self.state.lock().unwrap().session_id = None; + for pad in obj.src_pads() { + obj.remove_pad(&pad) + .map_err(|err| anyhow::anyhow!("Couldn't remove pad? {err:?}"))?; + } + + self.n_video_pads.store(0, Ordering::SeqCst); + self.n_audio_pads.store(0, Ordering::SeqCst); + + Ok(()) + } + + fn connect_signaller(&self, signaller: &Signallable) { + let _ = self + .state + .lock() + .unwrap() + .signaller_signals + .insert(SignallerSignals { + error: signaller.connect_closure( + "error", + false, + glib::closure!(@to-owned self as this => move | + _signaller: glib::Object, error: String| { + gst::element_error!( + this.obj(), + gst::StreamError::Failed, + ["Signalling error: {}", error] + ); + }), + ), + + session_started: signaller.connect_closure( + "session-started", + false, + glib::closure!(@to-owned self as this => move | + _signaller: glib::Object, + session_id: &str, + _peer_id: &str| { + gst::info!(CAT, imp: this, "Session started: {session_id}"); + this.state.lock().unwrap().session_id = + Some(session_id.to_string()); + }), + ), + + session_ended: signaller.connect_closure( + "session-ended", + false, + glib::closure!(@to-owned self as this => move | + _signaller: glib::Object, _peer_id: &str| { + gst::debug!(CAT, imp: this, "Session ended."); + + this.state.lock().unwrap().session_id = None; + this.obj().iterate_src_pads().into_iter().for_each(|pad| + { if let Err(e) = pad.map(|pad| pad.push_event(gst::event::Eos::new())) { + gst::error!(CAT, "Could not send EOS: {e:?}"); + }} + ); + }), + ), + + request_meta: signaller.connect_closure( + "request-meta", + false, + glib::closure!(@to-owned self as this => move | + _signaller: glib::Object| -> Option { + let meta = this.settings.lock().unwrap().meta.clone(); + + meta + }), + ), + + session_description: signaller.connect_closure( + "session-description", + false, + glib::closure!(@to-owned self as this => move | + _signaller: glib::Object, + _peer_id: &str, + desc: &gst_webrtc::WebRTCSessionDescription| { + assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer); + + this.handle_offer(desc); + }), + ), + + // sdp_mid is exposed for future proofing, see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, + // at the moment sdp_m_line_index must be Some + handle_ice: signaller.connect_closure( + "handle-ice", + false, + glib::closure!(@to-owned self as this => move | + _signaller: glib::Object, + peer_id: &str, + sdp_m_line_index: u32, + _sdp_mid: Option, + candidate: &str| { + this.handle_ice(peer_id, Some(sdp_m_line_index), None, candidate); + }), + ), + }); + + // previous signals are disconnected when dropping the old structure + } + + // Creates and adds our `WebRTCSrcPad` source pad, returning caps accepted + // downstream + fn create_and_probe_src_pad(&self, caps: &gst::Caps, stream_id: &str) -> Option { + gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}"); + + let obj = self.obj(); + let media_type = caps + .structure(0) + .expect("Passing empty caps is invalid") + .get::<&str>("media") + .expect("Only caps with a `media` field are expected when creating the pad"); + + let (template, name, raw_caps) = if media_type == "video" { + ( + obj.pad_template("video_%u").unwrap(), + format!("video_{}", self.n_video_pads.fetch_add(1, Ordering::SeqCst)), + VIDEO_CAPS.to_owned(), + ) + } else if media_type == "audio" { + ( + obj.pad_template("audio_%u").unwrap(), + format!("audio_{}", self.n_audio_pads.fetch_add(1, Ordering::SeqCst)), + AUDIO_CAPS.to_owned(), + ) + } else { + gst::info!(CAT, imp: self, "Not an audio or video media {media_type:?}"); + + return None; + }; + + let caps_with_raw = [caps.clone(), raw_caps.clone()] + .into_iter() + .collect::(); + let ghost = gst::GhostPad::builder_with_template(&template, Some(&name)) + .build() + .downcast::() + .unwrap(); + ghost.imp().set_stream_id(stream_id); + obj.add_pad(&ghost) + .expect("Adding ghost pad should never fail"); + + let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw)); + if let Some(first_struct) = downstream_caps.structure(0) { + if first_struct.has_name(raw_caps.structure(0).unwrap().name()) { + ghost.imp().set_needs_decoding(true) + } + } + + if ghost.imp().needs_decoding() { + Some(caps.clone()) + } else { + Some(downstream_caps) + } + } + + fn handle_offer(&self, offer: &gst_webrtc::WebRTCSessionDescription) { + gst::log!(CAT, imp: self, "Got offer {}", offer.sdp().to_string()); + + let sdp = offer.sdp(); + let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly; + let webrtcbin = self.webrtcbin(); + for (i, media) in sdp.medias().enumerate() { + let all_caps_for_media = media + .formats() + .filter_map(|format| { + format.parse::().ok().and_then(|pt| { + let mut tmpcaps = media.caps_from_media(pt)?; + { + let tmpcaps = tmpcaps.get_mut().unwrap(); + + tmpcaps + .structure_mut(0) + .unwrap() + .set_name("application/x-rtp"); + + if let Err(err) = media.attributes_to_caps(tmpcaps) { + gst::error!(CAT, "Couldn't copy media attributes to caps: {err:?}") + } + } + + Some(tmpcaps) + }) + }) + .collect::>(); + + let mut caps = gst::Caps::new_empty(); + let settings = self.settings.lock().unwrap(); + for codec in settings + .video_codecs + .iter() + .chain(settings.audio_codecs.iter()) + { + for media_caps in &all_caps_for_media { + let encoding_name = media_caps + .structure(0) + .unwrap() + .get::<&str>("encoding-name") + .unwrap(); + if encoding_name == codec.name { + caps.get_mut().unwrap().append(media_caps.clone()); + } + } + } + drop(settings); + + if !caps.is_empty() { + let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap(); + if let Some(caps) = self.create_and_probe_src_pad(&caps, &stream_id) { + gst::info!( + CAT, + imp: self, + "Adding transceiver for {stream_id} with caps: {caps:?}" + ); + let transceiver = webrtcbin.emit_by_name::( + "add-transceiver", + &[&direction, &caps], + ); + + transceiver.set_property("do_nack", true); + transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); + } + } else { + gst::info!( + CAT, + "Not using media: {media:#?} as it doesn't match our codec restrictions" + ); + } + } + + webrtcbin.emit_by_name::<()>("set-remote-description", &[&offer, &None::]); + + let obj = self.obj(); + obj.no_more_pads(); + + let promise = + gst::Promise::with_change_func(glib::clone!(@weak self as this => move |reply| { + this.on_answer_created(reply); + } + )); + + webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + } + + fn on_answer_created(&self, reply: Result, gst::PromiseError>) { + let reply = match reply { + Ok(Some(reply)) => { + if !reply.has_field_with_type( + "answer", + gst_webrtc::WebRTCSessionDescription::static_type(), + ) { + gst::element_error!( + self.obj(), + gst::StreamError::Failed, + ["create-answer::Promise returned with no reply"] + ); + return; + } else if reply.has_field_with_type("error", glib::Error::static_type()) { + gst::element_error!( + self.obj(), + gst::LibraryError::Failed, + ["create-offer::Promise returned with error: {:?}", reply] + ); + return; + } + + reply + } + Ok(None) => { + gst::element_error!( + self.obj(), + gst::StreamError::Failed, + ["create-answer::Promise returned with no reply"] + ); + + return; + } + Err(err) => { + gst::element_error!( + self.obj(), + gst::LibraryError::Failed, + ["create-answer::Promise returned with error {:?}", err] + ); + + return; + } + }; + + let answer = reply + .value("answer") + .unwrap() + .get::() + .expect("Invalid argument"); + + self.webrtcbin() + .emit_by_name::<()>("set-local-description", &[&answer, &None::]); + + let session_id = { + let state = self.state.lock().unwrap(); + match &state.session_id { + Some(id) => id.to_owned(), + _ => { + gst::element_error!( + self.obj(), + gst::StreamError::Failed, + ["Signalling error, no session started while requesting to send an SDP offer"] + ); + + return; + } + } + }; + + gst::log!(CAT, imp: self, "Sending SDP, {}", answer.sdp().to_string()); + let signaller = self.signaller(); + signaller.send_sdp(&session_id, &answer); + } + + fn on_ice_candidate(&self, sdp_m_line_index: u32, candidate: String) { + let signaller = self.signaller(); + let session_id = match self.state.lock().unwrap().session_id.as_ref() { + Some(id) => id.to_string(), + _ => { + gst::element_error!( + self.obj(), + gst::StreamError::Failed, + ["Signalling error, no session started while requesting to propose ice candidates"] + ); + + return; + } + }; + signaller.add_ice( + &session_id, + &candidate, + Some(sdp_m_line_index), + None::, + ); + } + + /// Called by the signaller with an ice candidate + fn handle_ice( + &self, + peer_id: &str, + sdp_m_line_index: Option, + _sdp_mid: Option, + candidate: &str, + ) { + let sdp_m_line_index = match sdp_m_line_index { + Some(m_line) => m_line, + None => { + gst::error!(CAT, imp: self, "No mandatory mline"); + return; + } + }; + gst::log!(CAT, imp: self, "Got ice from {peer_id}: {candidate}"); + + self.webrtcbin() + .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); + } + + fn maybe_start_signaller(&self) { + let obj = self.obj(); + let mut state = self.state.lock().unwrap(); + if state.signaller_state == SignallerState::Stopped + && obj.current_state() >= gst::State::Paused + { + self.signaller().start(); + + gst::info!(CAT, imp: self, "Started signaller"); + state.signaller_state = SignallerState::Started; + } + } + + fn maybe_stop_signaller(&self) { + let mut state = self.state.lock().unwrap(); + if state.signaller_state == SignallerState::Started { + self.signaller().stop(); + state.signaller_state = SignallerState::Stopped; + gst::info!(CAT, imp: self, "Stopped signaller"); + } + } +} + +impl ElementImpl for WebRTCSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "WebRTCSrc", + "Source/Network/WebRTC", + "WebRTC src", + "Thibault Saunier ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + vec![ + gst::PadTemplate::with_gtype( + "video_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::builder_full() + .structure_with_any_features(VIDEO_CAPS.structure(0).unwrap().to_owned()) + .structure(RTP_CAPS.structure(0).unwrap().to_owned()) + .build(), + WebRTCSrcPad::static_type(), + ) + .unwrap(), + gst::PadTemplate::with_gtype( + "audio_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::builder_full() + .structure_with_any_features(AUDIO_CAPS.structure(0).unwrap().to_owned()) + .structure(OPUS_CAPS.structure(0).unwrap().to_owned()) + .structure(RTP_CAPS.structure(0).unwrap().to_owned()) + .build(), + WebRTCSrcPad::static_type(), + ) + .unwrap(), + ] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + let obj = &*self.obj(); + if let gst::StateChange::NullToReady = transition { + if let Err(err) = self.prepare() { + gst::element_error!( + obj, + gst::StreamError::Failed, + ["Failed to prepare: {}", err] + ); + return Err(gst::StateChangeError); + } + } + + let mut ret = self.parent_change_state(transition); + + match transition { + gst::StateChange::PausedToReady => { + if let Err(err) = self.unprepare() { + gst::element_error!( + obj, + gst::StreamError::Failed, + ["Failed to unprepare: {}", err] + ); + return Err(gst::StateChangeError); + } + } + gst::StateChange::ReadyToPaused => { + ret = Ok(gst::StateChangeSuccess::NoPreroll); + } + gst::StateChange::PlayingToPaused => { + ret = Ok(gst::StateChangeSuccess::NoPreroll); + } + gst::StateChange::PausedToPlaying => { + self.maybe_start_signaller(); + } + _ => (), + } + + ret + } +} + +impl GstObjectImpl for WebRTCSrc {} + +impl BinImpl for WebRTCSrc {} + +impl ChildProxyImpl for WebRTCSrc { + fn child_by_index(&self, index: u32) -> Option { + if index == 0 { + Some(self.signaller().upcast()) + } else { + None + } + } + + fn children_count(&self) -> u32 { + 1 + } + + fn child_by_name(&self, name: &str) -> Option { + match name { + "signaller" => { + gst::info!(CAT, imp: self, "Getting signaller"); + Some(self.signaller().upcast()) + } + _ => None, + } + } +} + +impl URIHandlerImpl for WebRTCSrc { + const URI_TYPE: gst::URIType = gst::URIType::Src; + + fn protocols() -> &'static [&'static str] { + &["gstwebrtc", "gstwebrtcs"] + } + + fn uri(&self) -> Option { + self.signaller().property::>("uri") + } + + fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { + let uri = Url::from_str(uri) + .map_err(|err| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", err)))?; + + let socket_scheme = match uri.scheme() { + "gstwebrtc" => Ok("ws"), + "gstwebrtcs" => Ok("wss"), + _ => Err(glib::Error::new( + gst::URIError::BadUri, + &format!("Invalid protocol: {}", uri.scheme()), + )), + }?; + + let mut url_str = uri.to_string(); + + // Not using `set_scheme()` because it doesn't work with `http` + // See https://github.com/servo/rust-url/pull/768 for a PR implementing that + url_str.replace_range(0..uri.scheme().len(), socket_scheme); + + self.signaller().set_property("uri", &url_str); + + Ok(()) + } +} + +#[derive(PartialEq)] +enum SignallerState { + Started, + Stopped, +} + +struct State { + session_id: Option, + signaller_state: SignallerState, + webrtcbin: Option, + flow_combiner: gst_base::UniqueFlowCombiner, + signaller_signals: Option, +} + +impl Default for State { + fn default() -> Self { + Self { + signaller_state: SignallerState::Stopped, + session_id: None, + webrtcbin: None, + flow_combiner: Default::default(), + signaller_signals: Default::default(), + } + } +} diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs new file mode 100644 index 00000000..c869ebcd --- /dev/null +++ b/net/webrtc/src/webrtcsrc/mod.rs @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::webrtcsrc::signaller::WebRTCSignallerRole; +use gst::prelude::*; +use gst::{glib, prelude::StaticType}; + +/** + * element-webrtcsrc: + * + * `webrtcsrc` is the source counterpart of the #webrtcsink element and can be + * used to receive streams from it, it can also be used to easily playback WebRTC + * streams coming from a web browser. + * + * To try the element, you should run #webrtcsink as described in its documentation, + * finding its `peer-id` (in the signalling server logs for example) and then + * run: + * + * ``` bash + * gst-launch-1.0 webrtcsrc signaller::producer-peer-id= ! videoconvert ! autovideosink + * ``` + * + * or directly using `playbin`: + * + * ``` bash + * gst-launch-1.0 playbin3 uri="gstwebrtc://localhost:8443?peer-id=" + * ``` + * + * ## Decoding + * + * To be able to precisely negotiate the WebRTC SDP, `webrtcsrc` is able to decode streams. + * During SDP negotiation we expose our pads based on the peer offer and right after query caps + * to see what downstream supports. + * In practice in `uridecodebinX` or `playbinX`, decoding will happen + * in `decodebinX` but for the case where a `videoconvert` is placed after a `video_XX` pad, + * decoding will happen inside `webrtcsrc`. + * + * Since: 0.10 + */ +mod imp; +mod pad; +pub mod signaller; + +pub use signaller::{SignallableImpl, SignallableImplExt}; + +use self::signaller::Signallable; + +glib::wrapper! { + pub struct WebRTCSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; +} + +glib::wrapper! { + pub struct WebRTCSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; +} + +pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> { + WebRTCSignallerRole::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + WebRTCSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + Signallable::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + gst::Element::register( + plugin, + "webrtcsrc", + gst::Rank::Primary, + WebRTCSrc::static_type(), + ) +} diff --git a/net/webrtc/src/webrtcsrc/pad.rs b/net/webrtc/src/webrtcsrc/pad.rs new file mode 100644 index 00000000..a8b0769f --- /dev/null +++ b/net/webrtc/src/webrtcsrc/pad.rs @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::subclass::prelude::*; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Mutex; + +#[derive(Default)] +pub struct WebRTCSrcPad { + needs_raw: AtomicBool, + stream_id: Mutex>, +} + +impl WebRTCSrcPad { + pub fn set_needs_decoding(&self, raw_wanted: bool) { + self.needs_raw.store(raw_wanted, Ordering::SeqCst); + } + + pub fn needs_decoding(&self) -> bool { + self.needs_raw.load(Ordering::SeqCst) + } + + pub fn set_stream_id(&self, stream_id: &str) { + *self.stream_id.lock().unwrap() = Some(stream_id.to_string()); + } + + pub fn stream_id(&self) -> String { + let stream_id = self.stream_id.lock().unwrap(); + stream_id.as_ref().unwrap().clone() + } +} + +#[glib::object_subclass] +impl ObjectSubclass for WebRTCSrcPad { + const NAME: &'static str = "GstWebRTCSrcPad"; + type Type = super::WebRTCSrcPad; + type ParentType = gst::GhostPad; +} + +impl ObjectImpl for WebRTCSrcPad {} +impl GstObjectImpl for WebRTCSrcPad {} +impl PadImpl for WebRTCSrcPad {} +impl ProxyPadImpl for WebRTCSrcPad {} +impl GhostPadImpl for WebRTCSrcPad {} diff --git a/net/webrtc/src/webrtcsrc/signaller/iface.rs b/net/webrtc/src/webrtcsrc/signaller/iface.rs new file mode 100644 index 00000000..5da1bed2 --- /dev/null +++ b/net/webrtc/src/webrtcsrc/signaller/iface.rs @@ -0,0 +1,426 @@ +use gst::glib; +use gst::glib::subclass::*; +use gst::prelude::*; +use gst::subclass::prelude::*; +use once_cell::sync::Lazy; + +#[derive(Copy, Clone)] +pub struct Signallable { + _parent: glib::gobject_ffi::GTypeInterface, + pub start: fn(&super::Signallable), + pub stop: fn(&super::Signallable), + pub send_sdp: fn(&super::Signallable, &str, &gst_webrtc::WebRTCSessionDescription), + pub add_ice: fn(&super::Signallable, &str, &str, Option, Option), + pub end_session: fn(&super::Signallable, &str), +} + +impl Signallable { + fn request_meta(_iface: &super::Signallable) -> Option { + None + } + fn start(_iface: &super::Signallable) {} + fn stop(_iface: &super::Signallable) {} + fn send_sdp( + _iface: &super::Signallable, + _session_id: &str, + _sdp: &gst_webrtc::WebRTCSessionDescription, + ) { + } + fn add_ice( + _iface: &super::Signallable, + _session_id: &str, + _candidate: &str, + _sdp_m_line_index: Option, + _sdp_mid: Option, + ) { + } + fn end_session(_iface: &super::Signallable, _session_id: &str) {} +} + +#[glib::object_interface] +unsafe impl prelude::ObjectInterface for Signallable { + const NAME: &'static ::std::primitive::str = "GstRSWebRTCSignallableIface"; + type Prerequisites = (glib::Object,); + + fn interface_init(&mut self) { + self.start = Signallable::start; + self.stop = Signallable::stop; + self.send_sdp = Signallable::send_sdp; + self.add_ice = Signallable::add_ice; + self.end_session = Signallable::end_session; + } + + fn signals() -> &'static [Signal] { + static SIGNALS: Lazy> = Lazy::new(|| { + vec![ + /** + * GstRSWebRTCSignallableIface::session-ended: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session-id: The ID of the session that ended + * + * Some WebRTC Session was closed. + */ + Signal::builder("session-ended") + .param_types([str::static_type()]) + .build(), + /** + * GstRSWebRTCSignallableIface::producer-added: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @producer_id: The ID of the producer that was added + * @meta: The metadata structure of the producer + * + * Some new producing peer is ready to produce a WebRTC stream. + */ + Signal::builder("producer-added") + .param_types([str::static_type(), >::static_type()]) + .build(), + /** + * GstRSWebRTCSignallableIface::producer-removed: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @producer_id: The ID of the producer that was added + * @meta: The metadata structure of the producer + * + * Some new producing peer is stopped producing streams. + */ + Signal::builder("producer-removed") + .param_types([str::static_type(), >::static_type()]) + .build(), + /** + * GstRSWebRTCSignallableIface::session-started: + * @self: The object implementing #GstRSWebRTCSignallableIface + * + * A new session started, + */ + Signal::builder("session-started") + .param_types([str::static_type(), str::static_type()]) + .build(), + /** + * GstRSWebRTCSignallableIface::session-requested: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session_id: The ID of the producer that was added + * @peer_id: The ID of the consumer peer who wants to initiate a + * session + */ + Signal::builder("session-requested") + .param_types([str::static_type(), str::static_type()]) + .build(), + /** + * GstRSWebRTCSignallableIface::error: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @error: The error message as a string + */ + Signal::builder("error") + .param_types([str::static_type()]) + .build(), + /** + * GstRSWebRTCSignallableIface::request-meta: + * @self: The object implementing #GstRSWebRTCSignallableIface + * + * The signaller requests a meta about the peer using it + * + * Return: The metadata about the peer represented by the signaller + */ + Signal::builder("request-meta") + .return_type::>() + .class_handler(|_token, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + Some(Signallable::request_meta(arg0).to_value()) + }) + .build(), + /** + * GstRSWebRTCSignallableIface::handle-ice: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session_id: Id of the session the ice information is about + * @sdp_m_line_index: The mlineindex of the ice candidate + * @sdp_mid: Media ID of the ice candidate + * @candiate: Information about the candidate + */ + Signal::builder("handle-ice") + .param_types([ + str::static_type(), + u32::static_type(), + >::static_type(), + str::static_type(), + ]) + .build(), + /** + * GstRSWebRTCSignallableIface::session-description: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session_id: Id of the session being described + * @description: The WebRTC session description + */ + Signal::builder("session-description") + .param_types([ + str::static_type(), + gst_webrtc::WebRTCSessionDescription::static_type(), + ]) + .build(), + /** + * GstRSWebRTCSignallableIface::start: + * @self: The object implementing #GstRSWebRTCSignallableIface + * + * Starts the signaller, connecting it to the signalling server. + */ + Signal::builder("start") + .flags(glib::SignalFlags::ACTION) + .class_handler(|_token, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + Signallable::start(arg0); + + None + }) + .build(), + /** + * GstRSWebRTCSignallableIface::stop: + * @self: The object implementing #GstRSWebRTCSignallableIface + * + * Stops the signaller, disconnecting it to the signalling server. + */ + Signal::builder("stop") + .flags(glib::SignalFlags::ACTION) + .class_handler(|_tokens, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + Signallable::stop(arg0); + + None + }) + .build(), + ] + }); + SIGNALS.as_ref() + } +} + +unsafe impl types::IsImplementable for super::Signallable +where + ::Type: glib::IsA, +{ + fn interface_init(iface: &mut glib::Interface) { + let iface = ::std::convert::AsMut::as_mut(iface); + + fn vstart_trampoline( + obj: &super::Signallable, + ) { + let this = obj + .dynamic_cast_ref::<::Type>() + .unwrap() + .imp(); + SignallableImpl::start(this) + } + iface.start = vstart_trampoline::; + + fn vstop_trampoline( + this: &super::Signallable, + ) { + let this = this + .dynamic_cast_ref::<::Type>() + .unwrap(); + SignallableImpl::stop(this.imp()) + } + iface.stop = vstop_trampoline::; + + fn send_sdp_trampoline( + this: &super::Signallable, + session_id: &str, + sdp: &gst_webrtc::WebRTCSessionDescription, + ) { + let this = this + .dynamic_cast_ref::<::Type>() + .unwrap(); + SignallableImpl::send_sdp(this.imp(), session_id, sdp) + } + iface.send_sdp = send_sdp_trampoline::; + + fn add_ice_trampoline( + this: &super::Signallable, + session_id: &str, + candidate: &str, + sdp_m_line_index: Option, + sdp_mid: Option, + ) { + let this = this + .dynamic_cast_ref::<::Type>() + .unwrap(); + SignallableImpl::add_ice(this.imp(), session_id, candidate, sdp_m_line_index, sdp_mid) + } + iface.add_ice = add_ice_trampoline::; + + fn end_session_trampoline( + this: &super::Signallable, + session_id: &str, + ) { + let this = this + .dynamic_cast_ref::<::Type>() + .unwrap(); + SignallableImpl::end_session(this.imp(), session_id) + } + iface.end_session = end_session_trampoline::; + } +} + +pub trait SignallableImpl: object::ObjectImpl + 'static { + fn start(&self) { + SignallableImplExt::parent_vstart(self) + } + fn stop(&self) { + SignallableImplExt::parent_vstop(self) + } + fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { + SignallableImplExt::parent_send_sdp(self, session_id, sdp) + } + fn add_ice( + &self, + session_id: &str, + candidate: &str, + sdp_m_line_index: Option, + sdp_mid: Option, + ) { + SignallableImplExt::parent_add_ice(self, session_id, candidate, sdp_m_line_index, sdp_mid) + } + fn end_session(&self, session_id: &str) { + SignallableImplExt::parent_end_session(self, session_id) + } +} + +pub trait SignallableImplExt: types::ObjectSubclass { + fn parent_vstart(&self); + fn parent_vstop(&self); + fn parent_send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription); + fn parent_add_ice( + &self, + session_id: &str, + candidate: &str, + sdp_m_line_index: Option, + sdp_mid: Option, + ); + fn parent_end_session(&self, session_id: &str); +} + +type ClassType = *mut ::GlibClassType; +impl SignallableImplExt for Obj { + fn parent_vstart(&self) { + let obj = self.obj(); + let obj = unsafe { obj.unsafe_cast_ref::() }; + let vtable = unsafe { + &*(Self::type_data() + .as_ref() + .parent_interface::() as ClassType) + }; + (vtable.start)(obj) + } + fn parent_vstop(&self) { + let obj = self.obj(); + let obj = unsafe { obj.unsafe_cast_ref::() }; + let vtable = unsafe { + &*(Self::type_data() + .as_ref() + .parent_interface::() as ClassType) + }; + (vtable.stop)(obj) + } + fn parent_send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { + let obj = self.obj(); + let obj = unsafe { obj.unsafe_cast_ref::() }; + let vtable = unsafe { + &*(Self::type_data() + .as_ref() + .parent_interface::() as ClassType) + }; + (vtable.send_sdp)(obj, session_id, sdp) + } + fn parent_add_ice( + &self, + session_id: &str, + candidate: &str, + sdp_m_line_index: Option, + sdp_mid: Option, + ) { + let obj = self.obj(); + let obj = unsafe { obj.unsafe_cast_ref::() }; + let vtable = unsafe { + &*(Self::type_data() + .as_ref() + .parent_interface::() as ClassType) + }; + (vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid) + } + fn parent_end_session(&self, session_id: &str) { + let obj = self.obj(); + let obj = unsafe { obj.unsafe_cast_ref::() }; + let vtable = unsafe { + &*(Self::type_data() + .as_ref() + .parent_interface::() as ClassType) + }; + (vtable.end_session)(obj, session_id) + } +} + +pub trait SignallableExt: 'static { + fn start(&self); + fn stop(&self); + fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription); + fn add_ice( + &self, + session_id: &str, + candidate: &str, + sdp_m_line_index: Option, + sdp_mid: Option, + ); + fn end_session(&self, session_id: &str); +} + +impl> SignallableExt for Obj { + fn start(&self) { + let obj = self.upcast_ref::(); + let vtable = obj.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.start)(obj) + } + + fn stop(&self) { + let obj = self.upcast_ref::(); + let vtable = obj.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.stop)(obj) + } + + fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { + let obj = self.upcast_ref::(); + let vtable = obj.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.send_sdp)(obj, session_id, sdp) + } + + fn add_ice( + &self, + session_id: &str, + candidate: &str, + sdp_m_line_index: Option, + sdp_mid: Option, + ) { + let obj = self.upcast_ref::(); + let vtable = obj.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid) + } + + fn end_session(&self, session_id: &str) { + let obj = self.upcast_ref::(); + let vtable = obj.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.end_session)(obj, session_id) + } +} diff --git a/net/webrtc/src/webrtcsrc/signaller/imp.rs b/net/webrtc/src/webrtcsrc/signaller/imp.rs new file mode 100644 index 00000000..8f9715a1 --- /dev/null +++ b/net/webrtc/src/webrtcsrc/signaller/imp.rs @@ -0,0 +1,584 @@ +use crate::utils::{gvalue_to_json, serialize_json_object}; +use crate::webrtcsrc::signaller::{prelude::*, Signallable}; +use crate::RUNTIME; +use anyhow::{anyhow, Error}; +use async_tungstenite::tungstenite::Message as WsMessage; +use futures::channel::mpsc; +use futures::prelude::*; +use gst::glib; +use gst::glib::prelude::*; +use gst::subclass::prelude::*; +use gst_plugin_webrtc_protocol as p; +use once_cell::sync::Lazy; +use std::collections::HashSet; +use std::ops::ControlFlow; +use std::str::FromStr; +use std::sync::Mutex; +use std::time::Duration; +use tokio::{task, time::timeout}; +use url::Url; + +use super::CAT; + +#[derive(Debug, Eq, PartialEq, Clone, Copy, glib::Enum, Default)] +#[repr(u32)] +#[enum_type(name = "GstRSWebRTCSignallerRole")] +pub enum WebRTCSignallerRole { + #[default] + Consumer, + Producer, + Listener, +} + +pub struct Settings { + uri: Url, + producer_peer_id: Option, + cafile: Option, + role: WebRTCSignallerRole, +} + +impl Default for Settings { + fn default() -> Self { + Self { + uri: Url::from_str("ws://127.0.0.1:8443").unwrap(), + producer_peer_id: None, + cafile: Default::default(), + role: Default::default(), + } + } +} + +#[derive(Default)] +pub struct Signaller { + state: Mutex, + settings: Mutex, +} + +#[derive(Default)] +struct State { + /// Sender for the websocket messages + websocket_sender: Option>, + send_task_handle: Option>>, + receive_task_handle: Option>, + producers: HashSet, +} + +impl Signaller { + fn uri(&self) -> Url { + self.settings.lock().unwrap().uri.clone() + } + + fn set_uri(&self, uri: &str) -> Result<(), Error> { + let mut settings = self.settings.lock().unwrap(); + let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?; + + if let Some(peer_id) = uri + .query_pairs() + .find(|(k, _)| k == "peer-id") + .map(|v| v.1.to_string()) + { + if !matches!(settings.role, WebRTCSignallerRole::Consumer) { + gst::warning!( + CAT, + "Setting peer-id doesn't make sense for {:?}", + settings.role + ); + } else { + settings.producer_peer_id = Some(peer_id); + } + } + + if let Some(peer_id) = &settings.producer_peer_id { + uri.query_pairs_mut() + .clear() + .append_pair("peer-id", peer_id); + } + + settings.uri = uri; + + Ok(()) + } + + async fn connect(&self) -> Result<(), Error> { + let obj = self.obj(); + + let role = self.settings.lock().unwrap().role; + if let super::WebRTCSignallerRole::Consumer = role { + self.producer_peer_id() + .ok_or_else(|| anyhow!("No target producer peer id set"))?; + } + + let connector = if let Some(path) = obj.property::>("cafile") { + let cert = tokio::fs::read_to_string(&path).await?; + let cert = tokio_native_tls::native_tls::Certificate::from_pem(cert.as_bytes())?; + let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder(); + let connector = connector_builder.add_root_certificate(cert).build()?; + Some(tokio_native_tls::TlsConnector::from(connector)) + } else { + None + }; + + let mut uri = self.uri(); + uri.set_query(None); + let (ws, _) = timeout( + // FIXME: Make the timeout configurable + Duration::from_secs(20), + async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector), + ) + .await??; + + gst::info!(CAT, imp: self, "connected"); + + // Channel for asynchronously sending out websocket message + let (mut ws_sink, mut ws_stream) = ws.split(); + + // 1000 is completely arbitrary, we simply don't want infinite piling + // up of messages as with unbounded + let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); + let send_task_handle = + RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + while let Some(msg) = websocket_receiver.next().await { + gst::log!(CAT, "Sending websocket message {:?}", msg); + ws_sink + .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) + .await?; + } + + let msg = "Done sending"; + this.map_or_else(|| gst::info!(CAT, "{msg}"), + |this| gst::info!(CAT, imp: this, "{msg}") + ); + + ws_sink.send(WsMessage::Close(None)).await?; + ws_sink.close().await?; + + Ok::<(), Error>(()) + })); + + let obj = self.obj(); + let meta = + if let Some(meta) = obj.emit_by_name::>("request-meta", &[]) { + gvalue_to_json(&meta.to_value()) + } else { + None + }; + + let receive_task_handle = + RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move { + while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await { + if let Some(ref this) = this { + if let ControlFlow::Break(_) = this.handle_message(msg, &meta) { + break; + } + } else { + break; + } + } + + let msg = "Stopped websocket receiving"; + this.map_or_else(|| gst::info!(CAT, "{msg}"), + |this| gst::info!(CAT, imp: this, "{msg}") + ); + })); + + let mut state = self.state.lock().unwrap(); + state.websocket_sender = Some(websocket_sender); + state.send_task_handle = Some(send_task_handle); + state.receive_task_handle = Some(receive_task_handle); + + Ok(()) + } + + fn set_status(&self, meta: &Option, peer_id: &str) { + let role = self.settings.lock().unwrap().role; + self.send(p::IncomingMessage::SetPeerStatus(match role { + super::WebRTCSignallerRole::Consumer => p::PeerStatus { + meta: meta.clone(), + peer_id: Some(peer_id.to_string()), + roles: vec![], + }, + super::WebRTCSignallerRole::Producer => p::PeerStatus { + meta: meta.clone(), + peer_id: Some(peer_id.to_string()), + roles: vec![p::PeerRole::Producer], + }, + super::WebRTCSignallerRole::Listener => p::PeerStatus { + meta: meta.clone(), + peer_id: Some(peer_id.to_string()), + roles: vec![p::PeerRole::Listener], + }, + })); + } + + fn producer_peer_id(&self) -> Option { + let settings = self.settings.lock().unwrap(); + + settings.producer_peer_id.clone() + } + + fn send(&self, msg: p::IncomingMessage) { + let state = self.state.lock().unwrap(); + if let Some(mut sender) = state.websocket_sender.clone() { + RUNTIME.spawn(glib::clone!(@weak self as this => async move { + if let Err(err) = sender.send(msg).await { + this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); + } + })); + } + } + + pub fn start_session(&self) { + let role = self.settings.lock().unwrap().role; + if matches!(role, super::WebRTCSignallerRole::Consumer) { + let target_producer = self.producer_peer_id().unwrap(); + + self.send(p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: target_producer.clone(), + })); + + gst::info!( + CAT, + imp: self, + "Started session with producer peer id {target_producer}", + ); + } + } + + fn handle_message( + &self, + msg: Result, + meta: &Option, + ) -> ControlFlow<()> { + match msg { + Ok(WsMessage::Text(msg)) => { + gst::trace!(CAT, imp: self, "Received message {}", msg); + + if let Ok(msg) = serde_json::from_str::(&msg) { + match msg { + p::OutgoingMessage::Welcome { peer_id } => { + self.set_status(meta, &peer_id); + self.start_session(); + } + p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { + meta, + roles, + peer_id, + }) => { + let meta = meta.and_then(|m| match m { + serde_json::Value::Object(v) => Some(serialize_json_object(&v)), + _ => { + gst::error!(CAT, imp: self, "Invalid json value: {m:?}"); + None + } + }); + + let peer_id = + peer_id.expect("Status changed should always contain a peer ID"); + let mut state = self.state.lock().unwrap(); + if roles.iter().any(|r| matches!(r, p::PeerRole::Producer)) { + if !state.producers.contains(&peer_id) { + state.producers.insert(peer_id.clone()); + drop(state); + + self.obj() + .emit_by_name::<()>("producer-added", &[&peer_id, &meta]); + } + } else if state.producers.remove(&peer_id) { + drop(state); + + self.obj() + .emit_by_name::<()>("producer-removed", &[&peer_id, &meta]); + } + } + p::OutgoingMessage::SessionStarted { + peer_id, + session_id, + } => { + self.obj() + .emit_by_name::<()>("session-started", &[&session_id, &peer_id]); + } + p::OutgoingMessage::StartSession { + session_id, + peer_id, + } => { + assert!(matches!( + self.obj().property::("role"), + super::WebRTCSignallerRole::Producer + )); + + self.obj() + .emit_by_name::<()>("session-requested", &[&session_id, &peer_id]); + } + p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) => { + gst::info!(CAT, imp: self, "Session {session_id} ended"); + + self.obj() + .emit_by_name::<()>("session-ended", &[&session_id]); + } + p::OutgoingMessage::Peer(p::PeerMessage { + session_id, + peer_message, + }) => match peer_message { + p::PeerMessageInner::Sdp(reply) => { + let (sdp, desc_type) = match reply { + p::SdpMessage::Answer { sdp } => { + (sdp, gst_webrtc::WebRTCSDPType::Answer) + } + p::SdpMessage::Offer { sdp } => { + (sdp, gst_webrtc::WebRTCSDPType::Offer) + } + }; + let sdp = match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) { + Ok(sdp) => sdp, + Err(err) => { + self.obj().emit_by_name::<()>( + "error", + &[&format!("Error parsing SDP: {sdp} {err:?}")], + ); + + return ControlFlow::Break(()); + } + }; + + let desc = + gst_webrtc::WebRTCSessionDescription::new(desc_type, sdp); + self.obj().emit_by_name::<()>( + "session-description", + &[&session_id, &desc], + ); + } + p::PeerMessageInner::Ice { + candidate, + sdp_m_line_index, + } => { + let sdp_mid: Option = None; + self.obj().emit_by_name::<()>( + "handle-ice", + &[&session_id, &sdp_m_line_index, &sdp_mid, &candidate], + ); + } + }, + p::OutgoingMessage::Error { details } => { + self.obj().emit_by_name::<()>( + "error", + &[&format!("Error message from server: {details}")], + ); + } + _ => { + gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg); + } + } + } else { + gst::error!(CAT, imp: self, "Unknown message from server: {}", msg); + + self.obj().emit_by_name::<()>( + "error", + &[&format!("Unknown message from server: {}", msg)], + ); + } + } + Ok(WsMessage::Close(reason)) => { + gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason); + return ControlFlow::Break(()); + } + Ok(_) => (), + Err(err) => { + self.obj() + .emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); + return ControlFlow::Break(()); + } + } + ControlFlow::Continue(()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for Signaller { + const NAME: &'static str = "GstWebRTCSignaller"; + type Type = super::Signaller; + type ParentType = glib::Object; + type Interfaces = (Signallable,); +} + +impl ObjectImpl for Signaller { + fn properties() -> &'static [glib::ParamSpec] { + static PROPS: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("uri") + .flags(glib::ParamFlags::READWRITE) + .build(), + glib::ParamSpecString::builder("producer-peer-id") + .flags(glib::ParamFlags::READWRITE) + .build(), + glib::ParamSpecString::builder("cafile") + .flags(glib::ParamFlags::READWRITE) + .build(), + glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::Consumer) + .flags(glib::ParamFlags::READWRITE) + .build(), + ] + }); + + PROPS.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "uri" => { + if let Err(e) = self.set_uri(value.get::<&str>().expect("type checked upstream")) { + gst::error!(CAT, "Couldn't set URI: {e:?}"); + } + } + "producer-peer-id" => { + let mut settings = self.settings.lock().unwrap(); + + if !matches!(settings.role, WebRTCSignallerRole::Consumer) { + gst::warning!( + CAT, + "Setting `producer-peer-id` doesn't make sense for {:?}", + settings.role + ); + } else { + settings.producer_peer_id = value + .get::>() + .expect("type checked upstream"); + } + } + "cafile" => { + self.settings.lock().unwrap().cafile = value + .get::>() + .expect("type checked upstream") + } + "role" => { + self.settings.lock().unwrap().role = value + .get::() + .expect("type checked upstream") + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { + "uri" => settings.uri.to_string().to_value(), + "producer-peer-id" => { + if !matches!(settings.role, WebRTCSignallerRole::Consumer) { + gst::warning!( + CAT, + "`producer-peer-id` doesn't make sense for {:?}", + settings.role + ); + } + + settings.producer_peer_id.to_value() + } + "cafile" => settings.cafile.to_value(), + "role" => settings.role.to_value(), + _ => unimplemented!(), + } + } +} + +impl SignallableImpl for Signaller { + fn start(&self) { + gst::info!(CAT, imp: self, "Starting"); + RUNTIME.spawn(glib::clone!(@weak self as this => async move { + if let Err(err) = this.connect().await { + this.obj().emit_by_name::<()>("error", &[&format!("Error receiving: {}", err)]); + } + })); + } + + fn stop(&self) { + gst::info!(CAT, imp: self, "Stopping now"); + + let mut state = self.state.lock().unwrap(); + let send_task_handle = state.send_task_handle.take(); + let receive_task_handle = state.receive_task_handle.take(); + if let Some(mut sender) = state.websocket_sender.take() { + RUNTIME.block_on(async move { + sender.close_channel(); + + if let Some(handle) = send_task_handle { + if let Err(err) = handle.await { + gst::warning!(CAT, imp: self, "Error while joining send task: {}", err); + } + } + + if let Some(handle) = receive_task_handle { + if let Err(err) = handle.await { + gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err); + } + } + }); + } + } + + fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { + gst::debug!(CAT, imp: self, "Sending SDP {sdp:#?}"); + + let role = self.settings.lock().unwrap().role; + let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer); + + let msg = p::IncomingMessage::Peer(p::PeerMessage { + session_id: session_id.to_owned(), + peer_message: p::PeerMessageInner::Sdp(if is_consumer { + p::SdpMessage::Answer { + sdp: sdp.sdp().as_text().unwrap(), + } + } else { + p::SdpMessage::Offer { + sdp: sdp.sdp().as_text().unwrap(), + } + }), + }); + + self.send(msg); + } + + fn add_ice( + &self, + session_id: &str, + candidate: &str, + sdp_m_line_index: Option, + _sdp_mid: Option, + ) { + gst::debug!( + CAT, + imp: self, + "Adding ice candidate {candidate:?} for {sdp_m_line_index:?} on session {session_id}" + ); + + let msg = p::IncomingMessage::Peer(p::PeerMessage { + session_id: session_id.to_string(), + peer_message: p::PeerMessageInner::Ice { + candidate: candidate.to_string(), + sdp_m_line_index: sdp_m_line_index.unwrap(), + }, + }); + + self.send(msg); + } + + fn end_session(&self, session_id: &str) { + gst::debug!(CAT, imp: self, "Signalling session done {}", session_id); + + let state = self.state.lock().unwrap(); + let session_id = session_id.to_string(); + if let Some(mut sender) = state.websocket_sender.clone() { + RUNTIME.spawn(glib::clone!(@weak self as this => async move { + if let Err(err) = sender + .send(p::IncomingMessage::EndSession(p::EndSessionMessage { + session_id, + })) + .await + { + this.obj().emit_by_name::<()>("error", &[&format!("Error: {}", err)]); + } + })); + } + } +} + +impl GstObjectImpl for Signaller {} diff --git a/net/webrtc/src/webrtcsrc/signaller/mod.rs b/net/webrtc/src/webrtcsrc/signaller/mod.rs new file mode 100644 index 00000000..ef337be3 --- /dev/null +++ b/net/webrtc/src/webrtcsrc/signaller/mod.rs @@ -0,0 +1,46 @@ +mod iface; +mod imp; +use gst::glib; + +use once_cell::sync::Lazy; +// Expose traits and objects from the module itself so it exactly looks like +// generated bindings +pub use imp::WebRTCSignallerRole; +pub mod prelude { + pub use {super::SignallableExt, super::SignallableImpl}; +} + +pub static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "webrtcsrc-signaller", + gst::DebugColorFlags::empty(), + Some("WebRTC src signaller"), + ) +}); + +glib::wrapper! { + pub struct Signallable(ObjectInterface); +} + +glib::wrapper! { + pub struct Signaller(ObjectSubclass ) @implements Signallable; +} + +impl Default for Signaller { + fn default() -> Self { + glib::Object::builder().build() + } +} + +impl Signaller { + pub fn new(mode: WebRTCSignallerRole) -> Self { + glib::Object::builder().property("role", &mode).build() + } +} + +pub use iface::SignallableExt; +pub use iface::SignallableImpl; +pub use iface::SignallableImplExt; + +unsafe impl Send for Signallable {} +unsafe impl Sync for Signallable {}