diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index 8a34d333..80ae0dea 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -54,9 +54,8 @@ name = "tcpclientsrc-benchmark-sender" path = "examples/tcpclientsrc_benchmark_sender.rs" [[example]] -name = "standalone" +name = "ts-standalone" path = "examples/standalone/main.rs" -required-features = ["clap"] [build-dependencies] gst-plugin-version-helper = { path="../../version-helper" } diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index 1b14bc62..8852d9f7 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -31,14 +31,18 @@ gst::plugin_define!( env!("BUILD_REL_DATE") ); +#[cfg(feature = "clap")] use clap::Parser; +#[cfg(feature = "clap")] #[derive(Parser, Debug)] #[clap(version)] -#[clap(about = "Standalone pipeline threadshare runtime test")] +#[clap( + about = "Standalone pipeline threadshare runtime test. Use `GST_DEBUG=ts-standalone*:4` for stats" +)] struct Args { /// Parallel streams to process. - #[clap(short, long, default_value_t = 100)] + #[clap(short, long, default_value_t = 5000)] streams: u32, /// Threadshare groups. @@ -49,13 +53,67 @@ struct Args { #[clap(short, long, default_value_t = 20)] wait: u32, + /// Buffer push period in ms. + #[clap(short, long, default_value_t = 20)] + push_period: u32, + /// Number of buffers per stream to output before sending EOS (-1 = unlimited). - #[clap(short, long, default_value_t = 6000)] + #[clap(short, long, default_value_t = 5000)] num_buffers: i32, - /// Enables statistics logging (use GST_DEBUG=ts-standalone*:4). + /// Disables statistics logging. #[clap(short, long)] - log_stats: bool, + disable_stats_log: bool, +} + +#[cfg(not(feature = "clap"))] +#[derive(Debug)] +struct Args { + streams: u32, + groups: u32, + wait: u32, + push_period: u32, + num_buffers: i32, + disable_stats_log: bool, +} + +#[cfg(not(feature = "clap"))] +impl Default for Args { + fn default() -> Self { + Args { + streams: 5000, + groups: 2, + wait: 20, + push_period: 20, + num_buffers: 5000, + disable_stats_log: false, + } + } +} + +fn args() -> Args { + #[cfg(feature = "clap")] + let args = { + let args = Args::parse(); + gst::info!(CAT, "{:?}", args); + + args + }; + + #[cfg(not(feature = "clap"))] + let args = { + if std::env::args().len() > 1 { + gst::warning!(CAT, "Ignoring command line arguments"); + gst::warning!(CAT, "Build with `--features=clap`"); + } + + let args = Args::default(); + gst::warning!(CAT, "{:?}", args); + + args + }; + + args } fn main() { @@ -68,7 +126,7 @@ fn main() { #[cfg(debug_assertions)] gst::warning!(CAT, "RUNNING DEBUG BUILD"); - let args = Args::parse(); + let args = args(); let pipeline = gst::Pipeline::new(None); @@ -82,6 +140,7 @@ fn main() { .unwrap(); src.set_property("context", &ctx_name); src.set_property("context-wait", args.wait); + src.set_property("push-period", args.push_period); src.set_property("num-buffers", args.num_buffers); let sink = gst::ElementFactory::make( @@ -91,8 +150,27 @@ fn main() { .unwrap(); sink.set_property("context", &ctx_name); sink.set_property("context-wait", args.wait); - if i == 0 && args.log_stats { - sink.set_property("must-log-stats", true); + + if i == 0 { + src.set_property("raise-log-level", true); + sink.set_property("raise-log-level", true); + + if !args.disable_stats_log { + // Don't use the last 5 secs in stats + // otherwise we get outliers when reaching EOS. + // Note that stats don't start before the 20 first seconds + // and we get 50 buffers per sec. + const BUFFERS_BEFORE_LOGS: i32 = 20 * 50; + const BUFFERS_TO_SKIP: i32 = BUFFERS_BEFORE_LOGS + 5 * 50; + if args.num_buffers > BUFFERS_TO_SKIP { + sink.set_property("push-period", args.push_period); + sink.set_property("logs-stats", true); + let max_buffers = args.num_buffers - BUFFERS_TO_SKIP; + sink.set_property("max-buffers", max_buffers); + } else { + gst::warning!(CAT, "Not enough buffers to log, disabling stats"); + } + } } let elements = &[&src, &sink]; @@ -109,11 +187,8 @@ fn main() { use gst::MessageView; match msg.view() { - MessageView::Eos(..) => { - gst::info!(CAT, "Shuting down"); - let stop = Instant::now(); - pipeline_clone.set_state(gst::State::Null).unwrap(); - gst::info!(CAT, "Shuting down took {:.2?}", stop.elapsed()); + MessageView::Eos(_) => { + gst::info!(CAT, "Received eos"); l_clone.quit(); } MessageView::Error(err) => { @@ -133,10 +208,25 @@ fn main() { }) .expect("Failed to add bus watch"); - gst::info!(CAT, "Starting"); + gst::info!(CAT, "Switching to Ready"); + let start = Instant::now(); + pipeline.set_state(gst::State::Ready).unwrap(); + gst::info!(CAT, "Switching to Ready took {:.2?}", start.elapsed()); + + gst::info!(CAT, "Switching to Playing"); let start = Instant::now(); pipeline.set_state(gst::State::Playing).unwrap(); - gst::info!(CAT, "Starting took {:.2?}", start.elapsed()); + gst::info!(CAT, "Switching to Playing took {:.2?}", start.elapsed()); l.run(); + + gst::info!(CAT, "Switching to Ready"); + let stop = Instant::now(); + pipeline_clone.set_state(gst::State::Ready).unwrap(); + gst::info!(CAT, "Switching to Ready took {:.2?}", stop.elapsed()); + + gst::info!(CAT, "Shutting down"); + let stop = Instant::now(); + pipeline_clone.set_state(gst::State::Null).unwrap(); + gst::info!(CAT, "Shutting down took {:.2?}", stop.elapsed()); } diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs index d2083037..67ab406d 100644 --- a/generic/threadshare/examples/standalone/sink/imp.rs +++ b/generic/threadshare/examples/standalone/sink/imp.rs @@ -10,18 +10,17 @@ use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::Peekable; +use gst::error_msg; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use gst::EventView; -use gst::{element_error, error_msg}; use once_cell::sync::Lazy; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{self, Context, PadSink, PadSinkRef, Task}; +use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, Task}; -use std::pin::Pin; use std::sync::Mutex; use std::task::Poll; use std::time::{Duration, Instant}; @@ -36,32 +35,36 @@ static CAT: Lazy = Lazy::new(|| { const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20); -const DEFAULT_SYNC: bool = true; -const DEFAULT_MUST_LOG_STATS: bool = false; +const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20); +const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25); const LOG_PERIOD: Duration = Duration::from_secs(20); #[derive(Debug, Clone)] struct Settings { - sync: bool, context: String, context_wait: Duration, - must_log_stats: bool, + raise_log_level: bool, + logs_stats: bool, + push_period: Duration, + max_buffers: Option, } impl Default for Settings { fn default() -> Self { Settings { - sync: DEFAULT_SYNC, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, - must_log_stats: DEFAULT_MUST_LOG_STATS, + raise_log_level: false, + logs_stats: false, + push_period: DEFAULT_PUSH_PERIOD, + max_buffers: Some(DEFAULT_MAX_BUFFERS as u32), } } } #[derive(Debug)] -enum TaskItem { +enum StreamItem { Buffer(gst::Buffer), Event(gst::Event), } @@ -83,7 +86,7 @@ impl PadSinkHandler for TestSinkPadHandler { let element = element.clone().downcast::().unwrap(); async move { - if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { + if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { gst::debug!(CAT, obj: &element, "Flushing"); return Err(gst::FlowError::Flushing); } @@ -105,7 +108,7 @@ impl PadSinkHandler for TestSinkPadHandler { async move { for buffer in list.iter_owned() { - if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { + if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { gst::debug!(CAT, obj: &element, "Flushing"); return Err(gst::FlowError::Flushing); } @@ -130,7 +133,7 @@ impl PadSinkHandler for TestSinkPadHandler { if let EventView::FlushStop(_) = event.view() { let test_sink = element.imp(); return test_sink.task.flush_stop().await_maybe_on_context().is_ok(); - } else if sender.send_async(TaskItem::Event(event)).await.is_err() { + } else if sender.send_async(StreamItem::Event(event)).await.is_err() { gst::debug!(CAT, obj: &element, "Flushing"); } @@ -161,20 +164,31 @@ impl PadSinkHandler for TestSinkPadHandler { #[derive(Default)] struct Stats { must_log: bool, - sync: bool, ramp_up_instant: Option, log_start_instant: Option, last_delta_instant: Option, - buffer_count: u32, - buffer_count_delta: u32, - buffer_headroom: Duration, - buffer_headroom_delta: Duration, - late_buffer_count: u32, - lateness: Duration, - max_lateness: Duration, - late_buffer_count_delta: u32, - lateness_delta: Duration, - max_lateness_delta: Duration, + max_buffers: Option, + buffer_count: f32, + buffer_count_delta: f32, + latency_sum: f32, + latency_square_sum: f32, + latency_sum_delta: f32, + latency_square_sum_delta: f32, + latency_min: Duration, + latency_min_delta: Duration, + latency_max: Duration, + latency_max_delta: Duration, + interval_sum: f32, + interval_square_sum: f32, + interval_sum_delta: f32, + interval_square_sum_delta: f32, + interval_min: Duration, + interval_min_delta: Duration, + interval_max: Duration, + interval_max_delta: Duration, + interval_late_warn: Duration, + interval_late_count: f32, + interval_late_count_delta: f32, } impl Stats { @@ -183,24 +197,34 @@ impl Stats { return; } - self.buffer_count = 0; - self.buffer_count_delta = 0; - self.buffer_headroom = Duration::ZERO; - self.buffer_headroom_delta = Duration::ZERO; - self.late_buffer_count = 0; - self.lateness = Duration::ZERO; - self.max_lateness = Duration::ZERO; - self.late_buffer_count_delta = 0; - self.lateness_delta = Duration::ZERO; - self.max_lateness_delta = Duration::ZERO; + self.buffer_count = 0.0; + self.buffer_count_delta = 0.0; + self.latency_sum = 0.0; + self.latency_square_sum = 0.0; + self.latency_sum_delta = 0.0; + self.latency_square_sum_delta = 0.0; + self.latency_min = Duration::MAX; + self.latency_min_delta = Duration::MAX; + self.latency_max = Duration::ZERO; + self.latency_max_delta = Duration::ZERO; + self.interval_sum = 0.0; + self.interval_square_sum = 0.0; + self.interval_sum_delta = 0.0; + self.interval_square_sum_delta = 0.0; + self.interval_min = Duration::MAX; + self.interval_min_delta = Duration::MAX; + self.interval_max = Duration::ZERO; + self.interval_max_delta = Duration::ZERO; + self.interval_late_count = 0.0; + self.interval_late_count_delta = 0.0; self.last_delta_instant = None; self.log_start_instant = None; self.ramp_up_instant = Some(Instant::now()); - gst::info!(CAT, "First stats logs in {:.2?}", 2 * LOG_PERIOD); + gst::info!(CAT, "First stats logs in {:2?}", 2 * LOG_PERIOD); } - fn can_count(&mut self) -> bool { + fn is_active(&mut self) -> bool { if !self.must_log { return false; } @@ -211,53 +235,64 @@ impl Stats { } self.ramp_up_instant = None; - gst::info!(CAT, "Ramp up complete. Stats logs in {:.2?}", LOG_PERIOD); + gst::info!(CAT, "Ramp up complete. Stats logs in {:2?}", LOG_PERIOD); self.log_start_instant = Some(Instant::now()); self.last_delta_instant = self.log_start_instant; } - true + use std::cmp::Ordering::*; + match self.max_buffers.opt_cmp(self.buffer_count) { + Some(Equal) => { + self.log_global(); + self.buffer_count += 1.0; + false + } + Some(Less) => false, + _ => true, + } } - fn notify_buffer(&mut self) { - if !self.can_count() { + fn add_buffer(&mut self, latency: Duration, interval: Duration) { + if !self.is_active() { return; } - self.buffer_count += 1; - self.buffer_count_delta += 1; - } + self.buffer_count += 1.0; + self.buffer_count_delta += 1.0; - fn notify_buffer_headroom(&mut self, headroom: Duration) { - if !self.can_count() { - return; + // Latency + let latency_f32 = latency.as_nanos() as f32; + let latency_square = latency_f32.powi(2); + + self.latency_sum += latency_f32; + self.latency_square_sum += latency_square; + self.latency_min = self.latency_min.min(latency); + self.latency_max = self.latency_max.max(latency); + + self.latency_sum_delta += latency_f32; + self.latency_square_sum_delta += latency_square; + self.latency_min_delta = self.latency_min_delta.min(latency); + self.latency_max_delta = self.latency_max_delta.max(latency); + + // Interval + let interval_f32 = interval.as_nanos() as f32; + let interval_square = interval_f32.powi(2); + + self.interval_sum += interval_f32; + self.interval_square_sum += interval_square; + self.interval_min = self.interval_min.min(interval); + self.interval_max = self.interval_max.max(interval); + + self.interval_sum_delta += interval_f32; + self.interval_square_sum_delta += interval_square; + self.interval_min_delta = self.interval_min_delta.min(interval); + self.interval_max_delta = self.interval_max_delta.max(interval); + + if interval > self.interval_late_warn { + self.interval_late_count += 1.0; + self.interval_late_count_delta += 1.0; } - self.buffer_headroom += headroom; - self.buffer_headroom_delta += headroom; - } - - fn notify_late_buffer(&mut self, now: Option, pts: gst::ClockTime) { - if !self.can_count() { - return; - } - - let lateness = now - .opt_checked_sub(pts) - .ok() - .flatten() - .map_or(Duration::ZERO, Duration::from); - - self.late_buffer_count += 1; - self.lateness += lateness; - self.max_lateness = self.max_lateness.max(lateness); - - self.late_buffer_count_delta += 1; - self.lateness_delta += lateness; - self.max_lateness_delta = self.max_lateness_delta.max(lateness); - } - - fn log_delta(&mut self) { let delta_duration = match self.last_delta_instant { Some(last_delta) => last_delta.elapsed(), None => return, @@ -269,94 +304,121 @@ impl Stats { self.last_delta_instant = Some(Instant::now()); - gst::info!(CAT, "Delta stats:"); - gst::info!( - CAT, - "o {:>5.2} buffers / s", - self.buffer_count_delta as f32 / delta_duration.as_millis() as f32 * 1_000f32, - ); + if self.buffer_count_delta > 1.0 { + gst::info!(CAT, "Delta stats:"); - if self.sync && self.buffer_count_delta > 0 { - let early_buffers_count = self - .buffer_count_delta - .saturating_sub(self.late_buffer_count_delta); - if early_buffers_count > 0 { - gst::info!( - CAT, - "o {:>5.2?} headroom / early buffers", - self.buffer_headroom_delta / early_buffers_count, - ); + let interval_mean = self.interval_sum_delta / self.buffer_count_delta; + let interval_std_dev = f32::sqrt( + self.interval_square_sum_delta / self.buffer_count_delta - interval_mean.powi(2), + ); - gst::info!( + gst::info!( + CAT, + "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(interval_mean as u64), + Duration::from_nanos(interval_std_dev as u64), + self.interval_min_delta, + self.interval_max_delta, + ); + + if self.interval_late_count_delta > 1.0 { + gst::warning!( CAT, - "o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}", - self.late_buffer_count_delta as f32 / self.buffer_count_delta as f32 * 100f32, - self.lateness_delta - .checked_div(self.late_buffer_count_delta) - .unwrap_or(Duration::ZERO), - self.max_lateness_delta, + "o {:5.2}% late buffers", + 100f32 * self.interval_late_count_delta / self.buffer_count_delta ); } - self.buffer_headroom_delta = Duration::ZERO; - self.late_buffer_count_delta = 0; - self.lateness_delta = Duration::ZERO; - self.max_lateness_delta = Duration::ZERO; + self.interval_sum_delta = 0.0; + self.interval_square_sum_delta = 0.0; + self.interval_min_delta = Duration::MAX; + self.interval_max_delta = Duration::ZERO; + self.interval_late_count_delta = 0.0; + + let latency_mean = self.latency_sum_delta / self.buffer_count_delta; + let latency_std_dev = f32::sqrt( + self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2), + ); + + gst::info!( + CAT, + "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(latency_mean as u64), + Duration::from_nanos(latency_std_dev as u64), + self.latency_min_delta, + self.latency_max_delta, + ); + + self.latency_sum_delta = 0.0; + self.latency_square_sum_delta = 0.0; + self.latency_min_delta = Duration::MAX; + self.latency_max_delta = Duration::ZERO; } - self.buffer_count_delta = 0; + self.buffer_count_delta = 0.0; } fn log_global(&mut self) { - let log_duration = match self.log_start_instant { - Some(start) => start.elapsed(), - None => return, - }; + if self.log_start_instant.is_none() { + return; + } - gst::info!(CAT, "Global stats:"); - gst::info!( - CAT, - "o {:>5.2} buffers / s", - self.buffer_count as f32 / log_duration.as_millis() as f32 * 1_000f32, - ); + if self.buffer_count > 1.0 { + gst::info!(CAT, "Global stats:"); - if self.sync && self.buffer_count > 0 { - let early_buffers_count = self.buffer_count.saturating_sub(self.late_buffer_count); - if early_buffers_count > 0 { - gst::info!( + let interval_mean = self.interval_sum / self.buffer_count; + let interval_std_dev = + f32::sqrt(self.interval_square_sum / self.buffer_count - interval_mean.powi(2)); + + gst::info!( + CAT, + "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(interval_mean as u64), + Duration::from_nanos(interval_std_dev as u64), + self.interval_min, + self.interval_max, + ); + + if self.interval_late_count > f32::EPSILON { + gst::warning!( CAT, - "o {:>5.2?} headroom / early buffers", - self.buffer_headroom / early_buffers_count, - ); - - gst::info!( - CAT, - "o {:>5.2}% late buffers - mean {:>5.2?}, max {:>5.2?}", - self.late_buffer_count as f32 / self.buffer_count as f32 * 100f32, - self.lateness - .checked_div(self.late_buffer_count) - .unwrap_or(Duration::ZERO), - self.max_lateness, + "o {:5.2}% late buffers", + 100f32 * self.interval_late_count / self.buffer_count ); } + + let latency_mean = self.latency_sum / self.buffer_count; + let latency_std_dev = + f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2)); + + gst::info!( + CAT, + "o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]", + Duration::from_nanos(latency_mean as u64), + Duration::from_nanos(latency_std_dev as u64), + self.latency_min, + self.latency_max, + ); } } } struct TestSinkTask { element: super::TestSink, - item_receiver: Peekable>, - sync: bool, + raise_log_level: bool, + last_dts: Option, + item_receiver: Peekable>, stats: Stats, segment: Option, } impl TestSinkTask { - fn new(element: &super::TestSink, item_receiver: flume::Receiver) -> Self { + fn new(element: &super::TestSink, item_receiver: flume::Receiver) -> Self { TestSinkTask { element: element.clone(), + raise_log_level: false, + last_dts: None, item_receiver: item_receiver.into_stream().peekable(), - sync: DEFAULT_SYNC, stats: Stats::default(), segment: None, } @@ -369,17 +431,23 @@ impl TestSinkTask { } impl TaskImpl for TestSinkTask { - type Item = TaskItem; + type Item = StreamItem; fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::log!(CAT, obj: &self.element, "Preparing Task"); - let sink = self.element.imp(); let settings = sink.settings.lock().unwrap(); - self.sync = settings.sync; - self.stats.sync = self.sync; - self.stats.must_log = settings.must_log_stats; + self.raise_log_level = settings.raise_log_level; + + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Preparing Task"); + } else { + gst::trace!(CAT, obj: &self.element, "Preparing Task"); + } + + self.stats.must_log = settings.logs_stats; + self.stats.max_buffers = settings.max_buffers.map(|max_buffers| max_buffers as f32); + self.stats.interval_late_warn = settings.push_period + settings.context_wait / 2; Ok(()) } @@ -388,7 +456,13 @@ impl TaskImpl for TestSinkTask { fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async { - gst::log!(CAT, obj: &self.element, "Starting Task"); + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Starting Task"); + } else { + gst::trace!(CAT, obj: &self.element, "Starting Task"); + } + + self.last_dts = None; self.stats.start(); Ok(()) } @@ -397,81 +471,92 @@ impl TaskImpl for TestSinkTask { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async { - gst::log!(CAT, obj: &self.element, "Stopping Task"); - self.flush().await; - Ok(()) - } - .boxed() - } - - fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async { - gst::log!(CAT, obj: &self.element, "Starting Task Flush"); - self.flush().await; - Ok(()) - } - .boxed() - } - - fn try_next(&mut self) -> BoxFuture<'_, Result> { - async move { - let item_opt = Pin::new(&mut self.item_receiver).peek().await; - - // Check the peeked item in case we need to sync. - // The item will still be available in the channel - // in case this is cancelled by a state transition. - match item_opt { - Some(TaskItem::Buffer(buffer)) => { - self.stats.notify_buffer(); - - if self.sync { - let rtime = self.segment.as_ref().and_then(|segment| { - segment - .downcast_ref::() - .and_then(|segment| segment.to_running_time(buffer.pts())) - }); - if let Some(pts) = rtime { - // This can be cancelled by a state transition. - self.sync(pts).await; - } - } - } - Some(_) => (), - None => { - panic!("Internal channel sender dropped while Task is Started"); - } + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Stopping Task"); + } else { + gst::trace!(CAT, obj: &self.element, "Stopping Task"); } - // An item was peeked above, we can now pop it without losing it. - Ok(self.item_receiver.next().await.unwrap()) + self.flush().await; + Ok(()) } .boxed() } - fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { + fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - gst::debug!(CAT, obj: &self.element, "Handling {:?}", item); + let item = self.item_receiver.next().await.unwrap(); + + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Popped item"); + } else { + gst::trace!(CAT, obj: &self.element, "Popped item"); + } + + Ok(item) + } + .boxed() + } + + fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "Received {:?}", item); + } else { + gst::trace!(CAT, obj: &self.element, "Received {:?}", item); + } match item { - TaskItem::Buffer(buffer) => { - self.render(buffer).await.map_err(|err| { - element_error!( - &self.element, - gst::StreamError::Failed, - ["Failed to render item, stopping task: {}", err] - ); - gst::FlowError::Error - })?; + StreamItem::Buffer(buffer) => { + let dts = self + .segment + .as_ref() + .and_then(|segment| { + segment + .downcast_ref::() + .and_then(|segment| segment.to_running_time(buffer.dts())) + }) + .unwrap(); - self.stats.log_delta(); + if let Some(last_dts) = self.last_dts { + let cur_ts = self.element.current_running_time().unwrap(); + let latency: Duration = (cur_ts - dts).into(); + let interval: Duration = (dts - last_dts).into(); + + self.stats.add_buffer(latency, interval); + + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "o latency {:.2?}", latency); + gst::debug!(CAT, obj: &self.element, "o interval {:.2?}", interval); + } else { + gst::trace!(CAT, obj: &self.element, "o latency {:.2?}", latency); + gst::trace!(CAT, obj: &self.element, "o interval {:.2?}", interval); + } + } + + self.last_dts = Some(dts); + + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Buffer processed"); + } else { + gst::trace!(CAT, obj: &self.element, "Buffer processed"); + } } - TaskItem::Event(event) => match event.view() { + StreamItem::Event(event) => match event.view() { EventView::Eos(_) => { - self.stats.log_global(); + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "EOS"); + } else { + gst::trace!(CAT, obj: &self.element, "EOS"); + } - let _ = self - .element - .post_message(gst::message::Eos::builder().src(&self.element).build()); + let elem = self.element.clone(); + self.element.call_async(move |_| { + let _ = + elem.post_message(gst::message::Eos::builder().src(&elem).build()); + }); + + return Err(gst::FlowError::Eos); } EventView::Segment(e) => { self.segment = Some(e.segment().clone()); @@ -489,54 +574,27 @@ impl TaskImpl for TestSinkTask { } } -impl TestSinkTask { - async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> { - let _data = buffer.map_readable().map_err(|_| { - element_error!( - self.element, - gst::StreamError::Format, - ["Failed to map buffer readable"] - ); - gst::FlowError::Error - })?; - - gst::log!(CAT, obj: &self.element, "buffer {:?} rendered", buffer); - - Ok(()) - } - - /// Waits until specified time. - async fn sync(&mut self, pts: gst::ClockTime) { - let now = self.element.current_running_time(); - - if let Ok(Some(delay)) = pts.opt_checked_sub(now) { - let delay = delay.into(); - gst::trace!(CAT, obj: &self.element, "sync: waiting {:?}", delay); - runtime::time::delay_for(delay).await; - - self.stats.notify_buffer_headroom(delay); - } else { - self.stats.notify_late_buffer(now, pts); - } - } -} - #[derive(Debug)] pub struct TestSink { sink_pad: PadSink, task: Task, - item_sender: Mutex>>, + item_sender: Mutex>>, settings: Mutex, } impl TestSink { #[track_caller] - fn clone_item_sender(&self) -> flume::Sender { + fn clone_item_sender(&self) -> flume::Sender { self.item_sender.lock().unwrap().as_ref().unwrap().clone() } fn prepare(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Preparing"); + } else { + gst::trace!(CAT, obj: element, "Preparing"); + } let context = { let settings = self.settings.lock().unwrap(); @@ -556,28 +614,67 @@ impl TestSink { *self.item_sender.lock().unwrap() = Some(item_sender); - gst::debug!(CAT, obj: element, "Started preparation"); + if raise_log_level { + gst::debug!(CAT, obj: element, "Prepared"); + } else { + gst::trace!(CAT, obj: element, "Prepared"); + } Ok(()) } fn unprepare(&self, element: &super::TestSink) { - gst::debug!(CAT, obj: element, "Unpreparing"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Unpreparing"); + } else { + gst::trace!(CAT, obj: element, "Unpreparing"); + } + self.task.unprepare().block_on().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); + + if raise_log_level { + gst::debug!(CAT, obj: element, "Unprepared"); + } else { + gst::trace!(CAT, obj: element, "Unprepared"); + } } fn stop(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Stopping"); + } else { + gst::trace!(CAT, obj: element, "Stopping"); + } + self.task.stop().block_on()?; - gst::debug!(CAT, obj: element, "Stopped"); + + if raise_log_level { + gst::debug!(CAT, obj: element, "Stopped"); + } else { + gst::trace!(CAT, obj: element, "Stopped"); + } + Ok(()) } fn start(&self, element: &super::TestSink) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Starting"); + } else { + gst::trace!(CAT, obj: element, "Starting"); + } + self.task.start().block_on()?; - gst::debug!(CAT, obj: element, "Started"); + + if raise_log_level { + gst::debug!(CAT, obj: element, "Started"); + } else { + gst::trace!(CAT, obj: element, "Started"); + } + Ok(()) } } @@ -616,17 +713,27 @@ impl ObjectImpl for TestSink { .maximum(1000) .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32) .build(), - glib::ParamSpecBoolean::builder("sync") - .nick("Sync") - .blurb("Sync on the clock") - .default_value(DEFAULT_SYNC) - .build(), - glib::ParamSpecBoolean::builder("must-log-stats") - .nick("Must Log Stats") - .blurb("Whether statistics should be logged") - .default_value(DEFAULT_MUST_LOG_STATS) + glib::ParamSpecBoolean::builder("raise-log-level") + .nick("Raise log level") + .blurb("Raises the log level so that this element stands out") .write_only() .build(), + glib::ParamSpecBoolean::builder("logs-stats") + .nick("Logs Stats") + .blurb("Whether statistics should be logged") + .write_only() + .build(), + glib::ParamSpecUInt::builder("push-period") + .nick("Src buffer Push Period") + .blurb("Push period used by `src` element (used for stats warnings)") + .default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32) + .build(), + glib::ParamSpecInt::builder("max-buffers") + .nick("Max Buffers") + .blurb("Number of buffers to count before stopping stats (-1 = unlimited)") + .minimum(-1i32) + .default_value(DEFAULT_MAX_BUFFERS) + .build(), ] }); @@ -642,10 +749,6 @@ impl ObjectImpl for TestSink { ) { let mut settings = self.settings.lock().unwrap(); match pspec.name() { - "sync" => { - let sync = value.get().expect("type checked upstream"); - settings.sync = sync; - } "context" => { settings.context = value .get::>() @@ -657,9 +760,21 @@ impl ObjectImpl for TestSink { value.get::().expect("type checked upstream").into(), ); } - "must-log-stats" => { - let must_log_stats = value.get().expect("type checked upstream"); - settings.must_log_stats = must_log_stats; + "raise-log-level" => { + settings.raise_log_level = value.get::().expect("type checked upstream"); + } + "logs-stats" => { + let logs_stats = value.get().expect("type checked upstream"); + settings.logs_stats = logs_stats; + } + "push-period" => { + settings.push_period = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); + } + "max-buffers" => { + let value = value.get::().expect("type checked upstream"); + settings.max_buffers = if value > 0 { Some(value as u32) } else { None }; } _ => unimplemented!(), } @@ -668,9 +783,15 @@ impl ObjectImpl for TestSink { fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let settings = self.settings.lock().unwrap(); match pspec.name() { - "sync" => settings.sync.to_value(), "context" => settings.context.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), + "raise-log-level" => settings.raise_log_level.to_value(), + "push-period" => (settings.push_period.as_millis() as u32).to_value(), + "max-buffers" => settings + .max_buffers + .and_then(|val| val.try_into().ok()) + .unwrap_or(-1i32) + .to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs index 6b5f1d42..afcb2c41 100644 --- a/generic/threadshare/examples/standalone/src/imp.rs +++ b/generic/threadshare/examples/standalone/src/imp.rs @@ -16,7 +16,7 @@ use gst::subclass::prelude::*; use once_cell::sync::Lazy; use std::sync::Mutex; -use std::time::{Duration, Instant}; +use std::time::Duration; use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::{Context, PadSrc, Task, Timer}; @@ -29,17 +29,18 @@ static CAT: Lazy = Lazy::new(|| { ) }); -const BUFFER_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(20); - const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20); -const DEFAULT_NUM_BUFFERS: i32 = 50 * 60 * 2; +const DEFAULT_PUSH_PERIOD: gst::ClockTime = gst::ClockTime::from_mseconds(20); +const DEFAULT_NUM_BUFFERS: i32 = 50 * 100; #[derive(Debug, Clone)] struct Settings { context: String, context_wait: Duration, - num_buffers: Option, + push_period: gst::ClockTime, + raise_log_level: bool, + num_buffers: Option, } impl Default for Settings { @@ -47,7 +48,9 @@ impl Default for Settings { Settings { context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, - num_buffers: Some(DEFAULT_NUM_BUFFERS), + push_period: DEFAULT_PUSH_PERIOD, + raise_log_level: false, + num_buffers: Some(DEFAULT_NUM_BUFFERS as u32), } } } @@ -62,13 +65,13 @@ impl PadSrcHandler for TestSrcPadHandler { struct SrcTask { element: super::TestSrc, buffer_pool: gst::BufferPool, - last_pts: gst::ClockTime, - last_buf_instant: Option, - push_period: Duration, + timer: Option, + raise_log_level: bool, + push_period: gst::ClockTime, need_initial_events: bool, need_segment: bool, - num_buffers: Option, - buffer_count: i32, + num_buffers: Option, + buffer_count: u32, } impl SrcTask { @@ -83,12 +86,12 @@ impl SrcTask { SrcTask { element, buffer_pool, - last_pts: gst::ClockTime::ZERO, - last_buf_instant: None, - push_period: Duration::ZERO, + timer: None, + raise_log_level: false, + push_period: gst::ClockTime::ZERO, need_initial_events: true, need_segment: true, - num_buffers: Some(DEFAULT_NUM_BUFFERS), + num_buffers: Some(DEFAULT_NUM_BUFFERS as u32), buffer_count: 0, } } @@ -99,11 +102,17 @@ impl TaskImpl for SrcTask { fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::log!(CAT, obj: &self.element, "Preparing Task"); - let src = self.element.imp(); let settings = src.settings.lock().unwrap(); - self.push_period = settings.context_wait; + self.raise_log_level = settings.raise_log_level; + + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Preparing Task"); + } else { + gst::trace!(CAT, obj: &self.element, "Preparing Task"); + } + + self.push_period = settings.push_period; self.num_buffers = settings.num_buffers; Ok(()) @@ -113,9 +122,18 @@ impl TaskImpl for SrcTask { fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async { - gst::log!(CAT, obj: &self.element, "Starting Task"); + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Starting Task"); + } else { + gst::trace!(CAT, obj: &self.element, "Starting Task"); + } + + self.timer = Some(Timer::interval_delayed_by( + // Delay first buffer push so as to let others start. + Duration::from_secs(2), + self.push_period.into(), + )); self.buffer_count = 0; - self.last_buf_instant = None; self.buffer_pool.set_active(true).unwrap(); Ok(()) } @@ -124,27 +142,17 @@ impl TaskImpl for SrcTask { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { - gst::log!(CAT, obj: &self.element, "Stopping task"); + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Stopping Task"); + } else { + gst::trace!(CAT, obj: &self.element, "Stopping Task"); + } self.buffer_pool.set_active(false).unwrap(); - self.last_pts = gst::ClockTime::ZERO; + self.timer = None; self.need_initial_events = true; self.need_segment = true; - gst::log!(CAT, obj: &self.element, "Task stopped"); - Ok(()) - } - .boxed() - } - - fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - gst::log!(CAT, obj: &self.element, "Starting task flush"); - - self.buffer_pool.set_active(false).unwrap(); - self.need_segment = true; - - gst::log!(CAT, obj: &self.element, "Task flush started"); Ok(()) } .boxed() @@ -152,30 +160,34 @@ impl TaskImpl for SrcTask { fn try_next(&mut self) -> BoxFuture<'_, Result> { async move { - if let Some(delay) = self - .last_buf_instant - .map(|last| last.elapsed()) - .opt_checked_sub(self.push_period) - .ok() - .flatten() - { - Timer::after(delay).await; + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Awaiting timer"); + } else { + gst::trace!(CAT, obj: &self.element, "Awaiting timer"); } - self.last_buf_instant = Some(Instant::now()); + self.timer.as_mut().unwrap().next().await; - let start = self.last_pts; - self.last_pts = start + BUFFER_DURATION; + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Timer ticked"); + } else { + gst::trace!(CAT, obj: &self.element, "Timer ticked"); + } - self.buffer_pool.acquire_buffer(None).map(|mut buffer| { - { - let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(start); - buffer.set_duration(BUFFER_DURATION); - } - - buffer - }) + self.buffer_pool + .acquire_buffer(None) + .map(|mut buffer| { + { + let buffer = buffer.get_mut().unwrap(); + let rtime = self.element.current_running_time().unwrap(); + buffer.set_dts(rtime); + } + buffer + }) + .map_err(|err| { + gst::error!(CAT, obj: &self.element, "Failed to acquire buffer {}", err); + err + }) } .boxed() } @@ -185,15 +197,29 @@ impl TaskImpl for SrcTask { let res = self.push(buffer).await; match res { Ok(_) => { - gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"); + if self.raise_log_level { + gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"); + } else { + gst::trace!(CAT, obj: &self.element, "Successfully pushed buffer"); + } } Err(gst::FlowError::Eos) => { - gst::debug!(CAT, obj: &self.element, "EOS"); + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "EOS"); + } else { + gst::trace!(CAT, obj: &self.element, "EOS"); + } let test_src = self.element.imp(); test_src.src_pad.push_event(gst::event::Eos::new()).await; + + return Err(gst::FlowError::Eos); } Err(gst::FlowError::Flushing) => { - gst::debug!(CAT, obj: &self.element, "Flushing"); + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "Flushing"); + } else { + gst::trace!(CAT, obj: &self.element, "Flushing"); + } } Err(err) => { gst::error!(CAT, obj: &self.element, "Got error {}", err); @@ -214,11 +240,20 @@ impl TaskImpl for SrcTask { impl SrcTask { async fn push(&mut self, buffer: gst::Buffer) -> Result { - gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer); + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "Pushing {:?}", buffer); + } else { + gst::trace!(CAT, obj: &self.element, "Pushing {:?}", buffer); + } + let test_src = self.element.imp(); if self.need_initial_events { - gst::debug!(CAT, obj: &self.element, "Pushing initial events"); + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "Pushing initial events"); + } else { + gst::trace!(CAT, obj: &self.element, "Pushing initial events"); + } let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); let stream_start_evt = gst::event::StreamStart::builder(&stream_id) @@ -244,12 +279,27 @@ impl SrcTask { self.need_segment = false; } - gst::debug!(CAT, obj: &self.element, "Forwarding {:?}", buffer); + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "Forwarding buffer"); + } else { + gst::trace!(CAT, obj: &self.element, "Forwarding buffer"); + } + let ok = test_src.src_pad.push(buffer).await?; self.buffer_count += 1; if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) { + if self.raise_log_level { + gst::debug!(CAT, obj: &self.element, "Pushing EOS"); + } else { + gst::trace!(CAT, obj: &self.element, "Pushing EOS"); + } + + let test_src = self.element.imp(); + if !test_src.src_pad.push_event(gst::event::Eos::new()).await { + gst::error!(CAT, obj: &self.element, "Error pushing EOS"); + } return Err(gst::FlowError::Eos); } @@ -266,7 +316,12 @@ pub struct TestSrc { impl TestSrc { fn prepare(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Preparing"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Preparing"); + } else { + gst::trace!(CAT, obj: element, "Preparing"); + } let settings = self.settings.lock().unwrap(); let context = @@ -282,35 +337,86 @@ impl TestSrc { .prepare(SrcTask::new(element.clone()), context) .block_on()?; - gst::debug!(CAT, obj: element, "Prepared"); + if raise_log_level { + gst::debug!(CAT, obj: element, "Prepared"); + } else { + gst::trace!(CAT, obj: element, "Prepared"); + } Ok(()) } fn unprepare(&self, element: &super::TestSrc) { - gst::debug!(CAT, obj: element, "Unpreparing"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Unpreparing"); + } else { + gst::trace!(CAT, obj: element, "Unpreparing"); + } + self.task.unprepare().block_on().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); + + if raise_log_level { + gst::debug!(CAT, obj: element, "Unprepared"); + } else { + gst::trace!(CAT, obj: element, "Unprepared"); + } } fn stop(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Stopping"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Stopping"); + } else { + gst::trace!(CAT, obj: element, "Stopping"); + } + self.task.stop().block_on()?; - gst::debug!(CAT, obj: element, "Stopped"); + + if raise_log_level { + gst::debug!(CAT, obj: element, "Stopped"); + } else { + gst::trace!(CAT, obj: element, "Stopped"); + } + Ok(()) } fn start(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Starting"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Starting"); + } else { + gst::trace!(CAT, obj: element, "Starting"); + } + self.task.start().block_on()?; - gst::debug!(CAT, obj: element, "Started"); + + if raise_log_level { + gst::debug!(CAT, obj: element, "Started"); + } else { + gst::trace!(CAT, obj: element, "Started"); + } + Ok(()) } fn pause(&self, element: &super::TestSrc) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Pausing"); + let raise_log_level = self.settings.lock().unwrap().raise_log_level; + if raise_log_level { + gst::debug!(CAT, obj: element, "Pausing"); + } else { + gst::trace!(CAT, obj: element, "Pausing"); + } + self.task.pause().block_on()?; - gst::debug!(CAT, obj: element, "Paused"); + + if raise_log_level { + gst::debug!(CAT, obj: element, "Paused"); + } else { + gst::trace!(CAT, obj: element, "Paused"); + } + Ok(()) } } @@ -348,6 +454,16 @@ impl ObjectImpl for TestSrc { .maximum(1000) .default_value(DEFAULT_CONTEXT_WAIT.as_millis() as u32) .build(), + glib::ParamSpecUInt::builder("push-period") + .nick("Buffer Push Period") + .blurb("Push a new buffer every this many ms") + .default_value(DEFAULT_PUSH_PERIOD.mseconds() as u32) + .build(), + glib::ParamSpecBoolean::builder("raise-log-level") + .nick("Raise log level") + .blurb("Raises the log level so that this element stands out") + .write_only() + .build(), glib::ParamSpecInt::builder("num-buffers") .nick("Num Buffers") .blurb("Number of buffers to output before sending EOS (-1 = unlimited)") @@ -380,9 +496,17 @@ impl ObjectImpl for TestSrc { value.get::().expect("type checked upstream").into(), ); } + "push-period" => { + settings.push_period = gst::ClockTime::from_mseconds( + value.get::().expect("type checked upstream").into(), + ); + } + "raise-log-level" => { + settings.raise_log_level = value.get::().expect("type checked upstream"); + } "num-buffers" => { let value = value.get::().expect("type checked upstream"); - settings.num_buffers = if value > 0 { Some(value) } else { None }; + settings.num_buffers = if value > 0 { Some(value as u32) } else { None }; } _ => unimplemented!(), } @@ -393,7 +517,13 @@ impl ObjectImpl for TestSrc { match pspec.name() { "context" => settings.context.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), - "num-buffers" => settings.num_buffers.unwrap_or(-1).to_value(), + "push-period" => (settings.push_period.mseconds() as u32).to_value(), + "raise-log-level" => settings.raise_log_level.to_value(), + "num-buffers" => settings + .num_buffers + .and_then(|val| val.try_into().ok()) + .unwrap_or(-1i32) + .to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/src/runtime/executor/timer.rs b/generic/threadshare/src/runtime/executor/timer.rs index e08ebf39..079c7a9e 100644 --- a/generic/threadshare/src/runtime/executor/timer.rs +++ b/generic/threadshare/src/runtime/executor/timer.rs @@ -83,6 +83,16 @@ impl Timer { Timer::interval_at(Instant::now() + period, period) } + /// Creates a timer that emits events periodically, starting after `delay`. + /// + /// When throttling is activated (i.e. when using a non-`0` `wait` + /// duration in `Context::acquire`), timer entries are assigned to + /// the nearest time frame, meaning that the delay might elapse + /// `wait` / 2 ms earlier or later than the expected instant. + pub fn interval_delayed_by(delay: Duration, period: Duration) -> Timer { + Timer::interval_at(Instant::now() + delay, period) + } + /// Creates a timer that emits events periodically, starting at `start`. /// /// When throttling is activated (i.e. when using a non-`0` `wait`