From b709c564788e5f0dbaeb9c36ec810b77c6d31516 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Wed, 17 Jul 2024 19:50:16 +0200 Subject: [PATCH] webrtcsink: expose properties for running signalling server Part-of: --- Cargo.lock | 1 + net/webrtc/Cargo.toml | 7 +- net/webrtc/src/webrtcsink/imp.rs | 316 ++++++++++++++++++++++++++++++- 3 files changed, 315 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a4989470..2f051a6c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3056,6 +3056,7 @@ dependencies = [ "futures", "gst-plugin-rtp", "gst-plugin-version-helper", + "gst-plugin-webrtc-signalling", "gst-plugin-webrtc-signalling-protocol", "gstreamer", "gstreamer-app", diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 1972d1cbf..d3bd3540c 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -34,6 +34,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" fastrand = "2.0" gst_plugin_webrtc_protocol = { path="protocol", package = "gst-plugin-webrtc-signalling-protocol" } +gst_plugin_webrtc_signalling = { path="signalling", package = "gst-plugin-webrtc-signalling" } human_bytes = "0.4" once_cell.workspace = true rand = "0.8" @@ -61,13 +62,13 @@ livekit-api = { version = "0.3", default-features = false, features = ["signal-c warp = {version = "0.3", optional = true } ctrlc = {version = "3.4.0", optional = true } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } +tracing-log = "0.2" [dev-dependencies] gst-plugin-rtp = { path = "../rtp" } tokio = { version = "1", features = ["signal"] } -tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } -tracing-log = "0.2" clap = { version = "4", features = ["derive"] } regex = "1" diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 1f610dc5c..3f9a6885f 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -7,10 +7,15 @@ use anyhow::Context; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; +use gst_plugin_webrtc_signalling::handlers::Handler; +use gst_plugin_webrtc_signalling::server::{Server, ServerError}; use gst_rtp::prelude::*; use gst_utils::StreamProducer; use gst_video::subclass::prelude::*; use gst_webrtc::{WebRTCDataChannel, WebRTCICETransportPolicy}; +use tokio::io::AsyncReadExt; +use tokio::net::TcpListener; +use tokio_native_tls::native_tls::TlsAcceptor; use futures::prelude::*; @@ -29,6 +34,7 @@ use super::{ use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole}; use crate::{utils, RUNTIME}; use std::collections::{BTreeMap, HashSet}; +use tracing_subscriber::prelude::*; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -46,6 +52,8 @@ const D3D11_MEMORY_FEATURE: &str = "memory:D3D11Memory"; const RTP_TWCC_URI: &str = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; +const TLS_HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302"); const DEFAULT_MIN_BITRATE: u32 = 1000; @@ -2019,9 +2027,9 @@ impl BaseWebRTCSink { fn prepare(&self) -> Result<(), Error> { gst::debug!(CAT, imp = self, "preparing"); - self.state - .lock() - .unwrap() + let mut state = self.state.lock().unwrap(); + + state .streams .iter_mut() .try_for_each(|(_, stream)| stream.prepare(&self.obj()))?; @@ -4633,10 +4641,281 @@ impl NavigationImpl for BaseWebRTCSink { } } -#[derive(Default)] -pub struct WebRTCSink {} +const DEFAULT_RUN_SIGNALLING_SERVER: bool = false; +const DEFAULT_SIGNALLING_SERVER_HOST: &str = "0.0.0.0"; +const DEFAULT_SIGNALLING_SERVER_PORT: u16 = 8443; +const DEFAULT_SIGNALLING_SERVER_CERT: Option<&str> = None; +const DEFAULT_SIGNALLING_SERVER_CERT_PASSWORD: Option<&str> = None; -impl ObjectImpl for WebRTCSink {} +#[derive(Default)] +pub struct WebRTCSinkState { + signalling_server_handle: Option>, +} + +#[derive(Clone)] +pub struct WebRTCSinkSettings { + run_signalling_server: bool, + signalling_server_host: String, + signalling_server_port: u16, + signalling_server_cert: Option, + signalling_server_cert_password: Option, +} + +impl Default for WebRTCSinkSettings { + fn default() -> Self { + Self { + run_signalling_server: DEFAULT_RUN_SIGNALLING_SERVER, + signalling_server_host: DEFAULT_SIGNALLING_SERVER_HOST.to_string(), + signalling_server_port: DEFAULT_SIGNALLING_SERVER_PORT, + signalling_server_cert: DEFAULT_SIGNALLING_SERVER_CERT.map(String::from), + signalling_server_cert_password: DEFAULT_SIGNALLING_SERVER_CERT_PASSWORD + .map(String::from), + } + } +} + +#[derive(Default)] +pub struct WebRTCSink { + state: Mutex, + settings: Mutex, +} + + +fn initialize_logging(envvar_name: &str) -> Result<(), Error> { + tracing_log::LogTracer::init()?; + let env_filter = tracing_subscriber::EnvFilter::try_from_env(envvar_name) + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_target(true) + .with_span_events( + tracing_subscriber::fmt::format::FmtSpan::NEW + | tracing_subscriber::fmt::format::FmtSpan::CLOSE, + ); + let subscriber = tracing_subscriber::Registry::default() + .with(env_filter) + .with(fmt_layer); + tracing::subscriber::set_global_default(subscriber)?; + + Ok(()) +} + +pub static SIGNALLING_LOGGING: Lazy> = + Lazy::new(|| initialize_logging("WEBRTCSINK_SIGNALLING_SERVER_LOG")); + +impl WebRTCSink { + async fn spawn_signalling_server(settings: &WebRTCSinkSettings) -> Result<(), Error> { + let server = Server::spawn(Handler::new); + let addr = format!( + "{}:{}", + settings.signalling_server_host, settings.signalling_server_port + ); + + // Create the event loop and TCP listener we'll accept connections on. + let listener = TcpListener::bind(&addr).await?; + + let acceptor = match &settings.signalling_server_cert { + Some(cert) => { + let mut file = tokio::fs::File::open(cert).await?; + let mut identity = vec![]; + file.read_to_end(&mut identity).await?; + let identity = tokio_native_tls::native_tls::Identity::from_pkcs12( + &identity, + settings + .signalling_server_cert_password + .as_deref() + .unwrap_or(""), + ) + .unwrap(); + Some(tokio_native_tls::TlsAcceptor::from( + TlsAcceptor::new(identity).unwrap(), + )) + } + None => None, + }; + + while let Ok((stream, address)) = listener.accept().await { + let mut server_clone = server.clone(); + gst::info!(CAT, "Accepting connection from {}", address); + + if let Some(acceptor) = acceptor.clone() { + tokio::spawn(async move { + match tokio::time::timeout(TLS_HANDSHAKE_TIMEOUT, acceptor.accept(stream)).await + { + Ok(Ok(stream)) => server_clone.accept_async(stream).await, + Ok(Err(err)) => { + gst::warning!( + CAT, + "Failed to accept TLS connection from {}: {}", + address, + err + ); + Err(ServerError::TLSHandshake(err)) + } + Err(elapsed) => { + gst::warning!( + CAT, + "TLS connection timed out {} after {}", + address, + elapsed + ); + Err(ServerError::TLSHandshakeTimeout(elapsed)) + } + } + }); + } else { + RUNTIME.spawn(async move { server_clone.accept_async(stream).await }); + } + } + + Ok(()) + } + + /// Start a signalling server if required + fn prepare(&self) -> Result<(), Error> { + gst::debug!(CAT, imp = self, "preparing"); + + let mut state = self.state.lock().unwrap(); + + let settings = self.settings.lock().unwrap().clone(); + + if settings.run_signalling_server { + if let Err(err) = Lazy::force(&SIGNALLING_LOGGING) { + Err(anyhow!( + "failed signalling server logging initialization: {}", + err + ))?; + } + state.signalling_server_handle = Some(RUNTIME.spawn(glib::clone!( + #[weak(rename_to = this)] + self, + async move { + if let Err(err) = WebRTCSink::spawn_signalling_server(&settings).await { + gst::error!(CAT, imp = this, + "Failed to start signalling server: {}", err); + this.post_error_message(gst::error_msg!( + gst::StreamError::Failed, + ["Failed to start signalling server: {}", err] + )); + } + } + ))); + } + + Ok(()) + } + + fn unprepare(&self) { + gst::info!(CAT, imp = self, "unpreparing"); + + let mut state = self.state.lock().unwrap(); + + if let Some(handle) = state.signalling_server_handle.take() { + handle.abort(); + } + } +} + +impl ObjectImpl for WebRTCSink { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoolean::builder("run-signalling-server") + .nick("Run signalling server") + .blurb("Whether the element should run its own signalling server") + .default_value(DEFAULT_RUN_SIGNALLING_SERVER) + .mutable_ready() + .build(), + glib::ParamSpecString::builder("signalling-server-host") + .nick("Signalling server host") + .blurb("Address the signalling server should listen on") + .default_value(DEFAULT_SIGNALLING_SERVER_HOST) + .build(), + glib::ParamSpecUInt::builder("signalling-server-port") + .nick("Signalling server port") + .blurb("Port the signalling server should listen on") + .minimum(1) + .maximum(u16::MAX as u32) + .default_value(DEFAULT_SIGNALLING_SERVER_PORT as u32) + .build(), + glib::ParamSpecString::builder("signalling-server-cert") + .nick("Signalling server certificate") + .blurb( + "Path to TLS certificate the signalling server should use. + The certificate should be formatted as PKCS 12", + ) + .default_value(DEFAULT_SIGNALLING_SERVER_CERT) + .build(), + glib::ParamSpecString::builder("signalling-server-cert-password") + .nick("Signalling server certificate password") + .blurb("The password for the certificate the signalling server will use") + .default_value(DEFAULT_SIGNALLING_SERVER_CERT_PASSWORD) + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "run-signalling-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.run_signalling_server = + value.get::().expect("type checked upstream"); + } + "signalling-server-host" => { + let mut settings = self.settings.lock().unwrap(); + settings.signalling_server_host = + value.get::().expect("type checked upstream") + } + "signalling-server-port" => { + let mut settings = self.settings.lock().unwrap(); + settings.signalling_server_port = + value.get::().expect("type checked upstream") as u16; + } + "signalling-server-cert" => { + let mut settings = self.settings.lock().unwrap(); + settings.signalling_server_cert = value + .get::>() + .expect("type checked upstream") + } + "signalling-server-cert-password" => { + let mut settings = self.settings.lock().unwrap(); + settings.signalling_server_cert_password = value + .get::>() + .expect("type checked upstream") + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "run-signalling-server" => { + let settings = self.settings.lock().unwrap(); + settings.run_signalling_server.to_value() + } + "signalling-server-host" => { + let settings = self.settings.lock().unwrap(); + settings.signalling_server_host.to_value() + } + "signalling-server-port" => { + let settings = self.settings.lock().unwrap(); + (settings.signalling_server_port as u32).to_value() + } + "signalling-server-cert" => { + let settings = self.settings.lock().unwrap(); + settings.signalling_server_cert.to_value() + } + "signalling-server-cert-password" => { + let settings = self.settings.lock().unwrap(); + settings.signalling_server_cert_password.to_value() + } + _ => unimplemented!(), + } + } +} impl GstObjectImpl for WebRTCSink {} @@ -4653,6 +4932,31 @@ impl ElementImpl for WebRTCSink { Some(&*ELEMENT_METADATA) } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + let element = self.obj(); + if let gst::StateChange::ReadyToPaused = transition { + if let Err(err) = self.prepare() { + gst::element_error!( + element, + gst::StreamError::Failed, + ["Failed to prepare: {}", err] + ); + return Err(gst::StateChangeError); + } + } + + let ret = self.parent_change_state(transition); + + if let gst::StateChange::PausedToReady = transition { + self.unprepare(); + } + + ret + } } impl BinImpl for WebRTCSink {}