From 235ded35fd71082ac9ec78502ba462adb238d38f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?=
Date: Wed, 7 Sep 2022 23:10:29 +0200
Subject: [PATCH] ts: add feature to add counters for performance evaluation

Add a `tuning` feature which adds counters that help with performance
evaluation. The only counter added so far accumulates the duration a
Scheduler has been parked, which gives a fairly accurate indication of
the Scheduler's CPU usage.
---
 generic/threadshare/Cargo.toml            |   2 +
 generic/threadshare/examples/benchmark.rs |  86 +++++----
 .../examples/standalone/sink/imp.rs       | 174 ++++++++++--------
 .../src/runtime/executor/context.rs       |   8 +
 .../src/runtime/executor/scheduler.rs     |  19 +-
 5 files changed, 170 insertions(+), 119 deletions(-)

diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml
index 80ae0dea..cd14971a 100644
--- a/generic/threadshare/Cargo.toml
+++ b/generic/threadshare/Cargo.toml
@@ -65,6 +65,8 @@ pkg-config = "0.3.15"
 [features]
 static = []
 capi = []
+# Adds performance counters used by benchmarking tools.
+tuning = []
 doc = ["gst/v1_18"]

 [package.metadata.capi]
diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs
index 6dc904ba..32b00ed2 100644
--- a/generic/threadshare/examples/benchmark.rs
+++ b/generic/threadshare/examples/benchmark.rs
@@ -39,29 +39,14 @@ pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {

 fn main() {
     gst::init().unwrap();
-
-    #[cfg(debug_assertions)]
-    {
-        use std::path::Path;
-
-        let mut path = Path::new("target/debug");
-        if !path.exists() {
-            path = Path::new("../../target/debug");
-        }
-
-        gst::Registry::get().scan_path(path);
-    }
-    #[cfg(not(debug_assertions))]
-    {
-        use std::path::Path;
-
-        let mut path = Path::new("target/release");
-        if !path.exists() {
-            path = Path::new("../../target/release");
-        }
-
-        gst::Registry::get().scan_path(path);
-    }
+    // Register the plugins statically:
+    // - The executable can be run from anywhere.
+    // - No risk of running against a previously built version.
+    // - `main` can use features that rely on `static`s or `thread_local`s,
+    //   such as `Context::acquire`, which otherwise wouldn't point to the
+    //   same `static` or `thread_local`, probably because the shared object
+    //   uses its own copies while the executable uses others.
+    gstthreadshare::plugin_register_static().unwrap();

     let args = env::args().collect::<Vec<String>>();
     assert!(args.len() > 4);
@@ -71,7 +56,7 @@ fn main() {
     let wait: u32 = args[4].parse().unwrap();

     // Nb buffers to await before stopping.
-    let max_buffers: Option<u32> = if args.len() > 5 {
+    let max_buffers: Option<f32> = if args.len() > 5 {
         args[5].parse().ok()
     } else {
         None
@@ -243,15 +228,24 @@ fn main() {

     let l_clone = l.clone();
     thread::spawn(move || {
-        let throughput_factor = 1_000f32 / (n_streams as f32);
-        let mut prev_reset_instant: Option<Instant> = None;
-        let mut count;
-        let mut reset_instant;
+        let n_streams_f32 = n_streams as f32;
+
+        let mut total_count = 0.0;
+        let mut ramp_up_complete_instant: Option<Instant> = None;
+
+        #[cfg(feature = "tuning")]
+        let ctx_0 = gstthreadshare::runtime::Context::acquire(
+            "context-0",
+            Duration::from_millis(wait as u64),
+        )
+        .unwrap();
+        #[cfg(feature = "tuning")]
+        let mut parked_init = Duration::ZERO;

         loop {
-            count = counter.fetch_and(0, Ordering::SeqCst);
+            total_count += counter.fetch_and(0, Ordering::SeqCst) as f32 / n_streams_f32;
             if let Some(max_buffers) = max_buffers {
-                if count > max_buffers {
+                if total_count > max_buffers {
                     gst::info!(CAT, "Stopping");
                     let stopping_instant = Instant::now();
                     pipeline.set_state(gst::State::Ready).unwrap();
@@ -263,22 +257,34 @@ fn main() {
                 }
             }

-            reset_instant = Instant::now();
-
-            if let Some(prev_reset_instant) = prev_reset_instant {
+            if let Some(init) = ramp_up_complete_instant {
+                let elapsed = init.elapsed();
                 gst::info!(
                     CAT,
                     "{:>6.2} / s / stream",
-                    (count as f32) * throughput_factor
-                        / ((reset_instant - prev_reset_instant).as_millis() as f32)
+                    total_count * 1_000.0 / elapsed.as_millis() as f32
                 );
+
+                #[cfg(feature = "tuning")]
+                gst::info!(
+                    CAT,
+                    "{:>6.2}% parked",
+                    (ctx_0.parked_duration() - parked_init).as_nanos() as f32 * 100.0
+                        / elapsed.as_nanos() as f32
+                );
+            } else {
+                // Ramp up 30s worth of buffers before tracking throughput and parked time
+                if total_count > 50.0 * 30.0 {
+                    total_count = 0.0;
+                    ramp_up_complete_instant = Some(Instant::now());
+                    #[cfg(feature = "tuning")]
+                    {
+                        parked_init = ctx_0.parked_duration();
+                    }
+                }
             }

-            if let Some(sleep_duration) = THROUGHPUT_PERIOD.checked_sub(reset_instant.elapsed()) {
-                thread::sleep(sleep_duration);
-            }
-
-            prev_reset_instant = Some(reset_instant);
+            thread::sleep(THROUGHPUT_PERIOD);
         }
     });

diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs
index 67ab406d..de0406f5 100644
--- a/generic/threadshare/examples/standalone/sink/imp.rs
+++ b/generic/threadshare/examples/standalone/sink/imp.rs
@@ -189,6 +189,8 @@ struct Stats {
     interval_late_warn: Duration,
     interval_late_count: f32,
     interval_late_count_delta: f32,
+    #[cfg(feature = "tuning")]
+    parked_duration_init: Duration,
 }

 impl Stats {
@@ -238,6 +240,11 @@ impl Stats {
             gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD);
             self.log_start_instant = Some(Instant::now());
             self.last_delta_instant = self.log_start_instant;
+
+            #[cfg(feature = "tuning")]
+            {
+                self.parked_duration_init = Context::current().unwrap().parked_duration();
+            }
         }

         use std::cmp::Ordering::*;
@@ -304,102 +311,115 @@ impl Stats {

         self.last_delta_instant = Some(Instant::now());

-        if self.buffer_count_delta > 1.0 {
-            gst::info!(CAT, "Delta stats:");
+        gst::info!(CAT, "Delta stats:");
+        let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
+        let interval_std_dev = f32::sqrt(
+            self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
+        );

-            let interval_mean = self.interval_sum_delta / self.buffer_count_delta;
-            let interval_std_dev = f32::sqrt(
-                self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2),
-            );
+        gst::info!(
+            CAT,
+            "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            Duration::from_nanos(interval_mean as u64),
+            Duration::from_nanos(interval_std_dev as u64),
+            self.interval_min_delta,
+            self.interval_max_delta,
+        );

-            gst::info!(
+        if self.interval_late_count_delta > f32::EPSILON {
+            gst::warning!(
                 CAT,
-                "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
-                Duration::from_nanos(interval_mean as u64),
-                Duration::from_nanos(interval_std_dev as u64),
-                self.interval_min_delta,
-                self.interval_max_delta,
+                "o {:5.2}% late buffers",
+                100f32 * self.interval_late_count_delta / self.buffer_count_delta
             );
-
-            if self.interval_late_count_delta > 1.0 {
-                gst::warning!(
-                    CAT,
-                    "o {:5.2}% late buffers",
-                    100f32 * self.interval_late_count_delta / self.buffer_count_delta
-                );
-            }
-
-            self.interval_sum_delta = 0.0;
-            self.interval_square_sum_delta = 0.0;
-            self.interval_min_delta = Duration::MAX;
-            self.interval_max_delta = Duration::ZERO;
-            self.interval_late_count_delta = 0.0;
-
-            let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
-            let latency_std_dev = f32::sqrt(
-                self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
-            );
-
-            gst::info!(
-                CAT,
-                "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
-                Duration::from_nanos(latency_mean as u64),
-                Duration::from_nanos(latency_std_dev as u64),
-                self.latency_min_delta,
-                self.latency_max_delta,
-            );
-
-            self.latency_sum_delta = 0.0;
-            self.latency_square_sum_delta = 0.0;
-            self.latency_min_delta = Duration::MAX;
-            self.latency_max_delta = Duration::ZERO;
         }
+        self.interval_sum_delta = 0.0;
+        self.interval_square_sum_delta = 0.0;
+        self.interval_min_delta = Duration::MAX;
+        self.interval_max_delta = Duration::ZERO;
+        self.interval_late_count_delta = 0.0;
+
+        let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
+        let latency_std_dev = f32::sqrt(
+            self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
+        );
+
+        gst::info!(
+            CAT,
+            "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            Duration::from_nanos(latency_mean as u64),
+            Duration::from_nanos(latency_std_dev as u64),
+            self.latency_min_delta,
+            self.latency_max_delta,
+        );
+
+        self.latency_sum_delta = 0.0;
+        self.latency_square_sum_delta = 0.0;
+        self.latency_min_delta = Duration::MAX;
+        self.latency_max_delta = Duration::ZERO;
+
         self.buffer_count_delta = 0.0;
     }

     fn log_global(&mut self) {
-        if self.log_start_instant.is_none() {
+        if self.buffer_count < 1.0 {
             return;
         }

-        if self.buffer_count > 1.0 {
-            gst::info!(CAT, "Global stats:");
+        let _log_start = if let Some(log_start) = self.log_start_instant {
+            log_start
+        } else {
+            return;
+        };

-            let interval_mean = self.interval_sum / self.buffer_count;
-            let interval_std_dev =
-                f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
+        gst::info!(CAT, "Global stats:");
+        #[cfg(feature = "tuning")]
+        {
+            let duration = _log_start.elapsed();
+            let parked_duration =
+                Context::current().unwrap().parked_duration() - self.parked_duration_init;
             gst::info!(
                 CAT,
-                "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
-                Duration::from_nanos(interval_mean as u64),
-                Duration::from_nanos(interval_std_dev as u64),
-                self.interval_min,
-                self.interval_max,
-            );
-
-            if self.interval_late_count > f32::EPSILON {
-                gst::warning!(
-                    CAT,
-                    "o {:5.2}% late buffers",
-                    100f32 * self.interval_late_count / self.buffer_count
-                );
-            }
-
-            let latency_mean = self.latency_sum / self.buffer_count;
-            let latency_std_dev =
-                f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
-
-            gst::info!(
-                CAT,
-                "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
-                Duration::from_nanos(latency_mean as u64),
-                Duration::from_nanos(latency_std_dev as u64),
-                self.latency_min,
-                self.latency_max,
+                "o parked: {parked_duration:4.2?} ({:5.2?}%)",
+                (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
             );
         }
+
+        let interval_mean = self.interval_sum / self.buffer_count;
+        let interval_std_dev =
+            f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2));
+
+        gst::info!(
+            CAT,
+            "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            Duration::from_nanos(interval_mean as u64),
+            Duration::from_nanos(interval_std_dev as u64),
+            self.interval_min,
+            self.interval_max,
+        );
+
+        if self.interval_late_count > f32::EPSILON {
+            gst::warning!(
+                CAT,
+                "o {:5.2}% late buffers",
+                100f32 * self.interval_late_count / self.buffer_count
+            );
+        }
+
+        let latency_mean = self.latency_sum / self.buffer_count;
+        let latency_std_dev =
+            f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
+
+        gst::info!(
+            CAT,
+            "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            Duration::from_nanos(latency_mean as u64),
+            Duration::from_nanos(latency_std_dev as u64),
+            self.latency_min,
+            self.latency_max,
+        );
     }
 }
diff --git a/generic/threadshare/src/runtime/executor/context.rs b/generic/threadshare/src/runtime/executor/context.rs
index f1555627..325e2019 100644
--- a/generic/threadshare/src/runtime/executor/context.rs
+++ b/generic/threadshare/src/runtime/executor/context.rs
@@ -187,6 +187,14 @@ impl Context {
         self.0.max_throttling()
     }

+    /// Total duration the scheduler spent parked.
+    ///
+    /// This is only useful for performance evaluation.
+    #[cfg(feature = "tuning")]
+    pub fn parked_duration(&self) -> Duration {
+        self.0.parked_duration()
+    }
+
     /// Returns `true` if a `Context` is running on current thread.
     pub fn is_context_thread() -> bool {
         Scheduler::is_scheduler_thread()
diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs
index 53a7fab7..958d88b3 100644
--- a/generic/threadshare/src/runtime/executor/scheduler.rs
+++ b/generic/threadshare/src/runtime/executor/scheduler.rs
@@ -11,6 +11,8 @@ use gio::glib::clone::Downgrade;
 use std::cell::RefCell;
 use std::future::Future;
 use std::panic;
+#[cfg(feature = "tuning")]
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::mpsc as sync_mpsc;
 use std::sync::{Arc, Condvar, Mutex, Weak};
 use std::task::Poll;
@@ -34,6 +36,8 @@ pub(super) struct Scheduler {
     tasks: TaskQueue,
     must_unpark: Mutex<bool>,
     must_unpark_cvar: Condvar,
+    #[cfg(feature = "tuning")]
+    parked_duration: AtomicU64,
 }

 impl Scheduler {
@@ -104,6 +108,8 @@ impl Scheduler {
             tasks: TaskQueue::new(context_name),
             must_unpark: Mutex::new(false),
             must_unpark_cvar: Condvar::new(),
+            #[cfg(feature = "tuning")]
+            parked_duration: AtomicU64::new(0),
         }));

         *cur_scheduler = Some(handle.downgrade());
@@ -201,12 +207,16 @@ impl Scheduler {
                 break;
             }

-            if let Some(wait_duration) = self.max_throttling.checked_sub(last.elapsed()) {
+            if let Some(parking_duration) = self.max_throttling.checked_sub(last.elapsed()) {
                 let result = self
                     .must_unpark_cvar
-                    .wait_timeout(must_unpark, wait_duration)
+                    .wait_timeout(must_unpark, parking_duration)
                     .unwrap();

+                #[cfg(feature = "tuning")]
+                self.parked_duration
+                    .fetch_add(parking_duration.subsec_nanos() as u64, Ordering::Relaxed);
+
                 must_unpark = result.0;
             } else {
                 *must_unpark = false;
@@ -360,6 +370,11 @@ impl Handle {
         self.0.scheduler.max_throttling
     }

+    #[cfg(feature = "tuning")]
+    pub fn parked_duration(&self) -> Duration {
+        Duration::from_nanos(self.0.scheduler.parked_duration.load(Ordering::Relaxed))
+    }
+
     /// Executes the provided function relative to this [`Scheduler`]'s [`Reactor`].
     ///
     /// Useful to initialize i/o sources and timers from outside
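
Usage sketch (not part of the patch): the snippet below shows one way a
benchmark built with the `tuning` feature could turn the `parked_duration()`
counter introduced above into a CPU-usage estimate, along the lines of what
examples/benchmark.rs does. It is a minimal sketch only; the context name
"context-0", the 20ms throttling value and the 10s measurement window are
illustrative assumptions, while `Context::acquire` and `parked_duration()`
are the APIs touched by this patch.

    use std::time::{Duration, Instant};

    fn main() {
        gst::init().unwrap();
        gstthreadshare::plugin_register_static().unwrap();

        // Join the threadshare context whose scheduler we want to observe.
        // "context-0" and the 20ms max throttling value are placeholders.
        let ctx =
            gstthreadshare::runtime::Context::acquire("context-0", Duration::from_millis(20))
                .unwrap();

        let parked_init = ctx.parked_duration();
        let start = Instant::now();

        // ... build, start and run the pipeline under test here ...
        std::thread::sleep(Duration::from_secs(10));

        // The scheduler is either parked or executing tasks, so the busy share
        // of the elapsed time approximates the scheduler's CPU usage.
        let parked = ctx.parked_duration() - parked_init;
        let elapsed = start.elapsed();
        let busy_pct = 100.0 * (1.0 - parked.as_secs_f64() / elapsed.as_secs_f64());
        println!("scheduler busy: {busy_pct:5.2}%");
    }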