webrtcsink: expose properties for running web server

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1671>
This commit is contained in:
Mathieu Duponchelle 2024-07-18 21:55:38 +02:00
parent b709c56478
commit 9455e09d9f
4 changed files with 497 additions and 5 deletions

27
Cargo.lock generated
View file

@ -6044,6 +6044,20 @@ dependencies = [
"sct",
]
[[package]]
name = "rustls"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
dependencies = [
"log",
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.6",
"subtle",
"zeroize",
]
[[package]]
name = "rustls"
version = "0.23.12"
@ -6918,6 +6932,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
dependencies = [
"rustls 0.22.4",
"rustls-pki-types",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.0"
@ -7396,11 +7421,13 @@ dependencies = [
"multer",
"percent-encoding",
"pin-project",
"rustls-pemfile 2.1.3",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls 0.25.0",
"tokio-tungstenite 0.21.0",
"tokio-util",
"tower-service",

View file

@ -10067,6 +10067,70 @@
"type": "GstWebRTCSinkPad"
}
},
"properties": {
"run-signalling-server": {
"blurb": "Whether the element should run its own signalling server",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
},
"signalling-server-cert": {
"blurb": "Path to TLS certificate the signalling server should use.\n The certificate should be formatted as PKCS 12",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"signalling-server-cert-password": {
"blurb": "The password for the certificate the signalling server will use",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"signalling-server-host": {
"blurb": "Address the signalling server should listen on",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0.0.0.0",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"signalling-server-port": {
"blurb": "Port the signalling server should listen on",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "8443",
"max": "65535",
"min": "1",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
}
},
"rank": "none"
},
"webrtcsrc": {
@ -10334,6 +10398,18 @@
"type": "guint",
"writable": true
},
"run-web-server": {
"blurb": "Whether the element should run a web server",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
},
"signaller": {
"blurb": "The Signallable object to use to handle WebRTC Signalling",
"conditionally-available": false,
@ -10405,6 +10481,66 @@
"readable": true,
"type": "GstCaps",
"writable": true
},
"web-server-cert": {
"blurb": "Path to TLS certificate the web server should use.\n The certificate should be formatted as PEM",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"web-server-directory": {
"blurb": "The directory the web server should serve",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "gstwebrtc-api/dist",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"web-server-host-addr": {
"blurb": "Address the web server should listen on",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "http://127.0.0.1:8080/",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"web-server-key": {
"blurb": "Path to private encryption key the web server should use.\n The key should be formatted as PEM",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"web-server-path": {
"blurb": "The root path for the web server",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
}
},
"signals": {

View file

@ -59,7 +59,7 @@ async-recursion = { version = "1.0.0", optional = true }
livekit-protocol = { version = "0.3, < 0.3.4", optional = true }
livekit-api = { version = "0.3", default-features = false, features = ["signal-client", "access-token", "native-tls"], optional = true }
warp = {version = "0.3", optional = true }
warp = {version = "0.3", optional = true, features = ["tls"] }
ctrlc = {version = "3.4.0", optional = true }
tracing = { version = "0.1", features = ["log"] }
@ -81,7 +81,7 @@ path = "src/lib.rs"
gst-plugin-version-helper.workspace = true
[features]
default = ["v1_22", "aws", "janus", "livekit", "whip"]
default = ["v1_22", "aws", "janus", "livekit", "whip", "web_server"]
static = []
capi = []
v1_22 = ["gst/v1_22", "gst-app/v1_22", "gst-video/v1_22", "gst-webrtc/v1_22", "gst-sdp/v1_22", "gst-rtp/v1_22"]
@ -93,6 +93,7 @@ aws = ["dep:aws-config", "dep:aws-types", "dep:aws-credential-types", "dep:aws-s
janus = ["dep:http"]
livekit = ["dep:livekit-protocol", "dep:livekit-api"]
whip = ["dep:async-recursion", "dep:reqwest", "dep:warp", "dep:ctrlc"]
web_server = ["dep:warp"]
[package.metadata.capi]
min_version = "0.9.21"

View file

@ -72,6 +72,18 @@ const DEFAULT_DO_CLOCK_SIGNALLING: bool = false;
const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false;
const DEFAULT_ICE_TRANSPORT_POLICY: WebRTCICETransportPolicy = WebRTCICETransportPolicy::All;
const DEFAULT_START_BITRATE: u32 = 2048000;
#[cfg(feature = "web_server")]
const DEFAULT_RUN_WEB_SERVER: bool = false;
#[cfg(feature = "web_server")]
const DEFAULT_WEB_SERVER_CERT: Option<&str> = None;
#[cfg(feature = "web_server")]
const DEFAULT_WEB_SERVER_KEY: Option<&str> = None;
#[cfg(feature = "web_server")]
const DEFAULT_WEB_SERVER_PATH: Option<&str> = None;
#[cfg(feature = "web_server")]
const DEFAULT_WEB_SERVER_DIRECTORY: &str = "gstwebrtc-api/dist";
#[cfg(feature = "web_server")]
const DEFAULT_WEB_SERVER_HOST_ADDR: &str = "http://127.0.0.1:8080";
/* Start adding some FEC when the bitrate > 2Mbps as we found experimentally
* that it is not worth it below that threshold */
#[cfg(feature = "v1_22")]
@ -100,6 +112,18 @@ struct Settings {
meta: Option<gst::Structure>,
ice_transport_policy: WebRTCICETransportPolicy,
signaller: Signallable,
#[cfg(feature = "web_server")]
run_web_server: bool,
#[cfg(feature = "web_server")]
web_server_cert: Option<String>,
#[cfg(feature = "web_server")]
web_server_key: Option<String>,
#[cfg(feature = "web_server")]
web_server_path: Option<String>,
#[cfg(feature = "web_server")]
web_server_directory: String,
#[cfg(feature = "web_server")]
web_server_host_addr: url::Url,
}
#[derive(Debug, Clone)]
@ -462,6 +486,10 @@ struct State {
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>,
#[cfg(feature = "web_server")]
web_shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
#[cfg(feature = "web_server")]
web_join_handle: Option<tokio::task::JoinHandle<()>>,
}
fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) {
@ -546,6 +574,18 @@ impl Default for Settings {
meta: None,
ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY,
signaller: signaller.upcast(),
#[cfg(feature = "web_server")]
run_web_server: DEFAULT_RUN_WEB_SERVER,
#[cfg(feature = "web_server")]
web_server_cert: DEFAULT_WEB_SERVER_CERT.map(String::from),
#[cfg(feature = "web_server")]
web_server_key: DEFAULT_WEB_SERVER_KEY.map(String::from),
#[cfg(feature = "web_server")]
web_server_path: DEFAULT_WEB_SERVER_PATH.map(String::from),
#[cfg(feature = "web_server")]
web_server_directory: String::from(DEFAULT_WEB_SERVER_DIRECTORY),
#[cfg(feature = "web_server")]
web_server_host_addr: url::Url::parse(DEFAULT_WEB_SERVER_HOST_ADDR).unwrap(),
}
}
}
@ -567,6 +607,10 @@ impl Default for State {
mids: HashMap::new(),
signaller_signals: Default::default(),
finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())),
#[cfg(feature = "web_server")]
web_shutdown_tx: None,
#[cfg(feature = "web_server")]
web_join_handle: None,
}
}
}
@ -2022,11 +2066,65 @@ impl BaseWebRTCSink {
}
}
#[cfg(feature = "web_server")]
fn spawn_web_server(
settings: &Settings,
) -> Result<
(
tokio::sync::oneshot::Sender<()>,
tokio::task::JoinHandle<()>,
),
Error,
> {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let addr = settings.web_server_host_addr.socket_addrs(|| None).unwrap()[0];
let settings = settings.clone();
let jh = RUNTIME.spawn(async move {
let route = match settings.web_server_path {
Some(path) => warp::path(path)
.and(warp::fs::dir(settings.web_server_directory))
.boxed(),
None => warp::get()
.and(warp::fs::dir(settings.web_server_directory))
.boxed(),
};
if let (Some(cert), Some(key)) = (settings.web_server_cert, settings.web_server_key) {
let (_, server) = warp::serve(route)
.tls()
.cert_path(cert)
.key_path(key)
.bind_with_graceful_shutdown(addr, async move {
match rx.await {
Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
}
});
server.await;
} else {
let (_, server) =
warp::serve(route).bind_with_graceful_shutdown(addr, async move {
match rx.await {
Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
}
});
server.await;
}
});
Ok((tx, jh))
}
/// Prepare for accepting consumers, by setting
/// up StreamProducers for each of our sink pads
fn prepare(&self) -> Result<(), Error> {
gst::debug!(CAT, imp = self, "preparing");
#[cfg(feature = "web_server")]
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
state
@ -2034,6 +2132,13 @@ impl BaseWebRTCSink {
.iter_mut()
.try_for_each(|(_, stream)| stream.prepare(&self.obj()))?;
#[cfg(feature = "web_server")]
if settings.run_web_server {
let (web_shutdown_tx, web_join_handle) = BaseWebRTCSink::spawn_web_server(&settings)?;
state.web_shutdown_tx = Some(web_shutdown_tx);
state.web_join_handle = Some(web_join_handle);
}
Ok(())
}
@ -2047,6 +2152,15 @@ impl BaseWebRTCSink {
drop(settings);
let mut state = self.state.lock().unwrap();
#[cfg(feature = "web_server")]
if let Some(web_shutdown_tx) = state.web_shutdown_tx.take() {
let _ = web_shutdown_tx.send(());
let web_join_handle = state.web_join_handle.take().expect("no web join handle");
RUNTIME.block_on(async {
let _ = web_join_handle.await;
});
}
let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect();
let sessions: Vec<_> = session_ids
@ -4075,6 +4189,95 @@ impl ObjectImpl for BaseWebRTCSink {
.flags(glib::ParamFlags::READABLE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb("The Signallable object to use to handle WebRTC Signalling")
.build(),
/**
* GstBaseWebRTCSink:run-web-server:
*
* Whether the element should use [warp] to serve the folder at
* #GstBaseWebRTCSink:web-server-directory.
*
* Since: plugins-rs-0.14.0
*/
#[cfg(feature = "web_server")]
glib::ParamSpecBoolean::builder("run-web-server")
.nick("Run web server")
.blurb("Whether the element should run a web server")
.default_value(DEFAULT_RUN_WEB_SERVER)
.mutable_ready()
.build(),
/**
* GstBaseWebRTCSink:web-server-cert:
*
* The certificate to use when #GstBaseWebRTCSink:run-web-server
* is TRUE.
*
* Since: plugins-rs-0.14.0
*/
#[cfg(feature = "web_server")]
glib::ParamSpecString::builder("web-server-cert")
.nick("Web server certificate")
.blurb(
"Path to TLS certificate the web server should use.
The certificate should be formatted as PEM",
)
.default_value(DEFAULT_WEB_SERVER_CERT)
.build(),
/**
* GstBaseWebRTCSink:web-server-key:
*
* The private key to use when #GstBaseWebRTCSink:run-web-server
* is TRUE.
*
* Since: plugins-rs-0.14.0
*/
#[cfg(feature = "web_server")]
glib::ParamSpecString::builder("web-server-key")
.nick("Web server private key")
.blurb("Path to private encryption key the web server should use.
The key should be formatted as PEM")
.default_value(DEFAULT_WEB_SERVER_KEY)
.build(),
/**
* GstBaseWebRTCSink:web-server-key:
*
* The root path for the web server when #GstBaseWebRTCSink:run-web-server
* is TRUE.
*
* Since: plugins-rs-0.14.0
*/
#[cfg(feature = "web_server")]
glib::ParamSpecString::builder("web-server-path")
.nick("Web server path")
.blurb("The root path for the web server")
.default_value(DEFAULT_WEB_SERVER_PATH)
.build(),
/**
* GstBaseWebRTCSink:web-server-directory:
*
* The directory to serve when #GstBaseWebRTCSink:run-web-server
* is TRUE.
*
* Since: plugins-rs-0.14.0
*/
#[cfg(feature = "web_server")]
glib::ParamSpecString::builder("web-server-directory")
.nick("Web server directory")
.blurb("The directory the web server should serve")
.default_value(DEFAULT_WEB_SERVER_DIRECTORY)
.build(),
/**
* GstBaseWebRTCSink:web-server-host-addr:
*
* The address to listen on when #GstBaseWebRTCSink:run-web-server
* is TRUE.
*
* Since: plugins-rs-0.14.0
*/
#[cfg(feature = "web_server")]
glib::ParamSpecString::builder("web-server-host-addr")
.nick("Web server host address")
.blurb("Address the web server should listen on")
.default_value(DEFAULT_WEB_SERVER_HOST_ADDR)
.build(),
]
});
@ -4154,6 +4357,54 @@ impl ObjectImpl for BaseWebRTCSink {
.get::<WebRTCICETransportPolicy>()
.expect("type checked upstream");
}
#[cfg(feature = "web_server")]
"run-web-server" => {
let mut settings = self.settings.lock().unwrap();
settings.run_web_server = value.get::<bool>().expect("type checked upstream");
}
#[cfg(feature = "web_server")]
"web-server-cert" => {
let mut settings = self.settings.lock().unwrap();
settings.web_server_cert = value
.get::<Option<String>>()
.expect("type checked upstream")
}
#[cfg(feature = "web_server")]
"web-server-key" => {
let mut settings = self.settings.lock().unwrap();
settings.web_server_key = value
.get::<Option<String>>()
.expect("type checked upstream")
}
#[cfg(feature = "web_server")]
"web-server-path" => {
let mut settings = self.settings.lock().unwrap();
settings.web_server_path = value
.get::<Option<String>>()
.expect("type checked upstream")
}
#[cfg(feature = "web_server")]
"web-server-directory" => {
let mut settings = self.settings.lock().unwrap();
settings.web_server_directory =
value.get::<String>().expect("type checked upstream")
}
#[cfg(feature = "web_server")]
"web-server-host-addr" => {
let mut settings = self.settings.lock().unwrap();
let host_addr = match url::Url::parse(
&value.get::<String>().expect("type checked upstream"),
) {
Err(e) => {
gst::error!(CAT, "Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_WEB_SERVER_HOST_ADDR:?}");
return;
}
Ok(addr) => addr,
};
settings.web_server_host_addr = host_addr;
}
_ => unimplemented!(),
}
}
@ -4218,6 +4469,36 @@ impl ObjectImpl for BaseWebRTCSink {
settings.ice_transport_policy.to_value()
}
"signaller" => self.settings.lock().unwrap().signaller.to_value(),
#[cfg(feature = "web_server")]
"run-web-server" => {
let settings = self.settings.lock().unwrap();
settings.run_web_server.to_value()
}
#[cfg(feature = "web_server")]
"web-server-cert" => {
let settings = self.settings.lock().unwrap();
settings.web_server_cert.to_value()
}
#[cfg(feature = "web_server")]
"web-server-key" => {
let settings = self.settings.lock().unwrap();
settings.web_server_key.to_value()
}
#[cfg(feature = "web_server")]
"web-server-path" => {
let settings = self.settings.lock().unwrap();
settings.web_server_path.to_value()
}
#[cfg(feature = "web_server")]
"web-server-directory" => {
let settings = self.settings.lock().unwrap();
settings.web_server_directory.to_value()
}
#[cfg(feature = "web_server")]
"web-server-host-addr" => {
let settings = self.settings.lock().unwrap();
settings.web_server_host_addr.to_string().to_value()
}
_ => unimplemented!(),
}
}
@ -4680,7 +4961,6 @@ pub struct WebRTCSink {
settings: Mutex<WebRTCSinkSettings>,
}
fn initialize_logging(envvar_name: &str) -> Result<(), Error> {
tracing_log::LogTracer::init()?;
let env_filter = tracing_subscriber::EnvFilter::try_from_env(envvar_name)
@ -4703,6 +4983,9 @@ fn initialize_logging(envvar_name: &str) -> Result<(), Error> {
pub static SIGNALLING_LOGGING: Lazy<Result<(), Error>> =
Lazy::new(|| initialize_logging("WEBRTCSINK_SIGNALLING_SERVER_LOG"));
#[cfg(feature = "web_server")]
use warp::Filter;
impl WebRTCSink {
async fn spawn_signalling_server(settings: &WebRTCSinkSettings) -> Result<(), Error> {
let server = Server::spawn(Handler::new);
@ -4791,8 +5074,12 @@ impl WebRTCSink {
self,
async move {
if let Err(err) = WebRTCSink::spawn_signalling_server(&settings).await {
gst::error!(CAT, imp = this,
"Failed to start signalling server: {}", err);
gst::error!(
CAT,
imp = this,
"Failed to start signalling server: {}",
err
);
this.post_error_message(gst::error_msg!(
gst::StreamError::Failed,
["Failed to start signalling server: {}", err]
@ -4820,17 +5107,40 @@ impl ObjectImpl for WebRTCSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
/**
* GstWebRTCSink:run-signalling-server:
*
* Whether the element should run its own signalling server.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecBoolean::builder("run-signalling-server")
.nick("Run signalling server")
.blurb("Whether the element should run its own signalling server")
.default_value(DEFAULT_RUN_SIGNALLING_SERVER)
.mutable_ready()
.build(),
/**
* GstWebRTCSink:signalling-server-host:
*
* The address to listen on when #GstWebRTCSink:run-signalling-server
* is TRUE.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecString::builder("signalling-server-host")
.nick("Signalling server host")
.blurb("Address the signalling server should listen on")
.default_value(DEFAULT_SIGNALLING_SERVER_HOST)
.build(),
/**
* GstWebRTCSink:signalling-server-port:
*
* The port to listen on when #GstWebRTCSink:run-signalling-server
* is TRUE.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecUInt::builder("signalling-server-port")
.nick("Signalling server port")
.blurb("Port the signalling server should listen on")
@ -4838,6 +5148,16 @@ impl ObjectImpl for WebRTCSink {
.maximum(u16::MAX as u32)
.default_value(DEFAULT_SIGNALLING_SERVER_PORT as u32)
.build(),
/**
* GstWebRTCSink:signalling-server-cert:
*
* Path to TLS certificate to use when #GstWebRTCSink:run-signalling-server
* is TRUE.
*
* The certificate should be formatted as PKCS 12.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecString::builder("signalling-server-cert")
.nick("Signalling server certificate")
.blurb(
@ -4846,6 +5166,14 @@ impl ObjectImpl for WebRTCSink {
)
.default_value(DEFAULT_SIGNALLING_SERVER_CERT)
.build(),
/**
* GstWebRTCSink:signalling-server-cert-password:
*
* The password for the certificate provided through
* #GstWebRTCSink:signalling-server-cert.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecString::builder("signalling-server-cert-password")
.nick("Signalling server certificate password")
.blurb("The password for the certificate the signalling server will use")