ts: add feature to add counters for performance evaluation

Add a `tuning` feature which adds counters that help with performance
evaluation. The only counter added so far accumulates the duration a
Scheduler has been parked, which gives a fairly accurate indication of
the Scheduler's CPU usage.
François Laignel 2022-09-07 23:10:29 +02:00 committed by Sebastian Dröge
parent 72acbebff0
commit 235ded35fd
5 changed files with 170 additions and 119 deletions
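As the commit message notes, time the Scheduler spends parked is time it is not executing tasks, so the complement of the parked ratio approximates its CPU usage. Below is a minimal sketch of how a benchmarking tool might consume the counter; `Context::acquire` and `parked_duration()` are the APIs touched by this commit, while the context name, throttling value and 10 s measurement window are illustrative only, and the same `gst`/`gstthreadshare` dependencies as the benchmark example are assumed.

// Sketch only: build with `--features tuning`; names and durations are illustrative.
#[cfg(feature = "tuning")]
fn main() {
    use std::time::{Duration, Instant};

    gst::init().unwrap();
    gstthreadshare::plugin_register_static().unwrap();

    // Acquire (or create) the Context whose Scheduler is to be observed.
    let ctx = gstthreadshare::runtime::Context::acquire("context-0", Duration::from_millis(20))
        .unwrap();

    // Sample the cumulative parked duration around the measurement window.
    let parked_start = ctx.parked_duration();
    let wall_start = Instant::now();

    // ... run the workload to be measured here ...
    std::thread::sleep(Duration::from_secs(10));

    let parked = (ctx.parked_duration() - parked_start).as_secs_f32();
    let elapsed = wall_start.elapsed().as_secs_f32();

    // Parked time is idle time, so the remainder approximates CPU usage.
    println!(
        "parked {:5.2}%, busy {:5.2}%",
        100.0 * parked / elapsed,
        100.0 * (1.0 - parked / elapsed)
    );
}

#[cfg(not(feature = "tuning"))]
fn main() {}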

View file

@@ -65,6 +65,8 @@ pkg-config = "0.3.15"
 [features]
 static = []
 capi = []
+# Adds performance counters used by benchmarking tools.
+tuning = []
 doc = ["gst/v1_18"]
 
 [package.metadata.capi]
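The feature is purely additive and disabled by default, so regular builds are unaffected; benchmarking builds opt in explicitly, for example with `cargo build --features tuning` (or the equivalent `--features tuning` flag on `cargo run`) from this crate's directory.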

View file

@@ -39,29 +39,14 @@ pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
 fn main() {
     gst::init().unwrap();
 
-    #[cfg(debug_assertions)]
-    {
-        use std::path::Path;
-
-        let mut path = Path::new("target/debug");
-        if !path.exists() {
-            path = Path::new("../../target/debug");
-        }
-
-        gst::Registry::get().scan_path(path);
-    }
-    #[cfg(not(debug_assertions))]
-    {
-        use std::path::Path;
-
-        let mut path = Path::new("target/release");
-        if !path.exists() {
-            path = Path::new("../../target/release");
-        }
-
-        gst::Registry::get().scan_path(path);
-    }
+    // Register the plugins statically:
+    // - The executable can be run from anywhere.
+    // - No risk of running against a previous version.
+    // - `main` can use features that rely on `static`s or `thread_local`
+    //   such as `Context::acquire` which otherwise don't point to
+    //   the same `static` or `thread_local`, probably because
+    //   the shared object uses its owns and the executable, others.
+    gstthreadshare::plugin_register_static().unwrap();
 
     let args = env::args().collect::<Vec<_>>();
     assert!(args.len() > 4);
@@ -71,7 +56,7 @@ fn main() {
     let wait: u32 = args[4].parse().unwrap();
 
     // Nb buffers to await before stopping.
-    let max_buffers: Option<u64> = if args.len() > 5 {
+    let max_buffers: Option<f32> = if args.len() > 5 {
         args[5].parse().ok()
     } else {
         None
@@ -243,15 +228,24 @@ fn main() {
     let l_clone = l.clone();
     thread::spawn(move || {
-        let throughput_factor = 1_000f32 / (n_streams as f32);
-        let mut prev_reset_instant: Option<Instant> = None;
-        let mut count;
-        let mut reset_instant;
+        let n_streams_f32 = n_streams as f32;
+
+        let mut total_count = 0.0;
+        let mut ramp_up_complete_instant: Option<Instant> = None;
+
+        #[cfg(feature = "tuning")]
+        let ctx_0 = gstthreadshare::runtime::Context::acquire(
+            "context-0",
+            Duration::from_millis(wait as u64),
+        )
+        .unwrap();
+        #[cfg(feature = "tuning")]
+        let mut parked_init = Duration::ZERO;
 
         loop {
-            count = counter.fetch_and(0, Ordering::SeqCst);
+            total_count += counter.fetch_and(0, Ordering::SeqCst) as f32 / n_streams_f32;
             if let Some(max_buffers) = max_buffers {
-                if count > max_buffers {
+                if total_count > max_buffers {
                     gst::info!(CAT, "Stopping");
                     let stopping_instant = Instant::now();
                     pipeline.set_state(gst::State::Ready).unwrap();
@@ -263,22 +257,34 @@ fn main() {
                 }
             }
 
-            reset_instant = Instant::now();
-
-            if let Some(prev_reset_instant) = prev_reset_instant {
+            if let Some(init) = ramp_up_complete_instant {
+                let elapsed = init.elapsed();
                 gst::info!(
                     CAT,
                     "{:>6.2} / s / stream",
-                    (count as f32) * throughput_factor
-                        / ((reset_instant - prev_reset_instant).as_millis() as f32)
+                    total_count * 1_000.0 / elapsed.as_millis() as f32
                 );
+
+                #[cfg(feature = "tuning")]
+                gst::info!(
+                    CAT,
+                    "{:>6.2}% parked",
+                    (ctx_0.parked_duration() - parked_init).as_nanos() as f32 * 100.0
+                        / elapsed.as_nanos() as f32
+                );
+            } else {
+                // Ramp up 30s worth of buffers before following parked
+                if total_count > 50.0 * 30.0 {
+                    total_count = 0.0;
+                    ramp_up_complete_instant = Some(Instant::now());
+                    #[cfg(feature = "tuning")]
+                    {
+                        parked_init = ctx_0.parked_duration();
+                    }
+                }
             }
 
-            if let Some(sleep_duration) = THROUGHPUT_PERIOD.checked_sub(reset_instant.elapsed()) {
-                thread::sleep(sleep_duration);
-            }
-
-            prev_reset_instant = Some(reset_instant);
+            thread::sleep(THROUGHPUT_PERIOD);
         }
     });
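In this loop, `total_count` is already normalized per stream, so the `50.0 * 30.0` ramp-up threshold corresponds to roughly 30 s at an assumed 50 buffers per second per stream; only once that point is reached are the throughput baseline and `parked_init` sampled, so startup costs don't skew the figures. From then on the parked percentage is `(ctx_0.parked_duration() - parked_init)` divided by the wall-clock time elapsed since ramp-up completed.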

View file

@@ -189,6 +189,8 @@ struct Stats {
     interval_late_warn: Duration,
     interval_late_count: f32,
     interval_late_count_delta: f32,
+    #[cfg(feature = "tuning")]
+    parked_duration_init: Duration,
 }
 
 impl Stats {
@@ -238,6 +240,11 @@ impl Stats {
             gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
             self.log_start_instant = Some(Instant::now());
             self.last_delta_instant = self.log_start_instant;
+
+            #[cfg(feature = "tuning")]
+            {
+                self.parked_duration_init = Context::current().unwrap().parked_duration();
+            }
         }
 
         use std::cmp::Ordering::*;
@@ -304,102 +311,115 @@ impl Stats {
         self.last_delta_instant = Some(Instant::now());
 
-        if self.buffer_count_delta > 1.0 {
-            gst::info!(CAT, "Delta stats:");
+        gst::info!(CAT, "Delta stats:");
 
         let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
         let interval_std_dev = f32::sqrt(
             self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
         );
 
         gst::info!(
             CAT,
             "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             Duration::from_nanos(interval_mean as u64),
             Duration::from_nanos(interval_std_dev as u64),
             self.interval_min_delta,
             self.interval_max_delta,
         );
 
-        if self.interval_late_count_delta > 1.0 {
+        if self.interval_late_count_delta > f32::EPSILON {
             gst::warning!(
                 CAT,
                 "o {:5.2}% late buffers",
                 100f32 * self.interval_late_count_delta / self.buffer_count_delta
             );
         }
 
         self.interval_sum_delta = 0.0;
         self.interval_square_sum_delta = 0.0;
         self.interval_min_delta = Duration::MAX;
         self.interval_max_delta = Duration::ZERO;
         self.interval_late_count_delta = 0.0;
 
         let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
         let latency_std_dev = f32::sqrt(
             self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
         );
 
         gst::info!(
             CAT,
             "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             Duration::from_nanos(latency_mean as u64),
             Duration::from_nanos(latency_std_dev as u64),
             self.latency_min_delta,
             self.latency_max_delta,
         );
 
         self.latency_sum_delta = 0.0;
         self.latency_square_sum_delta = 0.0;
         self.latency_min_delta = Duration::MAX;
         self.latency_max_delta = Duration::ZERO;
-        }
 
         self.buffer_count_delta = 0.0;
     }
 
     fn log_global(&mut self) {
-        if self.log_start_instant.is_none() {
+        if self.buffer_count < 1.0 {
             return;
         }
 
-        if self.buffer_count > 1.0 {
-            gst::info!(CAT, "Global stats:");
+        let _log_start = if let Some(log_start) = self.log_start_instant {
+            log_start
+        } else {
+            return;
+        };
+
+        gst::info!(CAT, "Global stats:");
+
+        #[cfg(feature = "tuning")]
+        {
+            let duration = _log_start.elapsed();
+            let parked_duration =
+                Context::current().unwrap().parked_duration() - self.parked_duration_init;
+            gst::info!(
+                CAT,
+                "o parked: {parked_duration:4.2?} ({:5.2?}%)",
+                (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
+            );
+        }
 
         let interval_mean = self.interval_sum / self.buffer_count;
         let interval_std_dev =
             f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
 
         gst::info!(
             CAT,
             "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             Duration::from_nanos(interval_mean as u64),
             Duration::from_nanos(interval_std_dev as u64),
             self.interval_min,
             self.interval_max,
         );
 
         if self.interval_late_count > f32::EPSILON {
             gst::warning!(
                 CAT,
                 "o {:5.2}% late buffers",
                 100f32 * self.interval_late_count / self.buffer_count
             );
         }
 
         let latency_mean = self.latency_sum / self.buffer_count;
         let latency_std_dev =
             f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
 
         gst::info!(
             CAT,
             "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             Duration::from_nanos(latency_mean as u64),
             Duration::from_nanos(latency_std_dev as u64),
             self.latency_min,
             self.latency_max,
         );
-        }
     }
 }

View file

@@ -187,6 +187,14 @@ impl Context {
         self.0.max_throttling()
     }
 
+    /// Total duration the scheduler spent parked.
+    ///
+    /// This is only useful for performance evaluation.
+    #[cfg(feature = "tuning")]
+    pub fn parked_duration(&self) -> Duration {
+        self.0.parked_duration()
+    }
+
     /// Returns `true` if a `Context` is running on current thread.
     pub fn is_context_thread() -> bool {
         Scheduler::is_scheduler_thread()
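Note that the value returned by `parked_duration()` is cumulative over the lifetime of the `Context`'s Scheduler. Both call sites added in this commit (the benchmark loop and the `Stats` helper above) therefore take an initial sample and report the delta over a known wall-clock interval rather than using the raw value directly.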

View file

@@ -11,6 +11,8 @@ use gio::glib::clone::Downgrade;
 use std::cell::RefCell;
 use std::future::Future;
 use std::panic;
+#[cfg(feature = "tuning")]
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::mpsc as sync_mpsc;
 use std::sync::{Arc, Condvar, Mutex, Weak};
 use std::task::Poll;
@@ -34,6 +36,8 @@ pub(super) struct Scheduler {
     tasks: TaskQueue,
     must_unpark: Mutex<bool>,
     must_unpark_cvar: Condvar,
+    #[cfg(feature = "tuning")]
+    parked_duration: AtomicU64,
 }
 
 impl Scheduler {
@@ -104,6 +108,8 @@ impl Scheduler {
             tasks: TaskQueue::new(context_name),
             must_unpark: Mutex::new(false),
             must_unpark_cvar: Condvar::new(),
+            #[cfg(feature = "tuning")]
+            parked_duration: AtomicU64::new(0),
         }));
 
         *cur_scheduler = Some(handle.downgrade());
@@ -201,12 +207,16 @@ impl Scheduler {
                 break;
             }
 
-            if let Some(wait_duration) = self.max_throttling.checked_sub(last.elapsed()) {
+            if let Some(parking_duration) = self.max_throttling.checked_sub(last.elapsed()) {
                 let result = self
                     .must_unpark_cvar
-                    .wait_timeout(must_unpark, wait_duration)
+                    .wait_timeout(must_unpark, parking_duration)
                     .unwrap();
 
+                #[cfg(feature = "tuning")]
+                self.parked_duration
+                    .fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed);
+
                 must_unpark = result.0;
             } else {
                 *must_unpark = false;
@@ -360,6 +370,11 @@ impl Handle {
         self.0.scheduler.max_throttling
     }
 
+    #[cfg(feature = "tuning")]
+    pub fn parked_duration(&self) -> Duration {
+        Duration::from_nanos(self.0.scheduler.parked_duration.load(Ordering::Relaxed))
+    }
+
     /// Executes the provided function relatively to this [`Scheduler`]'s [`Reactor`].
     ///
     /// Usefull to initialze i/o sources and timers from outside