diff --git a/Cargo.lock b/Cargo.lock index ab812b1f1..02f2fc0ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,9 +220,9 @@ dependencies = [ [[package]] name = "async-tungstenite" -version = "0.29.0" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2640e8a18087635fa4a69fa080027f35a079b36f1282903e9ae605a31663117c" +checksum = "ef0f7efedeac57d9b26170f72965ecfd31473ca52ca7a64e925b0b6f5f079886" dependencies = [ "atomic-waker", "futures-core", @@ -3233,15 +3233,22 @@ name = "gst-plugin-tracers" version = "0.14.0-alpha.1" dependencies = [ "anyhow", + "async-tungstenite", "atomic_refcell", "chrono", "dirs", "etherparse", + "futures", "gst-plugin-version-helper", "gstreamer", "pcap-file", "regex", + "serde", + "serde_json", "signal-hook", + "tokio", + "tokio-stream", + "url", "walkdir", ] diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 1979df68d..1e57d8a30 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -12661,6 +12661,18 @@ "type": "gboolean", "writable": true }, + "dots-viewer-ws-url": { + "blurb": "gst-dots-viewer websocket URL", + "conditionally-available": false, + "construct": false, + "construct-only": true, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, "folder-mode": { "blurb": "How to create folder each time a snapshot of all pipelines is made", "conditionally-available": false, diff --git a/utils/tracers/Cargo.toml b/utils/tracers/Cargo.toml index 621b12270..d3457258d 100644 --- a/utils/tracers/Cargo.toml +++ b/utils/tracers/Cargo.toml @@ -18,6 +18,13 @@ etherparse = "0.16.0" dirs = "6" chrono = "0.4.35" walkdir = "2" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +tokio-stream = "0.1.11" +async-tungstenite = { version = "0.29", features = ["tokio-runtime", "url"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +url = "2" +futures = "0.3" [target.'cfg(unix)'.dependencies] signal-hook = "0.3" diff --git a/utils/tracers/src/pipeline_snapshot/imp.rs b/utils/tracers/src/pipeline_snapshot/imp.rs index 980b57911..e9fff9e1b 100644 --- a/utils/tracers/src/pipeline_snapshot/imp.rs +++ b/utils/tracers/src/pipeline_snapshot/imp.rs @@ -41,6 +41,15 @@ * - "none": All .dot files are stored directly in the target directory without subfolder organization * - "numbered": Creates a new numbered folder (starting from 0) for each snapshot operation * - "timed": Creates a new folder named with the current timestamp for each snapshot operation + * - `dots-viewer-websocket-url`: A websocket URL to connect to a dots-viewer server instance, + * allowing the user to snapshot running pipelines from the web +* page. To trigger a snapshot, the user should send a json message +* with the following format: +* ```json +* { +* "type": "Snapshot" +* } +* ``` * * Examples: * @@ -59,17 +68,21 @@ * $ GST_TRACERS="pipeline-snapshot(folder-mode=timed,cleanup-mode=initial)" GST_DEBUG_DUMP_DOT_DIR=. gst-launch-1.0 audiotestsrc ! fakesink * ``` */ +use futures::prelude::*; use std::collections::HashMap; use std::io::Write; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; +use tokio::runtime; +use async_tungstenite::tungstenite::Message; use gst::glib; use gst::glib::translate::ToGlibPtr; use gst::glib::Properties; use gst::prelude::*; use gst::subclass::prelude::*; +use serde::{Deserialize, Serialize}; use std::sync::LazyLock; static CAT: LazyLock = LazyLock::new(|| { @@ -80,6 +93,14 @@ static CAT: LazyLock = LazyLock::new(|| { ) }); +pub static RUNTIME: LazyLock = LazyLock::new(|| { + runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .unwrap() +}); + static START_TIME: LazyLock = LazyLock::new(gst::get_timestamp); #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)] @@ -167,6 +188,17 @@ impl std::str::FromStr for FolderMode { } } +#[derive(Debug, Serialize, Deserialize)] +enum DotViewerMessageType { + Snapshot, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DotViewerMessage { + #[serde(rename = "type")] + pub type_: DotViewerMessageType, +} + #[derive(Debug)] struct Settings { dot_prefix: Option, @@ -176,6 +208,7 @@ struct Settings { xdg_cache: bool, cleanup_mode: CleanupMode, folder_mode: FolderMode, + dots_viewer_ws_url: Option, } impl Default for Settings { @@ -188,6 +221,7 @@ impl Default for Settings { cleanup_mode: CleanupMode::None, dot_pipeline_ptr: false, folder_mode: FolderMode::None, + dots_viewer_ws_url: None, } } } @@ -263,6 +297,11 @@ impl Settings { }; } + if let Ok(websocket) = s.get::<&str>("dots-viewer-ws-url") { + gst::debug!(CAT, imp = imp, "dots-viewer-websocket-url = {}", websocket); + self.dots_viewer_ws_url = Some(websocket.to_string()); + } + if let Ok(folder_mode) = s.get::<&str>("folder-mode") { self.folder_mode = match folder_mode.parse() { Ok(mode) => mode, @@ -285,6 +324,7 @@ struct State { #[properties(wrapper_type = super::PipelineSnapshot)] pub struct PipelineSnapshot { #[property(name="dot-dir", get, set = Self::set_dot_dir, construct_only, type = String, member = dot_dir, blurb = "Directory where to place dot files")] + #[property(name="dots-viewer-ws-url", get, set = Self::set_dot_viewer_ws_url, construct_only, type = String, member = dots_viewer_ws_url, blurb = "gst-dots-viewer websocket URL")] #[property(name="xdg-cache", get, set = Self::set_xdg_cache, construct_only, type = bool, member = xdg_cache, blurb = "Use $XDG_CACHE_DIR/gstreamer-dots")] #[property(name="dot-prefix", get, set, type = String, member = dot_prefix, blurb = "Prefix for dot files")] #[property(name="dot-ts", get, set, type = bool, member = dot_ts, blurb = "Add timestamp to dot files")] @@ -304,6 +344,7 @@ struct Handles { #[cfg(unix)] signal: signal_hook::iterator::Handle, thread: std::thread::JoinHandle<()>, + websocket: Option>, } #[glib::object_subclass] @@ -327,6 +368,8 @@ impl ObjectImpl for PipelineSnapshot { if settings.cleanup_mode == CleanupMode::Initial { drop(settings); self.cleanup_dots(&self.settings.read().unwrap().dot_dir.as_ref(), true); + } else { + drop(settings); } self.register_hook(TracerHook::ElementNew); @@ -335,6 +378,8 @@ impl ObjectImpl for PipelineSnapshot { if let Err(err) = self.setup_signal() { gst::warning!(CAT, imp = self, "failed to setup UNIX signals: {}", err); } + + self.setup_websocket(); } fn signals() -> &'static [glib::subclass::Signal] { @@ -399,11 +444,98 @@ impl TracerImpl for PipelineSnapshot { } impl PipelineSnapshot { + async fn handle_websocket( + weak_self: glib::WeakRef, + host: String, + ) -> Result<(), Box> { + let url = url::Url::parse(&host)?; + let (ws_stream, _) = async_tungstenite::tokio::connect_async(url).await?; + let (mut write, mut read) = ws_stream.split(); + + gst::debug!(CAT, "Connected to WebSocket server at {}", host); + write + .send(Message::Text( + serde_json::json!({ + "type": "Hello", + }) + .to_string() + .into(), + )) + .await?; + + while let Some(msg) = read.next().await { + match msg { + Ok(Message::Text(text)) => { + let msg: DotViewerMessage = match serde_json::from_str(&text) { + Ok(s) => s, + Err(e) => { + gst::error!(CAT, "Failed to parse message: {}", e); + continue; + } + }; + + match msg.type_ { + DotViewerMessageType::Snapshot => { + if let Some(this) = weak_self.upgrade() { + gst::info!(CAT, "Received dot-pipeline request from the WebSocket"); + this.snapshot(); + } + } + } + } + Ok(Message::Close(_)) => break, + Err(e) => { + gst::warning!(CAT, "WebSocket error: {}", e); + break; + } + _ => {} + } + } + + let _ = write.close().await; + Ok(()) + } + + fn setup_websocket(&self) { + let settings = self.settings.read().unwrap(); + + if let Some(dots_viewer_websocket_url) = &settings.dots_viewer_ws_url { + let host = dots_viewer_websocket_url.clone(); + let obj = self.obj().downgrade(); + + let handle = RUNTIME.spawn(async move { + loop { + gst::debug!(CAT, "Connecting to WebSocket server at {}", host); + if let Err(e) = Self::handle_websocket(obj.clone(), host.clone()).await { + gst::warning!( + CAT, + "WebSocket {} connection failed: {}. Retrying in 5 seconds...", + host, + e + ); + } + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + }); + + let mut handles = self.handles.lock().unwrap(); + if let Some(handles) = handles.as_mut() { + handles.websocket = Some(handle); + } + } + } + fn set_dot_dir(&self, dot_dir: Option) { let mut settings = self.settings.write().unwrap(); settings.set_dot_dir(dot_dir); } + fn set_dot_viewer_ws_url(&self, url: Option) { + let mut settings = self.settings.write().unwrap(); + settings.dots_viewer_ws_url = url; + } + fn set_xdg_cache(&self, use_xdg_cache: bool) { let mut settings = self.settings.write().unwrap(); settings.set_xdg_cache(use_xdg_cache); @@ -541,6 +673,7 @@ impl PipelineSnapshot { *handles = Some(Handles { signal: signal_handle, thread: thread_handle, + websocket: None, }); Ok(())