tracers: snapshot: Add an option to use folders for each snapshot

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1889>
This commit is contained in:
Thibault Saunier 2024-04-29 13:43:27 -04:00 committed by GStreamer Marge Bot
parent f6d550d571
commit 344326434c
3 changed files with 209 additions and 55 deletions

2
Cargo.lock generated
View file

@ -3083,6 +3083,7 @@ version = "0.14.0-alpha.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"atomic_refcell", "atomic_refcell",
"chrono",
"dirs", "dirs",
"etherparse", "etherparse",
"gst-plugin-version-helper", "gst-plugin-version-helper",
@ -3090,6 +3091,7 @@ dependencies = [
"pcap-file", "pcap-file",
"regex", "regex",
"signal-hook", "signal-hook",
"walkdir",
] ]
[[package]] [[package]]

View file

@ -16,6 +16,8 @@ atomic_refcell = "0.1"
pcap-file = "1.1.1" pcap-file = "1.1.1"
etherparse = "0.16.0" etherparse = "0.16.0"
dirs = "5.0.1" dirs = "5.0.1"
chrono = "0.4.35"
walkdir = "2"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
signal-hook = "0.3" signal-hook = "0.3"

View file

@ -40,6 +40,7 @@
*/ */
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@ -60,7 +61,7 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
static START_TIME: LazyLock<gst::ClockTime> = LazyLock::new(gst::get_timestamp); static START_TIME: LazyLock<gst::ClockTime> = LazyLock::new(gst::get_timestamp);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)]
struct ElementPtr(std::ptr::NonNull<gst::ffi::GstElement>); struct ElementPtr(std::ptr::NonNull<gst::ffi::GstElement>);
unsafe impl Send for ElementPtr {} unsafe impl Send for ElementPtr {}
@ -80,7 +81,7 @@ impl ElementPtr {
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)] #[repr(u32)]
#[enum_type(name = "GstpipelineSnapshotCleanupMode")] #[enum_type(name = "GstPipelineSnapshotCleanupMode")]
#[non_exhaustive] #[non_exhaustive]
pub enum CleanupMode { pub enum CleanupMode {
#[enum_value( #[enum_value(
@ -89,7 +90,8 @@ pub enum CleanupMode {
)] )]
Initial, Initial,
#[enum_value( #[enum_value(
name = "CleanupAutomatic: cleanup .dot files before each snapshots", name = "CleanupAutomatic: cleanup .dot files before each snapshots if pipeline-snapshot::folder-mode is not None \
otherwise cleanup `.dot` files in folders",
nick = "automatic" nick = "automatic"
)] )]
Automatic, Automatic,
@ -97,6 +99,53 @@ pub enum CleanupMode {
None, None,
} }
impl std::str::FromStr for CleanupMode {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"initial" => Ok(CleanupMode::Initial),
"automatic" => Ok(CleanupMode::Automatic),
"none" => Ok(CleanupMode::None),
_ => Err(format!("unknown cleanup mode: {}", s)),
}
}
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstPipelineSnapshotFolderMode")]
#[non_exhaustive]
pub enum FolderMode {
#[enum_value(name = "None: Do not use folders to store dot files", nick = "none")]
None,
#[enum_value(
name = "Numbered: Use folders to store dot files, each time `.snapshot()` is called a new folder is created \
and named with a number starting from 0.",
nick = "numbered"
)]
Numbered,
#[enum_value(
name = "Timed: Use folders to store dot files, each time `.snapshot()` is called a new folder is created \
and named with the current timestamp.",
nick = "timed"
)]
Timed,
}
impl std::str::FromStr for FolderMode {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"none" => Ok(FolderMode::None),
"numbered" => Ok(FolderMode::Numbered),
"timed" => Ok(FolderMode::Timed),
_ => Err(format!("unknown folder mode: {}", s)),
}
}
}
#[derive(Debug)] #[derive(Debug)]
struct Settings { struct Settings {
dot_prefix: Option<String>, dot_prefix: Option<String>,
@ -104,6 +153,7 @@ struct Settings {
dot_pipeline_ptr: bool, dot_pipeline_ptr: bool,
dot_dir: Option<String>, dot_dir: Option<String>,
cleanup_mode: CleanupMode, cleanup_mode: CleanupMode,
folder_mode: FolderMode,
} }
impl Default for Settings { impl Default for Settings {
@ -114,9 +164,11 @@ impl Default for Settings {
dot_ts: true, dot_ts: true,
cleanup_mode: CleanupMode::None, cleanup_mode: CleanupMode::None,
dot_pipeline_ptr: false, dot_pipeline_ptr: false,
folder_mode: FolderMode::None,
} }
} }
} }
impl Settings { impl Settings {
fn set_dot_dir(&mut self, dot_dir: Option<String>) { fn set_dot_dir(&mut self, dot_dir: Option<String>) {
if let Some(dot_dir) = dot_dir { if let Some(dot_dir) = dot_dir {
@ -143,7 +195,7 @@ impl Settings {
if let Ok(dot_dir) = s.get("dot-dir") { if let Ok(dot_dir) = s.get("dot-dir") {
self.set_dot_dir(dot_dir); self.set_dot_dir(dot_dir);
gst::log!(CAT, imp = imp, "dot-dir = {:?}", self.dot_dir); gst::log!(CAT, imp = imp, "dot-prefix = {:?}", self.dot_dir);
} }
if let Ok(dot_prefix) = s.get("dot-prefix") { if let Ok(dot_prefix) = s.get("dot-prefix") {
@ -161,19 +213,32 @@ impl Settings {
self.dot_pipeline_ptr = dot_pipeline_ptr; self.dot_pipeline_ptr = dot_pipeline_ptr;
} }
if let Ok(cleanup_mod) = s.get::<String>("cleanup-mode") { if let Ok(cleanup_mod) = s.get::<&str>("cleanup-mode") {
gst::log!(CAT, imp = imp, "cleanup-mode = {:?}", cleanup_mod); self.cleanup_mode = match cleanup_mod.parse() {
self.cleanup_mode = match cleanup_mod.as_str() { Ok(mode) => mode,
"initial" => CleanupMode::Initial, Err(err) => {
"automatic" => CleanupMode::Automatic, gst::warning!(CAT, imp = imp, "unknown cleanup-mode: {}", err);
"none" => CleanupMode::None,
_ => {
gst::warning!(CAT, imp = imp, "unknown cleanup-mode: {}", cleanup_mod);
CleanupMode::None CleanupMode::None
} }
}; };
} }
if let Ok(folder_mode) = s.get::<&str>("folder-mode") {
self.folder_mode = match folder_mode.parse() {
Ok(mode) => mode,
Err(err) => {
gst::warning!(CAT, imp = imp, "unknown folder-mode: {}", err);
FolderMode::None
} }
};
}
}
}
#[derive(Debug, Default)]
struct State {
current_folder: u32,
pipelines: HashMap<ElementPtr, glib::WeakRef<gst::Element>>,
} }
#[derive(Properties, Debug, Default)] #[derive(Properties, Debug, Default)]
@ -184,9 +249,13 @@ pub struct PipelineSnapshot {
#[property(name="dot-ts", get, set, type = bool, member = dot_ts, blurb = "Add timestamp to dot files")] #[property(name="dot-ts", get, set, type = bool, member = dot_ts, blurb = "Add timestamp to dot files")]
#[property(name="dot-pipeline-ptr", get, set, type = bool, member = dot_pipeline_ptr, blurb = "Add pipeline ptr value to dot files")] #[property(name="dot-pipeline-ptr", get, set, type = bool, member = dot_pipeline_ptr, blurb = "Add pipeline ptr value to dot files")]
#[property(name="cleanup-mode", get = |s: &Self| s.settings.read().unwrap().cleanup_mode, set, type = CleanupMode, member = cleanup_mode, blurb = "Cleanup mode", builder(CleanupMode::None))] #[property(name="cleanup-mode", get = |s: &Self| s.settings.read().unwrap().cleanup_mode, set, type = CleanupMode, member = cleanup_mode, blurb = "Cleanup mode", builder(CleanupMode::None))]
#[property(name="folder-mode",
get=|s: &Self| s.settings.read().unwrap().folder_mode,
set,
type = FolderMode, member = folder_mode, blurb = "How to create folder each time a snapshot of all pipelines is made", builder(FolderMode::None))]
settings: RwLock<Settings>, settings: RwLock<Settings>,
pipelines: Arc<Mutex<HashMap<ElementPtr, glib::WeakRef<gst::Element>>>>,
handles: Mutex<Option<Handles>>, handles: Mutex<Option<Handles>>,
state: Arc<Mutex<State>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -216,7 +285,7 @@ impl ObjectImpl for PipelineSnapshot {
if settings.cleanup_mode == CleanupMode::Initial { if settings.cleanup_mode == CleanupMode::Initial {
drop(settings); drop(settings);
self.cleanup_dots(); self.cleanup_dots(&self.settings.read().unwrap().dot_dir.as_ref(), true);
} }
self.register_hook(TracerHook::ElementNew); self.register_hook(TracerHook::ElementNew);
@ -228,7 +297,7 @@ impl ObjectImpl for PipelineSnapshot {
} }
fn signals() -> &'static [glib::subclass::Signal] { fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| { static SIGNALS: LazyLock<Vec<glib::subclass::Signal>> = LazyLock::new(|| {
vec![glib::subclass::Signal::builder("snapshot") vec![glib::subclass::Signal::builder("snapshot")
.action() .action()
.class_handler(|_, args| { .class_handler(|_, args| {
@ -257,18 +326,34 @@ impl GstObjectImpl for PipelineSnapshot {}
impl TracerImpl for PipelineSnapshot { impl TracerImpl for PipelineSnapshot {
fn element_new(&self, _ts: u64, element: &gst::Element) { fn element_new(&self, _ts: u64, element: &gst::Element) {
if element.is::<gst::Pipeline>() { if element.is::<gst::Pipeline>() {
gst::debug!(CAT, imp = self, "new pipeline: {}", element.name()); let pipeline_ptr = ElementPtr::from_ref(element);
let weak = element.downgrade(); let weak = element.downgrade();
let mut pipelines = self.pipelines.lock().unwrap(); let mut state = self.state.lock().unwrap();
pipelines.insert(ElementPtr::from_ref(element), weak); state.pipelines.insert(pipeline_ptr, weak);
gst::debug!(
CAT,
imp = self,
"new pipeline: {} ({:?}) got {} now",
element.name(),
pipeline_ptr,
state.pipelines.len()
);
} }
} }
fn object_destroyed(&self, _ts: u64, object: std::ptr::NonNull<gst::ffi::GstObject>) { fn object_destroyed(&self, _ts: u64, object: std::ptr::NonNull<gst::ffi::GstObject>) {
let mut pipelines = self.pipelines.lock().unwrap(); let mut state = self.state.lock().unwrap();
let object = ElementPtr::from_object_ptr(object); let object = ElementPtr::from_object_ptr(object);
pipelines.remove(&object); if state.pipelines.remove(&object).is_some() {
gst::debug!(
CAT,
imp = self,
"Pipeline removed: {:?} - {} remaining",
object,
state.pipelines.len()
);
}
} }
} }
@ -279,24 +364,47 @@ impl PipelineSnapshot {
} }
pub(crate) fn snapshot(&self) { pub(crate) fn snapshot(&self) {
let pipelines = { let settings = self.settings.read().unwrap();
let weaks = self.pipelines.lock().unwrap();
weaks let dot_dir = if let Some(dot_dir) = settings.dot_dir.as_ref() {
.values() if !matches!(settings.folder_mode, FolderMode::None) {
.filter_map(|w| w.upgrade()) let dot_dir = match settings.folder_mode {
.collect::<Vec<_>>() FolderMode::Numbered => {
let mut state = self.state.lock().unwrap();
let res = state.current_folder;
state.current_folder += 1;
format!("{dot_dir}/{res}")
}
FolderMode::Timed => {
let datetime: chrono::DateTime<chrono::Local> = chrono::Local::now();
format!("{dot_dir}/{}", datetime.format("%Y-%m-%d %H:%M:%S"))
}
_ => unreachable!(),
}; };
let settings = self.settings.read().unwrap(); if let Err(err) = std::fs::create_dir_all(&dot_dir) {
let dot_dir = if let Some(dot_dir) = settings.dot_dir.as_ref() { gst::warning!(
CAT,
imp = self,
"Failed to create folder {}: {}",
dot_dir,
err
);
return;
}
dot_dir dot_dir
} else {
dot_dir.clone()
}
} else { } else {
gst::info!(CAT, imp = self, "No dot-dir set, not dumping pipelines"); gst::info!(CAT, imp = self, "No dot-dir set, not dumping pipelines");
return; return;
}; };
if matches!(settings.cleanup_mode, CleanupMode::Automatic) { if matches!(settings.cleanup_mode, CleanupMode::Automatic) {
self.cleanup_dots(); self.cleanup_dots(&Some(&dot_dir), false);
} }
let ts = if settings.dot_ts { let ts = if settings.dot_ts {
@ -305,6 +413,29 @@ impl PipelineSnapshot {
"".to_string() "".to_string()
}; };
let pipelines = {
let state = self.state.lock().unwrap();
gst::log!(
CAT,
imp = self,
"dumping {} pipelines",
state.pipelines.len()
);
state
.pipelines
.iter()
.filter_map(|(ptr, w)| {
let pipeline = w.upgrade();
if pipeline.is_none() {
gst::warning!(CAT, imp = self, "Pipeline {ptr:?} disappeared");
}
pipeline
})
.collect::<Vec<_>>()
};
for pipeline in pipelines.into_iter() { for pipeline in pipelines.into_iter() {
let pipeline = pipeline.downcast::<gst::Pipeline>().unwrap(); let pipeline = pipeline.downcast::<gst::Pipeline>().unwrap();
gst::debug!(CAT, imp = self, "dump {}", pipeline.name()); gst::debug!(CAT, imp = self, "dump {}", pipeline.name());
@ -321,7 +452,7 @@ impl PipelineSnapshot {
settings.dot_prefix.as_ref().map_or("", |s| s.as_str()), settings.dot_prefix.as_ref().map_or("", |s| s.as_str()),
pipeline.name(), pipeline.name(),
); );
gst::debug!(CAT, im =: self, "Writing {}", dot_path); gst::debug!(CAT, imp = self, "Writing {}", dot_path);
match std::fs::File::create(&dot_path) { match std::fs::File::create(&dot_path) {
Ok(mut f) => { Ok(mut f) => {
let data = pipeline.debug_to_dot_data(gst::DebugGraphDetails::all()); let data = pipeline.debug_to_dot_data(gst::DebugGraphDetails::all());
@ -374,29 +505,49 @@ impl PipelineSnapshot {
anyhow::bail!("only supported on UNIX system"); anyhow::bail!("only supported on UNIX system");
} }
fn cleanup_dots(&self) { fn cleanup_dots(&self, dot_dir: &Option<&String>, recurse: bool) {
let settings = self.settings.read().unwrap(); if let Some(dot_dir) = dot_dir {
if let Some(dot_dir) = settings.dot_dir.as_ref() {
gst::info!(CAT, imp = self, "Cleaning up {}", dot_dir); gst::info!(CAT, imp = self, "Cleaning up {}", dot_dir);
let entries = match std::fs::read_dir(dot_dir) { let mut paths = match std::fs::read_dir(dot_dir) {
Ok(entries) => entries, Ok(entries) => {
entries
.filter_map(|entry| {
let entry = entry.ok()?; // Handle possible errors when reading directory entries
let path = entry.path();
let extension = path.extension()?.to_str()?; // Get the extension as a string
if extension.ends_with(".dot") {
Some(path.to_path_buf())
} else {
None
}
})
.collect::<Vec<PathBuf>>()
}
Err(e) => { Err(e) => {
gst::warning!(CAT, imp = self, "Failed to read {}: {}", dot_dir, e); gst::warning!(CAT, imp = self, "Failed to read {}: {}", dot_dir, e);
return; return;
} }
}; };
for entry in entries { if recurse {
let entry = match entry { paths.append(
Ok(e) => e, &mut walkdir::WalkDir::new(dot_dir)
Err(e) => { .into_iter()
gst::warning!(CAT, imp = self, "Failed to read entry: {}", e); .filter_map(|entry| {
continue; let entry = entry.ok()?;
}
};
let path = entry.path(); let path = entry.path();
if path.extension().map_or(false, |e| e == "dot") { let extension = path.extension()?.to_str()?;
if extension == "dot" {
Some(path.to_path_buf())
} else {
None
}
})
.collect::<Vec<PathBuf>>(),
)
}
for path in paths {
if let Err(e) = std::fs::remove_file(&path) { if let Err(e) = std::fs::remove_file(&path) {
gst::warning!( gst::warning!(
CAT, CAT,
@ -410,4 +561,3 @@ impl PipelineSnapshot {
} }
} }
} }
}