diff --git a/Cargo.lock b/Cargo.lock index 018477e0..d4621877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + [[package]] name = "anyhow" version = "1.0.48" @@ -262,8 +271,10 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" dependencies = [ + "libc", "num-integer", "num-traits", + "winapi", ] [[package]] @@ -605,6 +616,8 @@ dependencies = [ "option-operations", "paste", "pretty-hex", + "serde", + "serde_bytes", "thiserror", ] @@ -871,6 +884,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.9" @@ -926,6 +948,7 @@ dependencies = [ "autocfg", "num-integer", "num-traits", + "serde", ] [[package]] @@ -1175,6 +1198,30 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -1228,6 +1275,18 @@ name = "serde" version = "1.0.130" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_bytes" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9" +dependencies = [ + "serde", +] [[package]] name = "serde_derive" @@ -1264,6 +1323,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook" version = "0.3.10" @@ -1363,6 +1431,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd" +dependencies = [ + "once_cell", +] + [[package]] name = "tinyvec" version = "1.5.1" @@ -1387,6 +1464,82 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +dependencies = [ + "cfg-if", + "log", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-log" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" +dependencies = [ + "ansi_term", + "chrono", + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", +] + [[package]] name = "tungstenite" version = "0.16.0" @@ -1458,6 +1611,15 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", +] + [[package]] name = "value-bag" version = "1.0.0-alpha.8" @@ -1595,6 +1757,10 @@ dependencies = [ "serde_derive", "serde_json", "smallvec", + "tracing", + "tracing-log", + "tracing-subscriber", + "uuid", ] [[package]] diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index 0a023f9d..537fc7b5 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -9,7 +9,7 @@ repository = "https://github.com/centricular/webrtcsink/" build = "build.rs" [dependencies] -gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20", "ser_de"] } gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } gst-webrtc = { package = "gstreamer-webrtc", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } @@ -26,6 +26,12 @@ serde_derive = "1" serde_json = "1" fastrand = "1.0" +[dev-dependencies] +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.2", features = ["registry", "env-filter"] } +tracing-log = "0.1" +uuid = { version = "0.8", features = ["v4"] } + [lib] name = "webrtcsink" crate-type = ["cdylib", "rlib"] @@ -50,3 +56,6 @@ versioning = false [package.metadata.capi.pkg_config] requires_private = "gstreamer-rtp >= 1.20, gstreamer-webrtc >= 1.20, gstreamer-1.0 >= 1.20, gstreamer-app >= 1.20, gstreamer-video >= 1.20, gstreamer-sdp >= 1.20, gobject-2.0, glib-2.0, gmodule-2.0" + +[[example]] +name = "webrtcsink-stats-server" diff --git a/plugins/examples/README.md b/plugins/examples/README.md new file mode 100644 index 00000000..7b402e69 --- /dev/null +++ b/plugins/examples/README.md @@ -0,0 +1,18 @@ +# webrtcsink examples + +Collection (1-sized for now) of webrtcsink examples + +## webrtcsink-stats-server + +A simple application that instantiates a webrtcsink and serves stats +over websockets. + +The application expects a signalling server to be running at `ws://localhost:8443`, +similar to the usage example in the main README. + +``` shell +cargo run --example webrtcsink-stats-server +``` + +Once it is running, follow the instruction in the webrtcsink-stats folder to +run an example client. diff --git a/plugins/examples/webrtcsink-stats-server.rs b/plugins/examples/webrtcsink-stats-server.rs new file mode 100644 index 00000000..a5ed2ef1 --- /dev/null +++ b/plugins/examples/webrtcsink-stats-server.rs @@ -0,0 +1,179 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use anyhow::Error; + +use async_std::net::{TcpListener, TcpStream}; +use async_std::task; +use async_tungstenite::tungstenite::Message as WsMessage; +use futures::channel::mpsc; +use futures::prelude::*; +use gst::glib::Type; +use gst::prelude::*; +use tracing::info; +use tracing_subscriber::prelude::*; + +fn serialize_value(val: &gst::glib::Value) -> Option { + match val.type_() { + Type::STRING => Some(val.get::().unwrap().into()), + Type::BOOL => Some(val.get::().unwrap().into()), + Type::I32 => Some(val.get::().unwrap().into()), + Type::U32 => Some(val.get::().unwrap().into()), + Type::I_LONG | Type::I64 => Some(val.get::().unwrap().into()), + Type::U_LONG | Type::U64 => Some(val.get::().unwrap().into()), + Type::F32 => Some(val.get::().unwrap().into()), + Type::F64 => Some(val.get::().unwrap().into()), + _ => { + if let Ok(s) = val.get::() { + serde_json::to_value( + s.iter() + .filter_map(|(name, value)| { + serialize_value(value).map(|value| (name.to_string(), value)) + }) + .collect::>(), + ) + .ok() + } else if let Ok(a) = val.get::() { + serde_json::to_value( + a.iter() + .filter_map(|value| serialize_value(value)) + .collect::>(), + ) + .ok() + } else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) { + Some( + values + .iter() + .map(|value| value.nick()) + .collect::>() + .join("+") + .into(), + ) + } else if let Ok(value) = val.serialize() { + Some(value.as_str().into()) + } else { + None + } + } + } +} + +#[derive(Clone)] +struct Listener { + id: uuid::Uuid, + sender: mpsc::Sender, +} + +struct State { + listeners: Vec, +} + +async fn run() -> Result<(), Error> { + tracing_log::LogTracer::init().expect("Failed to set logger"); + let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTCSINK_STATS_LOG") + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_target(true) + .with_span_events( + tracing_subscriber::fmt::format::FmtSpan::NEW + | tracing_subscriber::fmt::format::FmtSpan::CLOSE, + ); + let subscriber = tracing_subscriber::Registry::default() + .with(env_filter) + .with(fmt_layer); + tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber"); + + let state = Arc::new(Mutex::new(State { listeners: vec![] })); + + let addr = "127.0.0.1:8484".to_string(); + + // Create the event loop and TCP listener we'll accept connections on. + let try_socket = TcpListener::bind(&addr).await; + let listener = try_socket.expect("Failed to bind"); + info!("Listening on: {}", addr); + + let pipeline = + gst::parse_launch("webrtcsink name=ws videotestsrc ! queue ! ws. audiotestsrc ! ws.")?; + let ws = pipeline + .downcast_ref::() + .unwrap() + .by_name("ws") + .unwrap(); + + let ws_clone = ws.downgrade(); + let state_clone = state.clone(); + task::spawn(async move { + let mut interval = async_std::stream::interval(std::time::Duration::from_millis(100)); + + while let Some(_) = interval.next().await { + if let Some(ws) = ws_clone.upgrade() { + let stats = ws.property::("stats"); + let stats = serialize_value(&stats.to_value()).unwrap(); + info!("Stats: {}", serde_json::to_string_pretty(&stats).unwrap()); + let msg = WsMessage::Text(serde_json::to_string(&stats).unwrap()); + + let listeners = state_clone.lock().unwrap().listeners.clone(); + + for mut listener in listeners { + if listener.sender.send(msg.clone()).await.is_err() { + let mut state = state_clone.lock().unwrap(); + let index = state + .listeners + .iter() + .position(|l| l.id == listener.id) + .unwrap(); + state.listeners.remove(index); + } + } + } else { + break; + } + } + }); + + pipeline.set_state(gst::State::Playing)?; + + while let Ok((stream, _)) = listener.accept().await { + task::spawn(accept_connection(state.clone(), stream)); + } + + Ok(()) +} + +async fn accept_connection(state: Arc>, stream: TcpStream) { + let addr = stream + .peer_addr() + .expect("connected streams should have a peer address"); + info!("Peer address: {}", addr); + + let mut ws_stream = async_tungstenite::accept_async(stream) + .await + .expect("Error during the websocket handshake occurred"); + + info!("New WebSocket connection: {}", addr); + + let mut state = state.lock().unwrap(); + let (sender, mut receiver) = mpsc::channel::(1000); + state.listeners.push(Listener { + id: uuid::Uuid::new_v4(), + sender, + }); + drop(state); + + task::spawn(async move { + while let Some(msg) = receiver.next().await { + info!("Sending to one listener!"); + if ws_stream.send(msg).await.is_err() { + info!("Listener errored out"); + receiver.close(); + } + } + }); +} + +fn main() -> Result<(), Error> { + gst::init()?; + + task::block_on(run()) +}