examples: implement webrtcsink-stat-server

This commit is contained in:
Mathieu Duponchelle 2021-11-30 22:49:33 +01:00
parent 921ca7fbab
commit 5f98e61c91
4 changed files with 373 additions and 1 deletions

166
Cargo.lock generated
View file

@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.48"
@ -262,8 +271,10 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"winapi",
]
[[package]]
@ -605,6 +616,8 @@ dependencies = [
"option-operations",
"paste",
"pretty-hex",
"serde",
"serde_bytes",
"thiserror",
]
@ -871,6 +884,15 @@ dependencies = [
"value-bag",
]
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
@ -926,6 +948,7 @@ dependencies = [
"autocfg",
"num-integer",
"num-traits",
"serde",
]
[[package]]
@ -1175,6 +1198,30 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
@ -1228,6 +1275,18 @@ name = "serde"
version = "1.0.130"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_bytes"
version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9"
dependencies = [
"serde",
]
[[package]]
name = "serde_derive"
@ -1264,6 +1323,15 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook"
version = "0.3.10"
@ -1363,6 +1431,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd"
dependencies = [
"once_cell",
]
[[package]]
name = "tinyvec"
version = "1.5.1"
@ -1387,6 +1464,82 @@ dependencies = [
"serde",
]
[[package]]
name = "tracing"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
[[package]]
name = "tracing-log"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
name = "tungstenite"
version = "0.16.0"
@ -1458,6 +1611,15 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom",
]
[[package]]
name = "value-bag"
version = "1.0.0-alpha.8"
@ -1595,6 +1757,10 @@ dependencies = [
"serde_derive",
"serde_json",
"smallvec",
"tracing",
"tracing-log",
"tracing-subscriber",
"uuid",
]
[[package]]

View file

@ -9,7 +9,7 @@ repository = "https://github.com/centricular/webrtcsink/"
build = "build.rs"
[dependencies]
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20", "ser_de"] }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-webrtc = { package = "gstreamer-webrtc", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
@ -26,6 +26,12 @@ serde_derive = "1"
serde_json = "1"
fastrand = "1.0"
[dev-dependencies]
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.2", features = ["registry", "env-filter"] }
tracing-log = "0.1"
uuid = { version = "0.8", features = ["v4"] }
[lib]
name = "webrtcsink"
crate-type = ["cdylib", "rlib"]
@ -50,3 +56,6 @@ versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-rtp >= 1.20, gstreamer-webrtc >= 1.20, gstreamer-1.0 >= 1.20, gstreamer-app >= 1.20, gstreamer-video >= 1.20, gstreamer-sdp >= 1.20, gobject-2.0, glib-2.0, gmodule-2.0"
[[example]]
name = "webrtcsink-stats-server"

View file

@ -0,0 +1,18 @@
# webrtcsink examples
Collection (1-sized for now) of webrtcsink examples
## webrtcsink-stats-server
A simple application that instantiates a webrtcsink and serves stats
over websockets.
The application expects a signalling server to be running at `ws://localhost:8443`,
similar to the usage example in the main README.
``` shell
cargo run --example webrtcsink-stats-server
```
Once it is running, follow the instruction in the webrtcsink-stats folder to
run an example client.

View file

