From 344326434ca63a5b4ac84391418b2410ae72f5fb Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Mon, 29 Apr 2024 13:43:27 -0400 Subject: [PATCH] tracers: snapshot: Add an option to use folders for each snapshot Part-of: --- Cargo.lock | 2 + utils/tracers/Cargo.toml | 2 + utils/tracers/src/pipeline_snapshot/imp.rs | 260 ++++++++++++++++----- 3 files changed, 209 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a4d6e5e..24eda174 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3083,6 +3083,7 @@ version = "0.14.0-alpha.1" dependencies = [ "anyhow", "atomic_refcell", + "chrono", "dirs", "etherparse", "gst-plugin-version-helper", @@ -3090,6 +3091,7 @@ dependencies = [ "pcap-file", "regex", "signal-hook", + "walkdir", ] [[package]] diff --git a/utils/tracers/Cargo.toml b/utils/tracers/Cargo.toml index c8bb748e..58c44b80 100644 --- a/utils/tracers/Cargo.toml +++ b/utils/tracers/Cargo.toml @@ -16,6 +16,8 @@ atomic_refcell = "0.1" pcap-file = "1.1.1" etherparse = "0.16.0" dirs = "5.0.1" +chrono = "0.4.35" +walkdir = "2" [target.'cfg(unix)'.dependencies] signal-hook = "0.3" diff --git a/utils/tracers/src/pipeline_snapshot/imp.rs b/utils/tracers/src/pipeline_snapshot/imp.rs index 54414a62..438944bc 100644 --- a/utils/tracers/src/pipeline_snapshot/imp.rs +++ b/utils/tracers/src/pipeline_snapshot/imp.rs @@ -40,6 +40,7 @@ */ use std::collections::HashMap; use std::io::Write; +use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; @@ -60,7 +61,7 @@ static CAT: LazyLock = LazyLock::new(|| { static START_TIME: LazyLock = LazyLock::new(gst::get_timestamp); -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)] struct ElementPtr(std::ptr::NonNull); unsafe impl Send for ElementPtr {} @@ -80,7 +81,7 @@ impl ElementPtr { #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[repr(u32)] -#[enum_type(name = "GstpipelineSnapshotCleanupMode")] +#[enum_type(name = "GstPipelineSnapshotCleanupMode")] #[non_exhaustive] pub enum CleanupMode { #[enum_value( @@ -89,7 +90,8 @@ pub enum CleanupMode { )] Initial, #[enum_value( - name = "CleanupAutomatic: cleanup .dot files before each snapshots", + name = "CleanupAutomatic: cleanup .dot files before each snapshots if pipeline-snapshot::folder-mode is not None \ + otherwise cleanup `.dot` files in folders", nick = "automatic" )] Automatic, @@ -97,6 +99,53 @@ pub enum CleanupMode { None, } +impl std::str::FromStr for CleanupMode { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "initial" => Ok(CleanupMode::Initial), + "automatic" => Ok(CleanupMode::Automatic), + "none" => Ok(CleanupMode::None), + _ => Err(format!("unknown cleanup mode: {}", s)), + } + } +} + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstPipelineSnapshotFolderMode")] +#[non_exhaustive] +pub enum FolderMode { + #[enum_value(name = "None: Do not use folders to store dot files", nick = "none")] + None, + #[enum_value( + name = "Numbered: Use folders to store dot files, each time `.snapshot()` is called a new folder is created \ + and named with a number starting from 0.", + nick = "numbered" + )] + Numbered, + #[enum_value( + name = "Timed: Use folders to store dot files, each time `.snapshot()` is called a new folder is created \ + and named with the current timestamp.", + nick = "timed" + )] + Timed, +} + +impl std::str::FromStr for FolderMode { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "none" => Ok(FolderMode::None), + "numbered" => Ok(FolderMode::Numbered), + "timed" => Ok(FolderMode::Timed), + _ => Err(format!("unknown folder mode: {}", s)), + } + } +} + #[derive(Debug)] struct Settings { dot_prefix: Option, @@ -104,6 +153,7 @@ struct Settings { dot_pipeline_ptr: bool, dot_dir: Option, cleanup_mode: CleanupMode, + folder_mode: FolderMode, } impl Default for Settings { @@ -114,9 +164,11 @@ impl Default for Settings { dot_ts: true, cleanup_mode: CleanupMode::None, dot_pipeline_ptr: false, + folder_mode: FolderMode::None, } } } + impl Settings { fn set_dot_dir(&mut self, dot_dir: Option) { if let Some(dot_dir) = dot_dir { @@ -143,7 +195,7 @@ impl Settings { if let Ok(dot_dir) = s.get("dot-dir") { self.set_dot_dir(dot_dir); - gst::log!(CAT, imp = imp, "dot-dir = {:?}", self.dot_dir); + gst::log!(CAT, imp = imp, "dot-prefix = {:?}", self.dot_dir); } if let Ok(dot_prefix) = s.get("dot-prefix") { @@ -161,21 +213,34 @@ impl Settings { self.dot_pipeline_ptr = dot_pipeline_ptr; } - if let Ok(cleanup_mod) = s.get::("cleanup-mode") { - gst::log!(CAT, imp = imp, "cleanup-mode = {:?}", cleanup_mod); - self.cleanup_mode = match cleanup_mod.as_str() { - "initial" => CleanupMode::Initial, - "automatic" => CleanupMode::Automatic, - "none" => CleanupMode::None, - _ => { - gst::warning!(CAT, imp = imp, "unknown cleanup-mode: {}", cleanup_mod); + if let Ok(cleanup_mod) = s.get::<&str>("cleanup-mode") { + self.cleanup_mode = match cleanup_mod.parse() { + Ok(mode) => mode, + Err(err) => { + gst::warning!(CAT, imp = imp, "unknown cleanup-mode: {}", err); CleanupMode::None } }; } + + if let Ok(folder_mode) = s.get::<&str>("folder-mode") { + self.folder_mode = match folder_mode.parse() { + Ok(mode) => mode, + Err(err) => { + gst::warning!(CAT, imp = imp, "unknown folder-mode: {}", err); + FolderMode::None + } + }; + } } } +#[derive(Debug, Default)] +struct State { + current_folder: u32, + pipelines: HashMap>, +} + #[derive(Properties, Debug, Default)] #[properties(wrapper_type = super::PipelineSnapshot)] pub struct PipelineSnapshot { @@ -184,9 +249,13 @@ pub struct PipelineSnapshot { #[property(name="dot-ts", get, set, type = bool, member = dot_ts, blurb = "Add timestamp to dot files")] #[property(name="dot-pipeline-ptr", get, set, type = bool, member = dot_pipeline_ptr, blurb = "Add pipeline ptr value to dot files")] #[property(name="cleanup-mode", get = |s: &Self| s.settings.read().unwrap().cleanup_mode, set, type = CleanupMode, member = cleanup_mode, blurb = "Cleanup mode", builder(CleanupMode::None))] + #[property(name="folder-mode", + get=|s: &Self| s.settings.read().unwrap().folder_mode, + set, + type = FolderMode, member = folder_mode, blurb = "How to create folder each time a snapshot of all pipelines is made", builder(FolderMode::None))] settings: RwLock, - pipelines: Arc>>>, handles: Mutex>, + state: Arc>, } #[derive(Debug)] @@ -216,7 +285,7 @@ impl ObjectImpl for PipelineSnapshot { if settings.cleanup_mode == CleanupMode::Initial { drop(settings); - self.cleanup_dots(); + self.cleanup_dots(&self.settings.read().unwrap().dot_dir.as_ref(), true); } self.register_hook(TracerHook::ElementNew); @@ -228,7 +297,7 @@ impl ObjectImpl for PipelineSnapshot { } fn signals() -> &'static [glib::subclass::Signal] { - static SIGNALS: Lazy> = Lazy::new(|| { + static SIGNALS: LazyLock> = LazyLock::new(|| { vec![glib::subclass::Signal::builder("snapshot") .action() .class_handler(|_, args| { @@ -257,18 +326,34 @@ impl GstObjectImpl for PipelineSnapshot {} impl TracerImpl for PipelineSnapshot { fn element_new(&self, _ts: u64, element: &gst::Element) { if element.is::() { - gst::debug!(CAT, imp = self, "new pipeline: {}", element.name()); + let pipeline_ptr = ElementPtr::from_ref(element); let weak = element.downgrade(); - let mut pipelines = self.pipelines.lock().unwrap(); - pipelines.insert(ElementPtr::from_ref(element), weak); + let mut state = self.state.lock().unwrap(); + state.pipelines.insert(pipeline_ptr, weak); + gst::debug!( + CAT, + imp = self, + "new pipeline: {} ({:?}) got {} now", + element.name(), + pipeline_ptr, + state.pipelines.len() + ); } } fn object_destroyed(&self, _ts: u64, object: std::ptr::NonNull) { - let mut pipelines = self.pipelines.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let object = ElementPtr::from_object_ptr(object); - pipelines.remove(&object); + if state.pipelines.remove(&object).is_some() { + gst::debug!( + CAT, + imp = self, + "Pipeline removed: {:?} - {} remaining", + object, + state.pipelines.len() + ); + } } } @@ -279,24 +364,47 @@ impl PipelineSnapshot { } pub(crate) fn snapshot(&self) { - let pipelines = { - let weaks = self.pipelines.lock().unwrap(); - weaks - .values() - .filter_map(|w| w.upgrade()) - .collect::>() - }; - let settings = self.settings.read().unwrap(); + let dot_dir = if let Some(dot_dir) = settings.dot_dir.as_ref() { - dot_dir + if !matches!(settings.folder_mode, FolderMode::None) { + let dot_dir = match settings.folder_mode { + FolderMode::Numbered => { + let mut state = self.state.lock().unwrap(); + let res = state.current_folder; + state.current_folder += 1; + + format!("{dot_dir}/{res}") + } + FolderMode::Timed => { + let datetime: chrono::DateTime = chrono::Local::now(); + format!("{dot_dir}/{}", datetime.format("%Y-%m-%d %H:%M:%S")) + } + _ => unreachable!(), + }; + + if let Err(err) = std::fs::create_dir_all(&dot_dir) { + gst::warning!( + CAT, + imp = self, + "Failed to create folder {}: {}", + dot_dir, + err + ); + return; + } + + dot_dir + } else { + dot_dir.clone() + } } else { gst::info!(CAT, imp = self, "No dot-dir set, not dumping pipelines"); return; }; if matches!(settings.cleanup_mode, CleanupMode::Automatic) { - self.cleanup_dots(); + self.cleanup_dots(&Some(&dot_dir), false); } let ts = if settings.dot_ts { @@ -305,6 +413,29 @@ impl PipelineSnapshot { "".to_string() }; + let pipelines = { + let state = self.state.lock().unwrap(); + gst::log!( + CAT, + imp = self, + "dumping {} pipelines", + state.pipelines.len() + ); + + state + .pipelines + .iter() + .filter_map(|(ptr, w)| { + let pipeline = w.upgrade(); + + if pipeline.is_none() { + gst::warning!(CAT, imp = self, "Pipeline {ptr:?} disappeared"); + } + pipeline + }) + .collect::>() + }; + for pipeline in pipelines.into_iter() { let pipeline = pipeline.downcast::().unwrap(); gst::debug!(CAT, imp = self, "dump {}", pipeline.name()); @@ -321,7 +452,7 @@ impl PipelineSnapshot { settings.dot_prefix.as_ref().map_or("", |s| s.as_str()), pipeline.name(), ); - gst::debug!(CAT, im =: self, "Writing {}", dot_path); + gst::debug!(CAT, imp = self, "Writing {}", dot_path); match std::fs::File::create(&dot_path) { Ok(mut f) => { let data = pipeline.debug_to_dot_data(gst::DebugGraphDetails::all()); @@ -374,38 +505,57 @@ impl PipelineSnapshot { anyhow::bail!("only supported on UNIX system"); } - fn cleanup_dots(&self) { - let settings = self.settings.read().unwrap(); - if let Some(dot_dir) = settings.dot_dir.as_ref() { + fn cleanup_dots(&self, dot_dir: &Option<&String>, recurse: bool) { + if let Some(dot_dir) = dot_dir { gst::info!(CAT, imp = self, "Cleaning up {}", dot_dir); - let entries = match std::fs::read_dir(dot_dir) { - Ok(entries) => entries, + let mut paths = match std::fs::read_dir(dot_dir) { + Ok(entries) => { + entries + .filter_map(|entry| { + let entry = entry.ok()?; // Handle possible errors when reading directory entries + let path = entry.path(); + let extension = path.extension()?.to_str()?; // Get the extension as a string + if extension.ends_with(".dot") { + Some(path.to_path_buf()) + } else { + None + } + }) + .collect::>() + } Err(e) => { gst::warning!(CAT, imp = self, "Failed to read {}: {}", dot_dir, e); return; } }; - for entry in entries { - let entry = match entry { - Ok(e) => e, - Err(e) => { - gst::warning!(CAT, imp = self, "Failed to read entry: {}", e); - continue; - } - }; + if recurse { + paths.append( + &mut walkdir::WalkDir::new(dot_dir) + .into_iter() + .filter_map(|entry| { + let entry = entry.ok()?; + let path = entry.path(); + let extension = path.extension()?.to_str()?; + if extension == "dot" { + Some(path.to_path_buf()) + } else { + None + } + }) + .collect::>(), + ) + } - let path = entry.path(); - if path.extension().map_or(false, |e| e == "dot") { - if let Err(e) = std::fs::remove_file(&path) { - gst::warning!( - CAT, - imp = self, - "Failed to remove {}: {}", - path.display(), - e - ); - } + for path in paths { + if let Err(e) = std::fs::remove_file(&path) { + gst::warning!( + CAT, + imp = self, + "Failed to remove {}: {}", + path.display(), + e + ); } } }