tracers: pipeline-snapshot: Add websocket support for dots-viewer

Allows connecting to a dots-viewer server instance to trigger pipeline
snapshots remotely through the web interface.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1956>
This commit is contained in:
Thibault Saunier 2024-11-29 16:36:49 -03:00
parent ccf8adb6ae
commit 2a11f0b577
4 changed files with 161 additions and 2 deletions

11
Cargo.lock generated
View file

@ -220,9 +220,9 @@ dependencies = [
[[package]]
name = "async-tungstenite"
version = "0.29.0"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2640e8a18087635fa4a69fa080027f35a079b36f1282903e9ae605a31663117c"
checksum = "ef0f7efedeac57d9b26170f72965ecfd31473ca52ca7a64e925b0b6f5f079886"
dependencies = [
"atomic-waker",
"futures-core",
@ -3233,15 +3233,22 @@ name = "gst-plugin-tracers"
version = "0.14.0-alpha.1"
dependencies = [
"anyhow",
"async-tungstenite",
"atomic_refcell",
"chrono",
"dirs",
"etherparse",
"futures",
"gst-plugin-version-helper",
"gstreamer",
"pcap-file",
"regex",
"serde",
"serde_json",
"signal-hook",
"tokio",
"tokio-stream",
"url",
"walkdir",
]

View file

@ -12661,6 +12661,18 @@
"type": "gboolean",
"writable": true
},
"dots-viewer-ws-url": {
"blurb": "gst-dots-viewer websocket URL",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"folder-mode": {
"blurb": "How to create folder each time a snapshot of all pipelines is made",
"conditionally-available": false,

View file

@ -18,6 +18,13 @@ etherparse = "0.16.0"
dirs = "6"
chrono = "0.4.35"
walkdir = "2"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.11"
async-tungstenite = { version = "0.29", features = ["tokio-runtime", "url"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
url = "2"
futures = "0.3"
[target.'cfg(unix)'.dependencies]
signal-hook = "0.3"

View file

@ -41,6 +41,15 @@
* - "none": All .dot files are stored directly in the target directory without subfolder organization
* - "numbered": Creates a new numbered folder (starting from 0) for each snapshot operation
* - "timed": Creates a new folder named with the current timestamp for each snapshot operation
* - `dots-viewer-websocket-url`: A websocket URL to connect to a dots-viewer server instance,
* allowing the user to snapshot running pipelines from the web
* page. To trigger a snapshot, the user should send a json message
* with the following format:
* ```json
* {
* "type": "Snapshot"
* }
* ```
*
* Examples:
*
@ -59,17 +68,21 @@
* $ GST_TRACERS="pipeline-snapshot(folder-mode=timed,cleanup-mode=initial)" GST_DEBUG_DUMP_DOT_DIR=. gst-launch-1.0 audiotestsrc ! fakesink
* ```
*/
use futures::prelude::*;
use std::collections::HashMap;
use std::io::Write;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use tokio::runtime;
use async_tungstenite::tungstenite::Message;
use gst::glib;
use gst::glib::translate::ToGlibPtr;
use gst::glib::Properties;
use gst::prelude::*;
use gst::subclass::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::LazyLock;
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
@ -80,6 +93,14 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
)
});
pub static RUNTIME: LazyLock<runtime::Runtime> = LazyLock::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.build()
.unwrap()
});
static START_TIME: LazyLock<gst::ClockTime> = LazyLock::new(gst::get_timestamp);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)]
@ -167,6 +188,17 @@ impl std::str::FromStr for FolderMode {
}
}
#[derive(Debug, Serialize, Deserialize)]
enum DotViewerMessageType {
Snapshot,
}
#[derive(Debug, Serialize, Deserialize)]
struct DotViewerMessage {
#[serde(rename = "type")]
pub type_: DotViewerMessageType,
}
#[derive(Debug)]
struct Settings {
dot_prefix: Option<String>,
@ -176,6 +208,7 @@ struct Settings {
xdg_cache: bool,
cleanup_mode: CleanupMode,
folder_mode: FolderMode,
dots_viewer_ws_url: Option<String>,
}
impl Default for Settings {
@ -188,6 +221,7 @@ impl Default for Settings {
cleanup_mode: CleanupMode::None,
dot_pipeline_ptr: false,
folder_mode: FolderMode::None,
dots_viewer_ws_url: None,
}
}
}
@ -263,6 +297,11 @@ impl Settings {
};
}
if let Ok(websocket) = s.get::<&str>("dots-viewer-ws-url") {
gst::debug!(CAT, imp = imp, "dots-viewer-websocket-url = {}", websocket);
self.dots_viewer_ws_url = Some(websocket.to_string());
}
if let Ok(folder_mode) = s.get::<&str>("folder-mode") {
self.folder_mode = match folder_mode.parse() {
Ok(mode) => mode,
@ -285,6 +324,7 @@ struct State {
#[properties(wrapper_type = super::PipelineSnapshot)]
pub struct PipelineSnapshot {
#[property(name="dot-dir", get, set = Self::set_dot_dir, construct_only, type = String, member = dot_dir, blurb = "Directory where to place dot files")]
#[property(name="dots-viewer-ws-url", get, set = Self::set_dot_viewer_ws_url, construct_only, type = String, member = dots_viewer_ws_url, blurb = "gst-dots-viewer websocket URL")]
#[property(name="xdg-cache", get, set = Self::set_xdg_cache, construct_only, type = bool, member = xdg_cache, blurb = "Use $XDG_CACHE_DIR/gstreamer-dots")]
#[property(name="dot-prefix", get, set, type = String, member = dot_prefix, blurb = "Prefix for dot files")]
#[property(name="dot-ts", get, set, type = bool, member = dot_ts, blurb = "Add timestamp to dot files")]
@ -304,6 +344,7 @@ struct Handles {
#[cfg(unix)]
signal: signal_hook::iterator::Handle,
thread: std::thread::JoinHandle<()>,
websocket: Option<tokio::task::JoinHandle<()>>,
}
#[glib::object_subclass]
@ -327,6 +368,8 @@ impl ObjectImpl for PipelineSnapshot {
if settings.cleanup_mode == CleanupMode::Initial {
drop(settings);
self.cleanup_dots(&self.settings.read().unwrap().dot_dir.as_ref(), true);
} else {
drop(settings);
}
self.register_hook(TracerHook::ElementNew);
@ -335,6 +378,8 @@ impl ObjectImpl for PipelineSnapshot {
if let Err(err) = self.setup_signal() {
gst::warning!(CAT, imp = self, "failed to setup UNIX signals: {}", err);
}
self.setup_websocket();
}
fn signals() -> &'static [glib::subclass::Signal] {
@ -399,11 +444,98 @@ impl TracerImpl for PipelineSnapshot {
}
impl PipelineSnapshot {
async fn handle_websocket(
weak_self: glib::WeakRef<super::PipelineSnapshot>,
host: String,
) -> Result<(), Box<dyn std::error::Error>> {
let url = url::Url::parse(&host)?;
let (ws_stream, _) = async_tungstenite::tokio::connect_async(url).await?;
let (mut write, mut read) = ws_stream.split();
gst::debug!(CAT, "Connected to WebSocket server at {}", host);
write
.send(Message::Text(
serde_json::json!({
"type": "Hello",
})
.to_string()
.into(),
))
.await?;
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
let msg: DotViewerMessage = match serde_json::from_str(&text) {
Ok(s) => s,
Err(e) => {
gst::error!(CAT, "Failed to parse message: {}", e);
continue;
}
};
match msg.type_ {
DotViewerMessageType::Snapshot => {
if let Some(this) = weak_self.upgrade() {
gst::info!(CAT, "Received dot-pipeline request from the WebSocket");
this.snapshot();
}
}
}
}
Ok(Message::Close(_)) => break,
Err(e) => {
gst::warning!(CAT, "WebSocket error: {}", e);
break;
}
_ => {}
}
}
let _ = write.close().await;
Ok(())
}
fn setup_websocket(&self) {
let settings = self.settings.read().unwrap();
if let Some(dots_viewer_websocket_url) = &settings.dots_viewer_ws_url {
let host = dots_viewer_websocket_url.clone();
let obj = self.obj().downgrade();
let handle = RUNTIME.spawn(async move {
loop {
gst::debug!(CAT, "Connecting to WebSocket server at {}", host);
if let Err(e) = Self::handle_websocket(obj.clone(), host.clone()).await {
gst::warning!(
CAT,
"WebSocket {} connection failed: {}. Retrying in 5 seconds...",
host,
e
);
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
let mut handles = self.handles.lock().unwrap();
if let Some(handles) = handles.as_mut() {
handles.websocket = Some(handle);
}
}
}
fn set_dot_dir(&self, dot_dir: Option<String>) {
let mut settings = self.settings.write().unwrap();
settings.set_dot_dir(dot_dir);
}
fn set_dot_viewer_ws_url(&self, url: Option<String>) {
let mut settings = self.settings.write().unwrap();
settings.dots_viewer_ws_url = url;
}
fn set_xdg_cache(&self, use_xdg_cache: bool) {
let mut settings = self.settings.write().unwrap();
settings.set_xdg_cache(use_xdg_cache);
@ -541,6 +673,7 @@ impl PipelineSnapshot {
*handles = Some(Handles {
signal: signal_handle,
thread: thread_handle,
websocket: None,
});
Ok(())