@ -0,0 +1,179 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use anyhow::Error;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc;
use futures::prelude::*;
use gst::glib::Type;
use gst::prelude::*;
use tracing::info;
use tracing_subscriber::prelude::*;
fn serialize_value(val: &gst::glib::Value) -> Option<serde_json::Value> {
match val.type_() {
Type::STRING => Some(val.get::<String>().unwrap().into()),
Type::BOOL => Some(val.get::<bool>().unwrap().into()),
Type::I32 => Some(val.get::<i32>().unwrap().into()),
Type::U32 => Some(val.get::<u32>().unwrap().into()),
Type::I_LONG | Type::I64 => Some(val.get::<i64>().unwrap().into()),
Type::U_LONG | Type::U64 => Some(val.get::<u64>().unwrap().into()),
Type::F32 => Some(val.get::<f32>().unwrap().into()),
Type::F64 => Some(val.get::<f64>().unwrap().into()),
_ => {
if let Ok(s) = val.get::<gst::Structure>() {
serde_json::to_value(
s.iter()
.filter_map(|(name, value)| {
serialize_value(value).map(|value| (name.to_string(), value))
})
.collect::<HashMap<String, serde_json::Value>>(),
)
.ok()
} else if let Ok(a) = val.get::<gst::Array>() {
serde_json::to_value(
a.iter()
.filter_map(|value| serialize_value(value))
.collect::<Vec<serde_json::Value>>(),
)
.ok()
} else if let Some((_klass, values)) = gst::glib::FlagsValue::from_value(val) {
Some(
values
.iter()
.map(|value| value.nick())
.collect::<Vec<&str>>()
.join("+")
.into(),
)
} else if let Ok(value) = val.serialize() {
Some(value.as_str().into())
} else {
None
}
}
}
}
#[derive(Clone)]
struct Listener {
id: uuid::Uuid,
sender: mpsc::Sender<WsMessage>,
}
struct State {
listeners: Vec<Listener>,
}
async fn run() -> Result<(), Error> {
tracing_log::LogTracer::init().expect("Failed to set logger");
let env_filter = tracing_subscriber::EnvFilter::try_from_env("WEBRTCSINK_STATS_LOG")
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let fmt_layer = tracing_subscriber::fmt::layer()
.with_thread_ids(true)
.with_target(true)
.with_span_events(
tracing_subscriber::fmt::format::FmtSpan::NEW
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(fmt_layer);
tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");
let state = Arc::new(Mutex::new(State { listeners: vec![] }));
let addr = "127.0.0.1:8484".to_string();
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", addr);
let pipeline =
gst::parse_launch("webrtcsink name=ws videotestsrc ! queue ! ws. audiotestsrc ! ws.")?;
let ws = pipeline
.downcast_ref::<gst::Bin>()
.unwrap()
.by_name("ws")
.unwrap();
let ws_clone = ws.downgrade();
let state_clone = state.clone();
task::spawn(async move {
let mut interval = async_std::stream::interval(std::time::Duration::from_millis(100));
while let Some(_) = interval.next().await {
if let Some(ws) = ws_clone.upgrade() {
let stats = ws.property::<gst::Structure>("stats");
let stats = serialize_value(&stats.to_value()).unwrap();
info!("Stats: {}", serde_json::to_string_pretty(&stats).unwrap());
let msg = WsMessage::Text(serde_json::to_string(&stats).unwrap());
let listeners = state_clone.lock().unwrap().listeners.clone();
for mut listener in listeners {
if listener.sender.send(msg.clone()).await.is_err() {
let mut state = state_clone.lock().unwrap();
let index = state
.listeners
.iter()
.position(|l| l.id == listener.id)
.unwrap();
state.listeners.remove(index);
}
}
} else {
break;
}
}
});
pipeline.set_state(gst::State::Playing)?;
while let Ok((stream, _)) = listener.accept().await {
task::spawn(accept_connection(state.clone(), stream));
}
Ok(())
}
async fn accept_connection(state: Arc<Mutex<State>>, stream: TcpStream) {
let addr = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let mut ws_stream = async_tungstenite::accept_async(stream)
.await
.expect("Error during the websocket handshake occurred");
info!("New WebSocket connection: {}", addr);
let mut state = state.lock().unwrap();
let (sender, mut receiver) = mpsc::channel::<WsMessage>(1000);
state.listeners.push(Listener {
id: uuid::Uuid::new_v4(),
sender,
});
drop(state);
task::spawn(async move {
while let Some(msg) = receiver.next().await {
info!("Sending to one listener!");
if ws_stream.send(msg).await.is_err() {
info!("Listener errored out");
receiver.close();
}
}
});
}
fn main() -> Result<(), Error> {
gst::init()?;
task::block_on(run())
}