From 9455e09d9f4f15a9751a8c010f320aae56040eb8 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 18 Jul 2024 21:55:38 +0200 Subject: [PATCH] webrtcsink: expose properties for running web server Part-of: --- Cargo.lock | 27 +++ docs/plugins/gst_plugins_cache.json | 136 +++++++++++ net/webrtc/Cargo.toml | 5 +- net/webrtc/src/webrtcsink/imp.rs | 334 +++++++++++++++++++++++++++- 4 files changed, 497 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f051a6c2..bdc2478b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6044,6 +6044,20 @@ dependencies = [ "sct", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.6", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.12" @@ -6918,6 +6932,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -7396,11 +7421,13 @@ dependencies = [ "multer", "percent-encoding", "pin-project", + "rustls-pemfile 2.1.3", "scoped-tls", "serde", "serde_json", "serde_urlencoded", "tokio", + "tokio-rustls 0.25.0", "tokio-tungstenite 0.21.0", "tokio-util", "tower-service", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index abf9f84d3..7595e7efa 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -10067,6 +10067,70 @@ "type": "GstWebRTCSinkPad" } }, + "properties": { + "run-signalling-server": { + "blurb": "Whether the element should run its own signalling server", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "signalling-server-cert": { + "blurb": "Path to TLS certificate the signalling server should use.\n The certificate should be formatted as PKCS 12", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "signalling-server-cert-password": { + "blurb": "The password for the certificate the signalling server will use", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "signalling-server-host": { + "blurb": "Address the signalling server should listen on", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0.0.0.0", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "signalling-server-port": { + "blurb": "Port the signalling server should listen on", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "8443", + "max": "65535", + "min": "1", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + } + }, "rank": "none" }, "webrtcsrc": { @@ -10334,6 +10398,18 @@ "type": "guint", "writable": true }, + "run-web-server": { + "blurb": "Whether the element should run a web server", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, "signaller": { "blurb": "The Signallable object to use to handle WebRTC Signalling", "conditionally-available": false, @@ -10405,6 +10481,66 @@ "readable": true, "type": "GstCaps", "writable": true + }, + "web-server-cert": { + "blurb": "Path to TLS certificate the web server should use.\n The certificate should be formatted as PEM", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "web-server-directory": { + "blurb": "The directory the web server should serve", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "gstwebrtc-api/dist", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "web-server-host-addr": { + "blurb": "Address the web server should listen on", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "http://127.0.0.1:8080/", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "web-server-key": { + "blurb": "Path to private encryption key the web server should use.\n The key should be formatted as PEM", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "web-server-path": { + "blurb": "The root path for the web server", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true } }, "signals": { diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index d3bd3540c..088f667c4 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -59,7 +59,7 @@ async-recursion = { version = "1.0.0", optional = true } livekit-protocol = { version = "0.3, < 0.3.4", optional = true } livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true } -warp = {version = "0.3", optional = true } +warp = {version = "0.3", optional = true, features = ["tls"] } ctrlc = {version = "3.4.0", optional = true } tracing = { version = "0.1", features = ["log"] } @@ -81,7 +81,7 @@ path = "src/lib.rs" gst-plugin-version-helper.workspace = true [features] -default = ["v1_22", "aws", "janus", "livekit", "whip"] +default = ["v1_22", "aws", "janus", "livekit", "whip", "web_server"] static = [] capi = [] v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"] @@ -93,6 +93,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s janus = ["dep:http"] livekit = ["dep:livekit-protocol", "dep:livekit-api"] whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"] +web_server = ["dep:warp"] [package.metadata.capi] min_version = "0.9.21" diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 3f9a6885f..c7cb6341a 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -72,6 +72,18 @@ const DEFAULT_DO_CLOCK_SIGNALLING: bool = false; const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false; const DEFAULT_ICE_TRANSPORT_POLICY: WebRTCICETransportPolicy = WebRTCICETransportPolicy::All; const DEFAULT_START_BITRATE: u32 = 2048000; +#[cfg(feature = "web_server")] +const DEFAULT_RUN_WEB_SERVER: bool = false; +#[cfg(feature = "web_server")] +const DEFAULT_WEB_SERVER_CERT: Option<&str> = None; +#[cfg(feature = "web_server")] +const DEFAULT_WEB_SERVER_KEY: Option<&str> = None; +#[cfg(feature = "web_server")] +const DEFAULT_WEB_SERVER_PATH: Option<&str> = None; +#[cfg(feature = "web_server")] +const DEFAULT_WEB_SERVER_DIRECTORY: &str = "gstwebrtc-api/dist"; +#[cfg(feature = "web_server")] +const DEFAULT_WEB_SERVER_HOST_ADDR: &str = "http://127.0.0.1:8080"; /* Start adding some FEC when the bitrate > 2Mbps as we found experimentally * that it is not worth it below that threshold */ #[cfg(feature = "v1_22")] @@ -100,6 +112,18 @@ struct Settings { meta: Option, ice_transport_policy: WebRTCICETransportPolicy, signaller: Signallable, + #[cfg(feature = "web_server")] + run_web_server: bool, + #[cfg(feature = "web_server")] + web_server_cert: Option, + #[cfg(feature = "web_server")] + web_server_key: Option, + #[cfg(feature = "web_server")] + web_server_path: Option, + #[cfg(feature = "web_server")] + web_server_directory: String, + #[cfg(feature = "web_server")] + web_server_host_addr: url::Url, } #[derive(Debug, Clone)] @@ -462,6 +486,10 @@ struct State { mids: HashMap, signaller_signals: Option, finalizing_sessions: Arc<(Mutex>, Condvar)>, + #[cfg(feature = "web_server")] + web_shutdown_tx: Option>, + #[cfg(feature = "web_server")] + web_join_handle: Option>, } fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { @@ -546,6 +574,18 @@ impl Default for Settings { meta: None, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, signaller: signaller.upcast(), + #[cfg(feature = "web_server")] + run_web_server: DEFAULT_RUN_WEB_SERVER, + #[cfg(feature = "web_server")] + web_server_cert: DEFAULT_WEB_SERVER_CERT.map(String::from), + #[cfg(feature = "web_server")] + web_server_key: DEFAULT_WEB_SERVER_KEY.map(String::from), + #[cfg(feature = "web_server")] + web_server_path: DEFAULT_WEB_SERVER_PATH.map(String::from), + #[cfg(feature = "web_server")] + web_server_directory: String::from(DEFAULT_WEB_SERVER_DIRECTORY), + #[cfg(feature = "web_server")] + web_server_host_addr: url::Url::parse(DEFAULT_WEB_SERVER_HOST_ADDR).unwrap(), } } } @@ -567,6 +607,10 @@ impl Default for State { mids: HashMap::new(), signaller_signals: Default::default(), finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), + #[cfg(feature = "web_server")] + web_shutdown_tx: None, + #[cfg(feature = "web_server")] + web_join_handle: None, } } } @@ -2022,11 +2066,65 @@ impl BaseWebRTCSink { } } + #[cfg(feature = "web_server")] + fn spawn_web_server( + settings: &Settings, + ) -> Result< + ( + tokio::sync::oneshot::Sender<()>, + tokio::task::JoinHandle<()>, + ), + Error, + > { + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + + let addr = settings.web_server_host_addr.socket_addrs(|| None).unwrap()[0]; + + let settings = settings.clone(); + + let jh = RUNTIME.spawn(async move { + let route = match settings.web_server_path { + Some(path) => warp::path(path) + .and(warp::fs::dir(settings.web_server_directory)) + .boxed(), + None => warp::get() + .and(warp::fs::dir(settings.web_server_directory)) + .boxed(), + }; + if let (Some(cert), Some(key)) = (settings.web_server_cert, settings.web_server_key) { + let (_, server) = warp::serve(route) + .tls() + .cert_path(cert) + .key_path(key) + .bind_with_graceful_shutdown(addr, async move { + match rx.await { + Ok(_) => gst::debug!(CAT, "Server shut down signal received"), + Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"), + } + }); + server.await; + } else { + let (_, server) = + warp::serve(route).bind_with_graceful_shutdown(addr, async move { + match rx.await { + Ok(_) => gst::debug!(CAT, "Server shut down signal received"), + Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"), + } + }); + server.await; + } + }); + + Ok((tx, jh)) + } + /// Prepare for accepting consumers, by setting /// up StreamProducers for each of our sink pads fn prepare(&self) -> Result<(), Error> { gst::debug!(CAT, imp = self, "preparing"); + #[cfg(feature = "web_server")] + let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); state @@ -2034,6 +2132,13 @@ impl BaseWebRTCSink { .iter_mut() .try_for_each(|(_, stream)| stream.prepare(&self.obj()))?; + #[cfg(feature = "web_server")] + if settings.run_web_server { + let (web_shutdown_tx, web_join_handle) = BaseWebRTCSink::spawn_web_server(&settings)?; + state.web_shutdown_tx = Some(web_shutdown_tx); + state.web_join_handle = Some(web_join_handle); + } + Ok(()) } @@ -2047,6 +2152,15 @@ impl BaseWebRTCSink { drop(settings); let mut state = self.state.lock().unwrap(); + #[cfg(feature = "web_server")] + if let Some(web_shutdown_tx) = state.web_shutdown_tx.take() { + let _ = web_shutdown_tx.send(()); + let web_join_handle = state.web_join_handle.take().expect("no web join handle"); + RUNTIME.block_on(async { + let _ = web_join_handle.await; + }); + } + let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); let sessions: Vec<_> = session_ids @@ -4075,6 +4189,95 @@ impl ObjectImpl for BaseWebRTCSink { .flags(glib::ParamFlags::READABLE | gst::PARAM_FLAG_MUTABLE_READY) .blurb("The Signallable object to use to handle WebRTC Signalling") .build(), + /** + * GstBaseWebRTCSink:run-web-server: + * + * Whether the element should use [warp] to serve the folder at + * #GstBaseWebRTCSink:web-server-directory. + * + * Since: plugins-rs-0.14.0 + */ + #[cfg(feature = "web_server")] + glib::ParamSpecBoolean::builder("run-web-server") + .nick("Run web server") + .blurb("Whether the element should run a web server") + .default_value(DEFAULT_RUN_WEB_SERVER) + .mutable_ready() + .build(), + /** + * GstBaseWebRTCSink:web-server-cert: + * + * The certificate to use when #GstBaseWebRTCSink:run-web-server + * is TRUE. + * + * Since: plugins-rs-0.14.0 + */ + #[cfg(feature = "web_server")] + glib::ParamSpecString::builder("web-server-cert") + .nick("Web server certificate") + .blurb( + "Path to TLS certificate the web server should use. + The certificate should be formatted as PEM", + ) + .default_value(DEFAULT_WEB_SERVER_CERT) + .build(), + /** + * GstBaseWebRTCSink:web-server-key: + * + * The private key to use when #GstBaseWebRTCSink:run-web-server + * is TRUE. + * + * Since: plugins-rs-0.14.0 + */ + #[cfg(feature = "web_server")] + glib::ParamSpecString::builder("web-server-key") + .nick("Web server private key") + .blurb("Path to private encryption key the web server should use. + The key should be formatted as PEM") + .default_value(DEFAULT_WEB_SERVER_KEY) + .build(), + /** + * GstBaseWebRTCSink:web-server-key: + * + * The root path for the web server when #GstBaseWebRTCSink:run-web-server + * is TRUE. + * + * Since: plugins-rs-0.14.0 + */ + #[cfg(feature = "web_server")] + glib::ParamSpecString::builder("web-server-path") + .nick("Web server path") + .blurb("The root path for the web server") + .default_value(DEFAULT_WEB_SERVER_PATH) + .build(), + /** + * GstBaseWebRTCSink:web-server-directory: + * + * The directory to serve when #GstBaseWebRTCSink:run-web-server + * is TRUE. + * + * Since: plugins-rs-0.14.0 + */ + #[cfg(feature = "web_server")] + glib::ParamSpecString::builder("web-server-directory") + .nick("Web server directory") + .blurb("The directory the web server should serve") + .default_value(DEFAULT_WEB_SERVER_DIRECTORY) + .build(), + /** + * GstBaseWebRTCSink:web-server-host-addr: + * + * The address to listen on when #GstBaseWebRTCSink:run-web-server + * is TRUE. + * + * Since: plugins-rs-0.14.0 + */ + #[cfg(feature = "web_server")] + glib::ParamSpecString::builder("web-server-host-addr") + .nick("Web server host address") + .blurb("Address the web server should listen on") + .default_value(DEFAULT_WEB_SERVER_HOST_ADDR) + .build(), ] }); @@ -4154,6 +4357,54 @@ impl ObjectImpl for BaseWebRTCSink { .get::() .expect("type checked upstream"); } + #[cfg(feature = "web_server")] + "run-web-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.run_web_server = value.get::().expect("type checked upstream"); + } + #[cfg(feature = "web_server")] + "web-server-cert" => { + let mut settings = self.settings.lock().unwrap(); + settings.web_server_cert = value + .get::>() + .expect("type checked upstream") + } + #[cfg(feature = "web_server")] + "web-server-key" => { + let mut settings = self.settings.lock().unwrap(); + settings.web_server_key = value + .get::>() + .expect("type checked upstream") + } + #[cfg(feature = "web_server")] + "web-server-path" => { + let mut settings = self.settings.lock().unwrap(); + settings.web_server_path = value + .get::>() + .expect("type checked upstream") + } + #[cfg(feature = "web_server")] + "web-server-directory" => { + let mut settings = self.settings.lock().unwrap(); + settings.web_server_directory = + value.get::().expect("type checked upstream") + } + #[cfg(feature = "web_server")] + "web-server-host-addr" => { + let mut settings = self.settings.lock().unwrap(); + + let host_addr = match url::Url::parse( + &value.get::().expect("type checked upstream"), + ) { + Err(e) => { + gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_WEB_SERVER_HOST_ADDR:?}"); + return; + } + + Ok(addr) => addr, + }; + settings.web_server_host_addr = host_addr; + } _ => unimplemented!(), } } @@ -4218,6 +4469,36 @@ impl ObjectImpl for BaseWebRTCSink { settings.ice_transport_policy.to_value() } "signaller" => self.settings.lock().unwrap().signaller.to_value(), + #[cfg(feature = "web_server")] + "run-web-server" => { + let settings = self.settings.lock().unwrap(); + settings.run_web_server.to_value() + } + #[cfg(feature = "web_server")] + "web-server-cert" => { + let settings = self.settings.lock().unwrap(); + settings.web_server_cert.to_value() + } + #[cfg(feature = "web_server")] + "web-server-key" => { + let settings = self.settings.lock().unwrap(); + settings.web_server_key.to_value() + } + #[cfg(feature = "web_server")] + "web-server-path" => { + let settings = self.settings.lock().unwrap(); + settings.web_server_path.to_value() + } + #[cfg(feature = "web_server")] + "web-server-directory" => { + let settings = self.settings.lock().unwrap(); + settings.web_server_directory.to_value() + } + #[cfg(feature = "web_server")] + "web-server-host-addr" => { + let settings = self.settings.lock().unwrap(); + settings.web_server_host_addr.to_string().to_value() + } _ => unimplemented!(), } } @@ -4680,7 +4961,6 @@ pub struct WebRTCSink { settings: Mutex, } - fn initialize_logging(envvar_name: &str) -> Result<(), Error> { tracing_log::LogTracer::init()?; let env_filter = tracing_subscriber::EnvFilter::try_from_env(envvar_name) @@ -4703,6 +4983,9 @@ fn initialize_logging(envvar_name: &str) -> Result<(), Error> { pub static SIGNALLING_LOGGING: Lazy> = Lazy::new(|| initialize_logging("WEBRTCSINK_SIGNALLING_SERVER_LOG")); +#[cfg(feature = "web_server")] +use warp::Filter; + impl WebRTCSink { async fn spawn_signalling_server(settings: &WebRTCSinkSettings) -> Result<(), Error> { let server = Server::spawn(Handler::new); @@ -4791,8 +5074,12 @@ impl WebRTCSink { self, async move { if let Err(err) = WebRTCSink::spawn_signalling_server(&settings).await { - gst::error!(CAT, imp = this, - "Failed to start signalling server: {}", err); + gst::error!( + CAT, + imp = this, + "Failed to start signalling server: {}", + err + ); this.post_error_message(gst::error_msg!( gst::StreamError::Failed, ["Failed to start signalling server: {}", err] @@ -4820,17 +5107,40 @@ impl ObjectImpl for WebRTCSink { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ + /** + * GstWebRTCSink:run-signalling-server: + * + * Whether the element should run its own signalling server. + * + * Since: plugins-rs-0.14.0 + */ glib::ParamSpecBoolean::builder("run-signalling-server") .nick("Run signalling server") .blurb("Whether the element should run its own signalling server") .default_value(DEFAULT_RUN_SIGNALLING_SERVER) .mutable_ready() .build(), + /** + * GstWebRTCSink:signalling-server-host: + * + * The address to listen on when #GstWebRTCSink:run-signalling-server + * is TRUE. + * + * Since: plugins-rs-0.14.0 + */ glib::ParamSpecString::builder("signalling-server-host") .nick("Signalling server host") .blurb("Address the signalling server should listen on") .default_value(DEFAULT_SIGNALLING_SERVER_HOST) .build(), + /** + * GstWebRTCSink:signalling-server-port: + * + * The port to listen on when #GstWebRTCSink:run-signalling-server + * is TRUE. + * + * Since: plugins-rs-0.14.0 + */ glib::ParamSpecUInt::builder("signalling-server-port") .nick("Signalling server port") .blurb("Port the signalling server should listen on") @@ -4838,6 +5148,16 @@ impl ObjectImpl for WebRTCSink { .maximum(u16::MAX as u32) .default_value(DEFAULT_SIGNALLING_SERVER_PORT as u32) .build(), + /** + * GstWebRTCSink:signalling-server-cert: + * + * Path to TLS certificate to use when #GstWebRTCSink:run-signalling-server + * is TRUE. + * + * The certificate should be formatted as PKCS 12. + * + * Since: plugins-rs-0.14.0 + */ glib::ParamSpecString::builder("signalling-server-cert") .nick("Signalling server certificate") .blurb( @@ -4846,6 +5166,14 @@ impl ObjectImpl for WebRTCSink { ) .default_value(DEFAULT_SIGNALLING_SERVER_CERT) .build(), + /** + * GstWebRTCSink:signalling-server-cert-password: + * + * The password for the certificate provided through + * #GstWebRTCSink:signalling-server-cert. + * + * Since: plugins-rs-0.14.0 + */ glib::ParamSpecString::builder("signalling-server-cert-password") .nick("Signalling server certificate password") .blurb("The password for the certificate the signalling server will use")