mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-25 21:11:00 +00:00
tracers: Add new tracer for logging the lateness of each buffer when it leaves a pad
This commit is contained in:
parent
2f987b09ee
commit
320cb73527
4 changed files with 482 additions and 0 deletions
79
utils/tracers/scripts/buffer_lateness.py
Normal file
79
utils/tracers/scripts/buffer_lateness.py
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
import argparse
|
||||||
|
import csv
|
||||||
|
import re
|
||||||
|
|
||||||
|
import matplotlib
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument("file", help="Input file with queue levels")
|
||||||
|
parser.add_argument("--include-filter", help="Regular expression for element:pad names that should be included")
|
||||||
|
parser.add_argument("--exclude-filter", help="Regular expression for element:pad names that should be excluded")
|
||||||
|
parser.add_argument("--no-latency", help="do not include latency (enabled by default)", action="store_true")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
include_filter = None
|
||||||
|
if args.include_filter is not None:
|
||||||
|
include_filter = re.compile(args.include_filter)
|
||||||
|
exclude_filter = None
|
||||||
|
if args.exclude_filter is not None:
|
||||||
|
exclude_filter = re.compile(args.exclude_filter)
|
||||||
|
|
||||||
|
pads = {}
|
||||||
|
|
||||||
|
with open(args.file, mode='r', encoding='utf_8', newline='') as csvfile:
|
||||||
|
reader = csv.reader(csvfile, delimiter=',', quotechar='|')
|
||||||
|
for row in reader:
|
||||||
|
if len(row) != 7:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if include_filter is not None and not include_filter.match(row[1]):
|
||||||
|
continue
|
||||||
|
if exclude_filter is not None and exclude_filter.match(row[1]):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not row[1] in pads:
|
||||||
|
pads[row[1]] = {
|
||||||
|
'buffer-clock-time': [],
|
||||||
|
'pipeline-clock-time': [],
|
||||||
|
'lateness': [],
|
||||||
|
'latency': [],
|
||||||
|
}
|
||||||
|
|
||||||
|
wallclock = float(row[0]) / 1000000000.0
|
||||||
|
pads[row[1]]['buffer-clock-time'].append((wallclock, float(row[3]) / 1000000000.0))
|
||||||
|
pads[row[1]]['pipeline-clock-time'].append((wallclock, float(row[4]) / 1000000000.0))
|
||||||
|
pads[row[1]]['lateness'].append((wallclock, float(row[5]) / 1000000000.0))
|
||||||
|
pads[row[1]]['latency'].append((wallclock, float(row[6]) / 1000000000.0))
|
||||||
|
|
||||||
|
matplotlib.rcParams['figure.dpi'] = 200
|
||||||
|
|
||||||
|
prop_cycle = plt.rcParams['axes.prop_cycle']
|
||||||
|
colors = prop_cycle.by_key()['color']
|
||||||
|
|
||||||
|
fig, ax1 = plt.subplots()
|
||||||
|
|
||||||
|
ax1.set_xlabel("wallclock (s)")
|
||||||
|
ax1.set_ylabel("time (s)")
|
||||||
|
ax1.tick_params(axis='y')
|
||||||
|
|
||||||
|
for (i, (pad, values)) in enumerate(pads.items()):
|
||||||
|
ax1.plot(
|
||||||
|
[x[0] for x in values['lateness']],
|
||||||
|
[x[1] for x in values['lateness']],
|
||||||
|
'.', label = '{}: lateness'.format(pad),
|
||||||
|
color = colors[i],
|
||||||
|
)
|
||||||
|
|
||||||
|
if not args.no_latency:
|
||||||
|
ax1.plot(
|
||||||
|
[x[0] for x in values['latency']],
|
||||||
|
[x[1] for x in values['latency']],
|
||||||
|
'-', label = '{}: latency'.format(pad),
|
||||||
|
color = colors[i],
|
||||||
|
)
|
||||||
|
|
||||||
|
fig.tight_layout()
|
||||||
|
plt.legend(loc='best')
|
||||||
|
|
||||||
|
plt.show()
|
377
utils/tracers/src/buffer_lateness/imp.rs
Normal file
377
utils/tracers/src/buffer_lateness/imp.rs
Normal file
|
@ -0,0 +1,377 @@
|
||||||
|
// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
/// This tracer provides an easy way to collect lateness of each buffer when it is pushed out of a
|
||||||
|
/// pad in live pipelines.
|
||||||
|
///
|
||||||
|
/// Example:
|
||||||
|
///
|
||||||
|
/// ```console
|
||||||
|
/// $ GST_TRACERS='buffer-lateness(file="/tmp/buffer_lateness.log")' gst-launch-1.0 audiotestsrc is-live=true ! queue ! fakesink
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// The generated file is a CSV file of the format
|
||||||
|
///
|
||||||
|
/// ```csv
|
||||||
|
/// timestamp,element:pad name,pad pointer,buffer clock time,pipeline clock time,lateness,min latency
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// ## Parameters
|
||||||
|
///
|
||||||
|
/// ### `file`
|
||||||
|
///
|
||||||
|
/// Specifies the path to the file that will collect the CSV file with the buffer lateness.
|
||||||
|
///
|
||||||
|
/// By default the file is written to `/tmp/buffer_lateness.log`.
|
||||||
|
///
|
||||||
|
/// ### `include-filter`
|
||||||
|
///
|
||||||
|
/// Specifies a regular expression for the `element:pad` names that should be included.
|
||||||
|
///
|
||||||
|
/// By default this is not set.
|
||||||
|
///
|
||||||
|
/// ### `exclude-filter`
|
||||||
|
///
|
||||||
|
/// Specifies a regular expression for the `element:pad` names that should **not** be included.
|
||||||
|
///
|
||||||
|
/// By default this is not set.
|
||||||
|
///
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use gst::glib;
|
||||||
|
use gst::prelude::*;
|
||||||
|
use gst::subclass::prelude::*;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use regex::Regex;
|
||||||
|
|
||||||
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
|
gst::DebugCategory::new(
|
||||||
|
"buffer-lateness",
|
||||||
|
gst::DebugColorFlags::empty(),
|
||||||
|
Some("Tracer to collect buffer lateness"),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Settings {
|
||||||
|
file: PathBuf,
|
||||||
|
include_filter: Option<Regex>,
|
||||||
|
exclude_filter: Option<Regex>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Settings {
|
||||||
|
fn default() -> Self {
|
||||||
|
let mut file = glib::tmp_dir();
|
||||||
|
file.push("buffer_lateness.log");
|
||||||
|
|
||||||
|
Self {
|
||||||
|
file,
|
||||||
|
include_filter: None,
|
||||||
|
exclude_filter: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Settings {
|
||||||
|
fn update_from_params(&mut self, obj: &super::BufferLateness, params: String) {
|
||||||
|
let s = match gst::Structure::from_str(&format!("buffer-lateness,{}", params)) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(err) => {
|
||||||
|
gst::warning!(CAT, obj: obj, "failed to parse tracer parameters: {}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Ok(file) = s.get::<&str>("file") {
|
||||||
|
gst::log!(CAT, obj: obj, "file= {}", file);
|
||||||
|
self.file = PathBuf::from(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(filter) = s.get::<&str>("include-filter") {
|
||||||
|
gst::log!(CAT, obj: obj, "include filter= {}", filter);
|
||||||
|
let filter = match Regex::new(filter) {
|
||||||
|
Ok(filter) => Some(filter),
|
||||||
|
Err(err) => {
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
obj: obj,
|
||||||
|
"Failed to compile include-filter regex: {}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.include_filter = filter;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(filter) = s.get::<&str>("exclude-filter") {
|
||||||
|
gst::log!(CAT, obj: obj, "exclude filter= {}", filter);
|
||||||
|
let filter = match Regex::new(filter) {
|
||||||
|
Ok(filter) => Some(filter),
|
||||||
|
Err(err) => {
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
obj: obj,
|
||||||
|
"Failed to compile exclude-filter regex: {}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.exclude_filter = filter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct State {
|
||||||
|
pads: HashMap<usize, Pad>,
|
||||||
|
log: Vec<LogLine>,
|
||||||
|
settings: Settings,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Pad {
|
||||||
|
element_name: Option<Arc<glib::GString>>,
|
||||||
|
pad_name: Arc<glib::GString>,
|
||||||
|
latency: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LogLine {
|
||||||
|
timestamp: u64,
|
||||||
|
element_name: Arc<glib::GString>,
|
||||||
|
pad_name: Arc<glib::GString>,
|
||||||
|
ptr: usize,
|
||||||
|
buffer_clock_time: u64,
|
||||||
|
pipeline_clock_time: u64,
|
||||||
|
lateness: i64,
|
||||||
|
min_latency: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct BufferLateness {
|
||||||
|
state: Mutex<State>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[glib::object_subclass]
|
||||||
|
impl ObjectSubclass for BufferLateness {
|
||||||
|
const NAME: &'static str = "GstBufferLateness";
|
||||||
|
type Type = super::BufferLateness;
|
||||||
|
type ParentType = gst::Tracer;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ObjectImpl for BufferLateness {
|
||||||
|
fn constructed(&self, obj: &Self::Type) {
|
||||||
|
self.parent_constructed(obj);
|
||||||
|
|
||||||
|
if let Some(params) = obj.property::<Option<String>>("params") {
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
state.settings.update_from_params(obj, params);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.register_hook(TracerHook::ElementAddPad);
|
||||||
|
self.register_hook(TracerHook::ElementRemovePad);
|
||||||
|
self.register_hook(TracerHook::PadPushPre);
|
||||||
|
self.register_hook(TracerHook::PadPushListPre);
|
||||||
|
self.register_hook(TracerHook::PadQueryPost);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dispose(&self, obj: &Self::Type) {
|
||||||
|
use std::io::prelude::*;
|
||||||
|
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
let mut file = match std::fs::File::create(&state.settings.file) {
|
||||||
|
Ok(file) => file,
|
||||||
|
Err(err) => {
|
||||||
|
gst::error!(CAT, obj: obj, "Failed to create file: {err}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for LogLine {
|
||||||
|
timestamp,
|
||||||
|
element_name,
|
||||||
|
pad_name,
|
||||||
|
ptr,
|
||||||
|
buffer_clock_time,
|
||||||
|
pipeline_clock_time,
|
||||||
|
lateness,
|
||||||
|
min_latency,
|
||||||
|
} in &state.log
|
||||||
|
{
|
||||||
|
if let Err(err) = writeln!(&mut file, "{timestamp},{element_name}:{pad_name},0x{ptr:08x},{buffer_clock_time},{pipeline_clock_time},{lateness},{min_latency}") {
|
||||||
|
gst::error!(CAT, obj: obj, "Failed to write to file: {err}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GstObjectImpl for BufferLateness {}
|
||||||
|
|
||||||
|
impl TracerImpl for BufferLateness {
|
||||||
|
fn element_add_pad(&self, _ts: u64, _element: &gst::Element, pad: &gst::Pad) {
|
||||||
|
if pad.direction() != gst::PadDirection::Src {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let tracer = self.instance();
|
||||||
|
let ptr = pad.as_ptr() as usize;
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
obj: &tracer,
|
||||||
|
"new source pad: {} 0x{:08x}",
|
||||||
|
pad.name(),
|
||||||
|
ptr
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
// FIXME: Element name might not be set yet here if the pad is added in instance_init
|
||||||
|
// already.
|
||||||
|
state.pads.entry(ptr).or_insert_with(|| Pad {
|
||||||
|
element_name: None,
|
||||||
|
pad_name: Arc::new(pad.name()),
|
||||||
|
latency: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn element_remove_pad(&self, _ts: u64, _element: &gst::Element, pad: &gst::Pad) {
|
||||||
|
let ptr = pad.as_ptr() as usize;
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
state.pads.remove(&ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pad_push_pre(&self, ts: u64, pad: &gst::Pad, buffer: &gst::Buffer) {
|
||||||
|
let timestamp = match buffer.dts_or_pts() {
|
||||||
|
Some(timestamp) => timestamp,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let element = match pad.parent().and_then(|p| p.downcast::<gst::Element>().ok()) {
|
||||||
|
Some(element) => element,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let clock = match element.clock() {
|
||||||
|
Some(clock) => clock,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let base_time = match element.base_time() {
|
||||||
|
// FIXME: Workaround for base time being set to 0 initially instead of None
|
||||||
|
Some(base_time)
|
||||||
|
if base_time == gst::ClockTime::ZERO && element.start_time().is_some() =>
|
||||||
|
{
|
||||||
|
return
|
||||||
|
}
|
||||||
|
Some(base_time) => base_time,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let segment = match pad
|
||||||
|
.sticky_event::<gst::event::Segment>(0)
|
||||||
|
.map(|s| s.segment().clone())
|
||||||
|
.and_then(|s| s.downcast::<gst::ClockTime>().ok())
|
||||||
|
{
|
||||||
|
Some(segment) => segment,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let ptr = pad.as_ptr() as usize;
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
let State {
|
||||||
|
ref mut pads,
|
||||||
|
ref mut log,
|
||||||
|
ref settings,
|
||||||
|
..
|
||||||
|
} = &mut *state;
|
||||||
|
if let Some(pad) = pads.get_mut(&ptr) {
|
||||||
|
if pad.element_name.is_none() {
|
||||||
|
pad.element_name = Some(Arc::new(element.name()));
|
||||||
|
|
||||||
|
let name = format!("{}:{}", pad.element_name.as_ref().unwrap(), pad.pad_name);
|
||||||
|
if let Some(ref filter) = settings.include_filter {
|
||||||
|
if !filter.is_match(&name) {
|
||||||
|
pads.remove(&ptr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(ref filter) = settings.exclude_filter {
|
||||||
|
if filter.is_match(&name) {
|
||||||
|
pads.remove(&ptr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let element_name = pad.element_name.as_ref().unwrap();
|
||||||
|
|
||||||
|
let running_time = match segment.to_running_time(timestamp) {
|
||||||
|
Some(running_time) => running_time,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let buffer_clock_time = running_time + base_time;
|
||||||
|
let pipeline_clock_time = match clock.time() {
|
||||||
|
Some(time) => time,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
log.push(LogLine {
|
||||||
|
timestamp: ts,
|
||||||
|
element_name: element_name.clone(),
|
||||||
|
pad_name: pad.pad_name.clone(),
|
||||||
|
ptr,
|
||||||
|
buffer_clock_time: buffer_clock_time.nseconds(),
|
||||||
|
pipeline_clock_time: pipeline_clock_time.nseconds(),
|
||||||
|
lateness: if buffer_clock_time > pipeline_clock_time {
|
||||||
|
-((buffer_clock_time.nseconds() - pipeline_clock_time.nseconds()) as i64)
|
||||||
|
} else {
|
||||||
|
(pipeline_clock_time.nseconds() - buffer_clock_time.nseconds()) as i64
|
||||||
|
},
|
||||||
|
min_latency: pad.latency,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pad_push_list_pre(&self, ts: u64, pad: &gst::Pad, buffer_list: &gst::BufferList) {
|
||||||
|
for buffer in buffer_list.iter_owned() {
|
||||||
|
self.pad_push_pre(ts, pad, &buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::single_match)]
|
||||||
|
fn pad_query_post(&self, _ts: u64, pad: &gst::Pad, query: &gst::Query, res: bool) {
|
||||||
|
if !res {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if pad.direction() != gst::PadDirection::Src {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match query.view() {
|
||||||
|
gst::QueryView::Latency(l) => {
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
if let Some(pad) = state.pads.get_mut(&(pad.as_ptr() as usize)) {
|
||||||
|
let (live, min, _max) = l.result();
|
||||||
|
if live {
|
||||||
|
pad.latency = min.nseconds();
|
||||||
|
} else {
|
||||||
|
pad.latency = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
24
utils/tracers/src/buffer_lateness/mod.rs
Normal file
24
utils/tracers/src/buffer_lateness/mod.rs
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use gst::glib;
|
||||||
|
use gst::prelude::*;
|
||||||
|
|
||||||
|
mod imp;
|
||||||
|
|
||||||
|
glib::wrapper! {
|
||||||
|
pub struct BufferLateness(ObjectSubclass<imp::BufferLateness>) @extends gst::Tracer, gst::Object;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||||
|
gst::Tracer::register(
|
||||||
|
Some(plugin),
|
||||||
|
"buffer-lateness",
|
||||||
|
BufferLateness::static_type(),
|
||||||
|
)
|
||||||
|
}
|
|
@ -9,12 +9,14 @@
|
||||||
|
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
|
|
||||||
|
mod buffer_lateness;
|
||||||
mod pipeline_snapshot;
|
mod pipeline_snapshot;
|
||||||
mod queue_levels;
|
mod queue_levels;
|
||||||
|
|
||||||
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||||
pipeline_snapshot::register(plugin)?;
|
pipeline_snapshot::register(plugin)?;
|
||||||
queue_levels::register(plugin)?;
|
queue_levels::register(plugin)?;
|
||||||
|
buffer_lateness::register(plugin)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue