From 2f987b09ee4aaf8d1fb80d30d2a304565f22a628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 29 Jun 2022 12:50:31 +0300 Subject: [PATCH] tracers: Add queue levels tracer and Python script for plotting --- utils/tracers/Cargo.toml | 2 + utils/tracers/scripts/queue_levels.py | 153 +++++++++ utils/tracers/src/lib.rs | 2 + utils/tracers/src/queue_levels/imp.rs | 437 ++++++++++++++++++++++++++ utils/tracers/src/queue_levels/mod.rs | 20 ++ 5 files changed, 614 insertions(+) create mode 100644 utils/tracers/scripts/queue_levels.py create mode 100644 utils/tracers/src/queue_levels/imp.rs create mode 100644 utils/tracers/src/queue_levels/mod.rs diff --git a/utils/tracers/Cargo.toml b/utils/tracers/Cargo.toml index daf52d62..fd1fdc56 100644 --- a/utils/tracers/Cargo.toml +++ b/utils/tracers/Cargo.toml @@ -11,6 +11,7 @@ description = "GStreamer tracers plugin" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } once_cell = "1.0" anyhow = "1" +regex = "1" [target.'cfg(unix)'.dependencies] signal-hook = "0.3" @@ -26,6 +27,7 @@ gst-plugin-version-helper = { path="../../version-helper" } [features] static = [] capi = [] +v1_22 = ["gst/v1_22"] [package.metadata.capi] min_version = "0.8.0" diff --git a/utils/tracers/scripts/queue_levels.py b/utils/tracers/scripts/queue_levels.py new file mode 100644 index 00000000..90ae8df3 --- /dev/null +++ b/utils/tracers/scripts/queue_levels.py @@ -0,0 +1,153 @@ +import argparse +import csv +import re + +import matplotlib +import matplotlib.pyplot as plt + +parser = argparse.ArgumentParser() +parser.add_argument("file", help="Input file with queue levels") +parser.add_argument("--include-filter", help="Regular expression for queue names that should be included") +parser.add_argument("--exclude-filter", help="Regular expression for queue names that should be excluded") +parser.add_argument("--bytes", help="include bytes levels", action="store_true") +parser.add_argument("--time", help="include time levels (default if none of the others are enabled)", action="store_true") +parser.add_argument("--buffers", help="include buffers levels", action="store_true") +parser.add_argument("--no-max", help="do not include max levels (enabled by default)", action="store_true") +args = parser.parse_args() + +include_filter = None +if args.include_filter is not None: + include_filter = re.compile(args.include_filter) +exclude_filter = None +if args.exclude_filter is not None: + exclude_filter = re.compile(args.exclude_filter) + +queues = {} + +with open(args.file, mode='r', encoding='utf_8', newline='') as csvfile: + reader = csv.reader(csvfile, delimiter=',', quotechar='|') + for row in reader: + if len(row) != 9: + continue + + if include_filter is not None and not include_filter.match(row[1]): + continue + if exclude_filter is not None and exclude_filter.match(row[1]): + continue + + if not row[1] in queues: + queues[row[1]] = { + 'cur-level-bytes': [], + 'cur-level-time': [], + 'cur-level-buffers': [], + 'max-size-bytes': [], + 'max-size-time': [], + 'max-size-buffers': [], + } + + wallclock = float(row[0]) / 1000000000.0 + queues[row[1]]['cur-level-bytes'].append((wallclock, int(row[3]))) + queues[row[1]]['cur-level-time'].append((wallclock, float(row[4]) / 1000000000.0)) + queues[row[1]]['cur-level-buffers'].append((wallclock, int(row[5]))) + queues[row[1]]['max-size-bytes'].append((wallclock, int(row[6]))) + queues[row[1]]['max-size-time'].append((wallclock, float(row[7]) / 1000000000.0)) + queues[row[1]]['max-size-buffers'].append((wallclock, int(row[8]))) + +matplotlib.rcParams['figure.dpi'] = 200 + +prop_cycle = plt.rcParams['axes.prop_cycle'] +colors = prop_cycle.by_key()['color'] + +num_plots = 0 +axes_names = [] +if args.buffers: + num_plots += 1 + axes_names.append("buffers") +if args.time: + num_plots += 1 + axes_names.append("time (s)") +if args.bytes: + num_plots += 1 + axes_names.append("bytes") + +if num_plots == 0: + num_plots += 1 + axes_names.append("time (s)") + +fig, ax1 = plt.subplots() +ax1.set_xlabel("wallclock (s)") +ax1.set_ylabel(axes_names[0]) +ax1.tick_params(axis='y') +axes = [ax1] + +if num_plots > 1: + ax2 = ax1.twinx() + ax2.set_ylabel(axes_names[1]) + axes.append(ax2) +if num_plots > 2: + ax3 = ax1.twinx() + ax3.set_ylabel(axes_names[2]) + ax3.spines['right'].set_position(('outward', 60)) + axes.append(ax3) + +for (i, (queue, values)) in enumerate(queues.items()): + axis = 0 + + if args.buffers: + axes[axis].plot( + [x[0] for x in values['cur-level-buffers']], + [x[1] for x in values['cur-level-buffers']], + '.', label = '{}: cur-level-buffers'.format(queue), + color = colors[i], + ) + + if not args.no_max: + axes[axis].plot( + [x[0] for x in values['max-size-buffers']], + [x[1] for x in values['max-size-buffers']], + '-', label = '{}: max-size-buffers'.format(queue), + color = colors[i], + ) + + axis += 1 + + if args.time: + axes[axis].plot( + [x[0] for x in values['cur-level-time']], + [x[1] for x in values['cur-level-time']], + 'p', label = '{}: cur-level-time'.format(queue), + color = colors[i], + ) + + if not args.no_max: + axes[axis].plot( + [x[0] for x in values['max-size-time']], + [x[1] for x in values['max-size-time']], + '-.', label = '{}: max-size-time'.format(queue), + color = colors[i], + ) + + axis += 1 + + if args.bytes: + axes[axis].plot( + [x[0] for x in values['cur-level-bytes']], + [x[1] for x in values['cur-level-bytes']], + 'x', label = '{}: cur-level-bytes'.format(queue), + color = colors[i], + ) + + if not args.no_max: + axes[axis].plot( + [x[0] for x in values['max-size-bytes']], + [x[1] for x in values['max-size-bytes']], + '--', label = '{}: max-size-bytes'.format(queue), + color = colors[i], + ) + + axis += 1 + +fig.tight_layout() +fig.legend() + +plt.show() diff --git a/utils/tracers/src/lib.rs b/utils/tracers/src/lib.rs index 8250d27c..bd037e8d 100644 --- a/utils/tracers/src/lib.rs +++ b/utils/tracers/src/lib.rs @@ -10,9 +10,11 @@ use gst::glib; mod pipeline_snapshot; +mod queue_levels; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { pipeline_snapshot::register(plugin)?; + queue_levels::register(plugin)?; Ok(()) } diff --git a/utils/tracers/src/queue_levels/imp.rs b/utils/tracers/src/queue_levels/imp.rs new file mode 100644 index 00000000..48256fa1 --- /dev/null +++ b/utils/tracers/src/queue_levels/imp.rs @@ -0,0 +1,437 @@ +// Copyright (C) 2022 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/// This tracer provides an easy way to collect queue levels over time of all queues inside a +/// pipeline. +/// +/// Example: +/// +/// ```console +/// $ GST_TRACERS='queue-levels(file="/tmp/queue_levels.log")' gst-launch-1.0 audiotestsrc ! queue ! fakesink +/// ``` +/// +/// The generated file is a CSV file of the format +/// +/// ```csv +/// timestamp,queue name,queue pointer,cur-level-bytes,cur-level-time,cur-level-buffers,max-size-bytes,max-size-time,max-size-buffers +/// ``` +/// +/// ## Parameters +/// +/// ### `file` +/// +/// Specifies the path to the file that will collect the CSV file with the queue levels. +/// +/// By default the file is written to `/tmp/queue_levels.log`. +/// +/// ### `include-filter` +/// +/// Specifies a regular expression for the queue object names that should be included. +/// +/// By default this is not set. +/// +/// ### `exclude-filter` +/// +/// Specifies a regular expression for the queue object names that should **not** be included. +/// +/// By default this is not set. +use std::collections::HashMap; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use once_cell::sync::Lazy; +use regex::Regex; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "queue-levels", + gst::DebugColorFlags::empty(), + Some("Tracer to collect queue levels"), + ) +}); + +static QUEUE_TYPE: Lazy = Lazy::new(|| { + if let Ok(queue) = gst::ElementFactory::make("queue", None) { + queue.type_() + } else { + gst::warning!(CAT, "Can't instantiate queue element"); + glib::Type::INVALID + } +}); + +#[derive(Debug)] +struct Settings { + file: PathBuf, + include_filter: Option, + exclude_filter: Option, +} + +impl Default for Settings { + fn default() -> Self { + let mut file = glib::tmp_dir(); + file.push("queue_levels.log"); + + Self { + file, + include_filter: None, + exclude_filter: None, + } + } +} + +impl Settings { + fn update_from_params(&mut self, obj: &super::QueueLevels, params: String) { + let s = match gst::Structure::from_str(&format!("queue-levels,{}", params)) { + Ok(s) => s, + Err(err) => { + gst::warning!(CAT, obj: obj, "failed to parse tracer parameters: {}", err); + return; + } + }; + + if let Ok(file) = s.get::<&str>("file") { + gst::log!(CAT, obj: obj, "file= {}", file); + self.file = PathBuf::from(file); + } + + if let Ok(filter) = s.get::<&str>("include-filter") { + gst::log!(CAT, obj: obj, "include filter= {}", filter); + let filter = match Regex::new(filter) { + Ok(filter) => Some(filter), + Err(err) => { + gst::error!( + CAT, + obj: obj, + "Failed to compile include-filter regex: {}", + err + ); + None + } + }; + self.include_filter = filter; + } + + if let Ok(filter) = s.get::<&str>("exclude-filter") { + gst::log!(CAT, obj: obj, "exclude filter= {}", filter); + let filter = match Regex::new(filter) { + Ok(filter) => Some(filter), + Err(err) => { + gst::error!( + CAT, + obj: obj, + "Failed to compile exclude-filter regex: {}", + err + ); + None + } + }; + self.exclude_filter = filter; + } + } +} + +#[derive(Default)] +struct State { + queues: HashMap>, + log: Vec, + settings: Settings, +} + +struct LogLine { + timestamp: u64, + name: Arc, + ptr: usize, + cur_level_bytes: u32, + cur_level_time: u64, + cur_level_buffers: u32, + max_size_bytes: u32, + max_size_time: u64, + max_size_buffers: u32, +} + +#[derive(Default)] +pub struct QueueLevels { + state: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for QueueLevels { + const NAME: &'static str = "GstQueueLevels"; + type Type = super::QueueLevels; + type ParentType = gst::Tracer; +} + +impl ObjectImpl for QueueLevels { + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + if let Some(params) = obj.property::>("params") { + let mut state = self.state.lock().unwrap(); + state.settings.update_from_params(obj, params); + } + + Lazy::force(&QUEUE_TYPE); + + self.register_hook(TracerHook::ElementNew); + self.register_hook(TracerHook::ObjectDestroyed); + self.register_hook(TracerHook::PadPushPost); + self.register_hook(TracerHook::PadPushListPost); + #[cfg(feature = "v1_22")] + { + self.register_hook(TracerHook::PadChainPost); + self.register_hook(TracerHook::PadChainListPost); + } + #[cfg(not(feature = "v1_22"))] + { + self.register_hook(TracerHook::PadPushPost); + self.register_hook(TracerHook::PadPushListPost); + } + self.register_hook(TracerHook::PadPushPre); + self.register_hook(TracerHook::PadPushListPre); + self.register_hook(TracerHook::ElementChangeStatePost); + self.register_hook(TracerHook::PadPushEventPre); + } + + fn dispose(&self, obj: &Self::Type) { + use std::io::prelude::*; + + let state = self.state.lock().unwrap(); + + let mut file = match std::fs::File::create(&state.settings.file) { + Ok(file) => file, + Err(err) => { + gst::error!(CAT, obj: obj, "Failed to create file: {err}"); + return; + } + }; + + for LogLine { + timestamp, + name, + ptr, + cur_level_bytes, + cur_level_time, + cur_level_buffers, + max_size_bytes, + max_size_time, + max_size_buffers, + } in &state.log + { + if let Err(err) = writeln!(&mut file, "{timestamp},{name},0x{ptr:08x},{cur_level_bytes},{cur_level_time},{cur_level_buffers},{max_size_bytes},{max_size_time},{max_size_buffers}") { + gst::error!(CAT, obj: obj, "Failed to write to file: {err}"); + return; + } + } + } +} + +impl GstObjectImpl for QueueLevels {} + +impl TracerImpl for QueueLevels { + fn element_new(&self, _ts: u64, element: &gst::Element) { + if element.type_() != *QUEUE_TYPE { + return; + } + + let tracer = self.instance(); + let ptr = element.as_ptr() as usize; + gst::debug!( + CAT, + obj: &tracer, + "new queue: {} 0x{:08x}", + element.name(), + ptr + ); + + let mut state = self.state.lock().unwrap(); + + let name = element.name(); + if let Some(ref filter) = state.settings.include_filter { + if !filter.is_match(&name) { + return; + } + } + if let Some(ref filter) = state.settings.exclude_filter { + if filter.is_match(&name) { + return; + } + } + + state.queues.entry(ptr).or_insert_with(|| Arc::new(name)); + } + + fn object_destroyed(&self, _ts: u64, object: std::ptr::NonNull) { + let ptr = object.as_ptr() as usize; + let mut state = self.state.lock().unwrap(); + state.queues.remove(&ptr); + } + + fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, _buffer: &gst::Buffer) { + let element = + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if parent.type_() == *QUEUE_TYPE { + parent + } else { + return; + } + } else { + return; + }; + + self.log(&element, ts); + } + + fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, _list: &gst::BufferList) { + let element = + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if parent.type_() == *QUEUE_TYPE { + parent + } else { + return; + } + } else { + return; + }; + + self.log(&element, ts); + } + + #[cfg(not(feature = "v1_22"))] + fn pad_push_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { + let element = if let Some(parent) = pad + .peer() + .and_then(|p| p.parent()) + .and_then(|p| p.downcast::().ok()) + { + if parent.type_() == *QUEUE_TYPE { + parent + } else { + return; + } + } else { + return; + }; + + self.log(&element, ts); + } + + #[cfg(not(feature = "v1_22"))] + fn pad_push_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { + let element = if let Some(parent) = pad + .peer() + .and_then(|p| p.parent()) + .and_then(|p| p.downcast::().ok()) + { + if parent.type_() == *QUEUE_TYPE { + parent + } else { + return; + } + } else { + return; + }; + + self.log(&element, ts); + } + + #[cfg(feature = "v1_22")] + fn pad_chain_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { + let element = + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if parent.type_() == *QUEUE_TYPE { + parent + } else { + return; + } + } else { + return; + }; + + self.log(&element, ts); + } + + #[cfg(feature = "v1_22")] + fn pad_chain_list_post(&self, ts: u64, pad: &gst::Pad, _result: gst::FlowReturn) { + let element = + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if parent.type_() == *QUEUE_TYPE { + parent + } else { + return; + } + } else { + return; + }; + + self.log(&element, ts); + } + + fn element_change_state_post( + &self, + ts: u64, + element: &gst::Element, + change: gst::StateChange, + _result: gst::StateChangeReturn, + ) { + if change.next() != gst::State::Null { + return; + } + + if element.type_() != *QUEUE_TYPE { + return; + } + + self.log(element, ts); + } + + fn pad_push_event_pre(&self, ts: u64, pad: &gst::Pad, ev: &gst::Event) { + if ev.type_() != gst::EventType::FlushStop { + return; + } + + if let Some(parent) = pad.parent().and_then(|p| p.downcast::().ok()) { + if parent.type_() == *QUEUE_TYPE { + self.log(&parent, ts); + } + } + } +} + +impl QueueLevels { + fn log(&self, element: &gst::Element, timestamp: u64) { + let ptr = element.as_ptr() as usize; + + let mut state = self.state.lock().unwrap(); + let name = match state.queues.get(&ptr) { + Some(name) => name.clone(), + None => return, + }; + + let cur_level_bytes = element.property::("current-level-bytes"); + let cur_level_time = element.property::("current-level-time"); + let cur_level_buffers = element.property::("current-level-buffers"); + let max_size_bytes = element.property::("max-size-bytes"); + let max_size_time = element.property::("max-size-time"); + let max_size_buffers = element.property::("max-size-buffers"); + state.log.push(LogLine { + timestamp, + name, + ptr, + cur_level_bytes, + cur_level_time, + cur_level_buffers, + max_size_bytes, + max_size_time, + max_size_buffers, + }); + } +} diff --git a/utils/tracers/src/queue_levels/mod.rs b/utils/tracers/src/queue_levels/mod.rs new file mode 100644 index 00000000..f88a2f9e --- /dev/null +++ b/utils/tracers/src/queue_levels/mod.rs @@ -0,0 +1,20 @@ +// Copyright (C) 2022 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct QueueLevels(ObjectSubclass) @extends gst::Tracer, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Tracer::register(Some(plugin), "queue-levels", QueueLevels::static_type()) +}