threadshare: examples: add a new benchmark program

This new example is built upon the existing `ts-standalone`:

* `ts-standalone` focuses on the overhead of threadshare-specific constructs.
  It uses a very lightweight source element because buffer content is not a
  concern in this case.
* `ts-standalone-rtprecv` aims to simulate a use case closer to the real
  world: it uses `ts-audiotestsrc` and `rtprecv`. `rtprecv` can push buffers
  either from the upstream thread or from a blocking thread. Note that this
  example intentionally drops buffers so that `rtprecv` exhibits both
  behaviours. The resulting per-stream dataflow is sketched below.
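
The per-stream dataflow, sketched here in `gst-launch-1.0` notation (the
example builds it programmatically, the elements after `rtprecv` are only
added and linked once its source pad appears, the `context` / `context-wait`
properties are omitted, and the sink variants are registered by the example's
own test plugin):

```
ts-audiotestsrc is-live=true
  ! ts-queue max-size-buffers=10
  ! audioconvert ! rtpL16pay
  ! identity drop-probability=0.125
  ! rtprecv latency=40
  ! rtpL16depay
  ! ts-queue max-size-time=60000000
  ! <sink variant selected with --sink>
```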

Reminders:

* Use `GST_DEBUG=ts-standalone*:4` to display statistics.
* Compile with `--release`.
* Compile with `--features tuning` to also get stats about the time the
  throttling Scheduler at the sink spent parked (see the example invocation
  below).
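
For example, a possible invocation from the threadshare plugin directory
(assuming a standard `cargo` setup; adjust options to your environment):

```
GST_DEBUG="ts-standalone*:4" cargo run --release --features tuning \
    --example ts-standalone-rtprecv
```

`--help` lists the available options (number of streams, groups, context
wait, sink variant, etc.).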

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2496>
François Laignel 2025-08-26 20:19:20 +02:00
parent 0f87d52d0d
commit 3fec94a234
9 changed files with 367 additions and 78 deletions

Cargo.lock (generated)

@ -3427,6 +3427,7 @@ dependencies = [
"futures",
"getifaddrs",
"gio",
"gst-plugin-rtp",
"gst-plugin-version-helper",
"gstreamer",
"gstreamer-app",


@ -41,6 +41,7 @@ gst-check.workspace = true
gst-app = { workspace = true, features = [ "v1_20" ] }
# Used by examples
clap = { version = "4", features = ["derive"] }
gst-plugin-rtp = { path = "../../net/rtp" }
[lib]
name = "gstthreadshare"
@ -63,6 +64,10 @@ path = "examples/tcpclientsrc_benchmark_sender.rs"
name = "ts-standalone"
path = "examples/standalone/main.rs"
[[example]]
name = "ts-standalone-rtprecv"
path = "examples/standalone/rtprecv.rs"
[[example]]
name = "ts-inter-simple"
path = "examples/inter/simple.rs"


@ -50,7 +50,7 @@ pub struct Args {
pub num_buffers: i32,
/// The Sink variant to use.
#[clap(long, value_enum, default_value_t = Sink::SyncMutex)]
#[clap(long, value_enum, default_value_t = Sink::Task)]
pub sink: Sink,
/// Disables statistics logging.


@ -0,0 +1,274 @@
use gst::glib;
use std::sync::LazyLock;
mod args;
use args::*;
#[macro_use]
mod macros;
mod sink;
mod src;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
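// Buffers are intentionally dropped upstream of `rtprecv` (via
// `identity drop-probability`) so that it pushes buffers both from the
// upstream thread and from a blocking thread.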
const DROP_PROBABILITY: f32 = 0.125f32;
const RTPRECV_LATENCY_MS: u32 = 40;
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"ts-standalone-rtprecv",
gst::DebugColorFlags::empty(),
Some("Thread-sharing standalone rtprecv test"),
)
});
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
sink::async_mutex::register(plugin)?;
sink::sync_mutex::register(plugin)?;
sink::task::register(plugin)?;
Ok(())
}
gst::plugin_define!(
threadshare_standalone_test,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"LGPL",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);
fn main() {
use gst::prelude::*;
use std::time::Instant;
gst::init().unwrap();
gstthreadshare::plugin_register_static().unwrap();
gstrsrtp::plugin_register_static().unwrap();
self::plugin_register_static().unwrap();
let main_context = glib::MainContext::default();
let _guard = main_context.acquire().unwrap();
#[cfg(debug_assertions)]
gst::warning!(CAT, "RUNNING DEBUG BUILD");
let args = args();
let pipeline = gst::Pipeline::default();
for i in 0..args.streams {
let ctx_name = format!("standalone {}", i % args.groups);
let src = gst::ElementFactory::make("ts-audiotestsrc")
.name(format!("src-{i}").as_str())
.property("is-live", true)
.property("context", &ctx_name)
.property("context-wait", args.wait)
.property("num-buffers", args.num_buffers)
.build()
.unwrap();
let queue = gst::ElementFactory::make("ts-queue")
.name(format!("queue-src-{i}").as_str())
.property("context", &ctx_name)
.property("context-wait", args.wait)
.property("max-size-buffers", 10u32)
.property("max-size-bytes", 0u32)
.property("max-size-time", 0u64)
.build()
.unwrap();
let convert = gst::ElementFactory::make("audioconvert")
.name(format!("convert-{i}").as_str())
.build()
.unwrap();
let pay = gst::ElementFactory::make("rtpL16pay")
.name(format!("pay-{i}").as_str())
.build()
.unwrap();
let dropper = gst::ElementFactory::make("identity")
.name(format!("dropper-{i}").as_str())
.property("drop-probability", DROP_PROBABILITY)
.build()
.unwrap();
let rtprecv = gst::ElementFactory::make("rtprecv")
.name(format!("rtprecv-{i}").as_str())
.property("latency", RTPRECV_LATENCY_MS)
.build()
.unwrap();
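// `rtprecv` exposes a src pad per incoming stream: complete the chain
// (depay ! queue ! sink) when that pad appears.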
rtprecv.connect_pad_added(move |elem, pad| {
if pad.direction() != gst::PadDirection::Src {
return;
}
let depay = gst::ElementFactory::make("rtpL16depay")
.name(format!("depay-{i}").as_str())
.build()
.unwrap();
let queue = gst::ElementFactory::make("ts-queue")
.name(format!("queue-sink-{i}").as_str())
.property("context", &ctx_name)
.property("context-wait", args.wait)
.property("max-size-buffers", 0u32)
.property("max-size-bytes", 0u32)
.property(
"max-size-time",
(20u64 + RTPRECV_LATENCY_MS as u64).mseconds(),
)
.build()
.unwrap();
let sink = gst::ElementFactory::make(args.sink.element_name())
.name(format!("sink-{i}").as_str())
.property("context", &ctx_name)
.property("context-wait", args.wait)
.build()
.unwrap();
if i == 0 {
sink.set_property("main-elem", true);
if !args.disable_stats_log {
// Don't use the last 5 secs in stats
// otherwise we get outliers when reaching EOS.
// Note that stats don't start before the first 20 seconds
// and we get 50 buffers per sec.
const BUFFERS_BEFORE_LOGS: i32 = 20 * 50;
const BUFFERS_TO_SKIP: i32 = BUFFERS_BEFORE_LOGS + 5 * 50;
let expected_buffers =
(args.num_buffers as f32 * (1.0f32 - DROP_PROBABILITY)) as i32;
if expected_buffers > BUFFERS_TO_SKIP {
sink.set_property("push-period", args.push_period);
sink.set_property("logs-stats", true);
let max_buffers = expected_buffers - BUFFERS_TO_SKIP;
sink.set_property("max-buffers", max_buffers);
} else {
gst::warning!(CAT, "Not enough buffers to log, disabling stats");
}
}
}
let elements = &[&depay, &queue, &sink];
elem.parent()
.unwrap()
.downcast_ref::<gst::Bin>()
.unwrap()
.add_many(elements)
.unwrap();
pad.link(&depay.static_pad("sink").unwrap()).unwrap();
gst::Element::link_many(elements).unwrap();
sink.sync_state_with_parent().unwrap();
queue.sync_state_with_parent().unwrap();
depay.sync_state_with_parent().unwrap();
});
let elements = &[&src, &queue, &convert, &pay, &dropper, &rtprecv];
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
}
let l = glib::MainLoop::new(None, false);
let bus = pipeline.bus().unwrap();
let mut bus_stream = bus.stream();
let pipeline_weak = pipeline.downgrade();
let l_clone = l.clone();
main_context.spawn_local(async move {
use futures::prelude::*;
let terminated_count = Arc::new(AtomicU32::new(0));
while let Some(msg) = bus_stream.next().await {
use gst::MessageView::*;
let Some(pipeline) = pipeline_weak.upgrade() else {
break;
};
match msg.view() {
Eos(_) => {
// Actually, we don't post EOS (see sinks impl).
gst::info!(CAT, "Received eos");
l_clone.quit();
break;
}
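// The test sinks post a `LibraryError::Shutdown` error instead of EOS
// when they are done (see the sink implementations): quit once every
// stream has reported it.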
Error(msg) => {
if let gst::MessageView::Error(msg) = msg.message().view() {
if msg.error().matches(gst::LibraryError::Shutdown) {
if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
gst::info!(CAT, "Received all shutdown requests");
l_clone.quit();
break;
} else {
continue;
}
}
}
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.name()),
msg.error(),
msg.debug()
);
l_clone.quit();
break;
}
Latency(msg) => {
gst::info!(
CAT,
"Latency requirements have changed for element {}",
msg.src()
.map(|src| src.name())
.as_deref()
.unwrap_or("UNKNOWN"),
);
if let Err(err) = pipeline.recalculate_latency() {
gst::error!(CAT, "Error recalculating latency: {err}");
}
}
_ => (),
}
}
});
gst::info!(CAT, "Switching to Ready");
let start = Instant::now();
pipeline.set_state(gst::State::Ready).unwrap();
gst::info!(CAT, "Switching to Ready took {:.2?}", start.elapsed());
gst::info!(CAT, "Switching to Playing");
let start = Instant::now();
pipeline.set_state(gst::State::Playing).unwrap();
gst::info!(CAT, "Switching to Playing took {:.2?}", start.elapsed());
l.run();
gst::info!(CAT, "Switching to Ready");
let stop = Instant::now();
pipeline.set_state(gst::State::Ready).unwrap();
gst::info!(CAT, "Switching to Ready took {:.2?}", stop.elapsed());
gst::info!(CAT, "Shutting down");
let stop = Instant::now();
pipeline.set_state(gst::State::Null).unwrap();
gst::info!(CAT, "Shutting down took {:.2?}", stop.elapsed());
}


@ -17,7 +17,6 @@ use gstthreadshare::runtime::executor::block_on_or_add_subtask;
use gstthreadshare::runtime::{prelude::*, PadSink};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use super::super::{Settings, Stats, CAT};
@ -26,7 +25,7 @@ struct PadSinkHandlerInner {
is_flushing: bool,
is_main_elem: bool,
last_ts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
segment: Option<gst::FormattedSegment<gst::format::Time>>,
stats: Option<Box<Stats>>,
}
@ -49,17 +48,17 @@ impl PadSinkHandlerInner {
debug_or_trace!(CAT, self.is_main_elem, obj = elem, "Received {buffer:?}");
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("ts before Segment start");
let ts = self
.segment
.as_ref()
.expect("Buffer without Time Segment")
.to_running_time(buffer.dts_or_pts().expect("Buffer without ts"))
.unwrap();
if let Some(last_ts) = self.last_ts {
let rt = elem.current_running_time().unwrap();
let lateness: Duration = (rt - ts).into();
let interval: Duration = (ts - last_ts).into();
let lateness = rt.nseconds() as i64 - ts.nseconds() as i64;
let interval = ts.nseconds() as i64 - last_ts.nseconds() as i64;
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(lateness, interval);
@ -131,9 +130,8 @@ impl PadSinkHandler for AsyncPadSinkHandler {
self.0.lock().await.is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().await.segment_start = time_seg.start();
}
self.0.lock().await.segment =
evt.segment().downcast_ref::<gst::ClockTime>().cloned();
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());


@ -1,4 +1,5 @@
use gst::prelude::*;
use std::fmt::{self, Write};
use std::time::{Duration, Instant};
#[cfg(feature = "tuning")]
@ -20,19 +21,19 @@ pub struct Stats {
lateness_square_sum: f32,
lateness_sum_delta: f32,
lateness_square_sum_delta: f32,
lateness_min: Duration,
lateness_min_delta: Duration,
lateness_max: Duration,
lateness_max_delta: Duration,
lateness_min: i64,
lateness_min_delta: i64,
lateness_max: i64,
lateness_max_delta: i64,
interval_sum: f32,
interval_square_sum: f32,
interval_sum_delta: f32,
interval_square_sum_delta: f32,
interval_min: Duration,
interval_min_delta: Duration,
interval_max: Duration,
interval_max_delta: Duration,
interval_late_warn: Duration,
interval_min: i64,
interval_min_delta: i64,
interval_max: i64,
interval_max_delta: i64,
interval_late_warn: i64,
interval_late_count: f32,
interval_late_count_delta: f32,
#[cfg(feature = "tuning")]
@ -43,7 +44,7 @@ impl Stats {
pub fn new(max_buffers: Option<u32>, interval_late_warn: Duration) -> Self {
Stats {
max_buffers: max_buffers.map(|max_buffers| max_buffers as f32),
interval_late_warn,
interval_late_warn: interval_late_warn.as_nanos() as i64,
..Default::default()
}
}
@ -55,18 +56,18 @@ impl Stats {
self.lateness_square_sum = 0.0;
self.lateness_sum_delta = 0.0;
self.lateness_square_sum_delta = 0.0;
self.lateness_min = Duration::MAX;
self.lateness_min_delta = Duration::MAX;
self.lateness_max = Duration::ZERO;
self.lateness_max_delta = Duration::ZERO;
self.lateness_min = i64::MAX;
self.lateness_min_delta = i64::MAX;
self.lateness_max = i64::MIN;
self.lateness_max_delta = i64::MIN;
self.interval_sum = 0.0;
self.interval_square_sum = 0.0;
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min = Duration::MAX;
self.interval_min_delta = Duration::MAX;
self.interval_max = Duration::ZERO;
self.interval_max_delta = Duration::ZERO;
self.interval_min = i64::MAX;
self.interval_min_delta = i64::MAX;
self.interval_max = i64::MIN;
self.interval_max_delta = i64::MIN;
self.interval_late_count = 0.0;
self.interval_late_count_delta = 0.0;
self.last_delta_instant = None;
@ -105,7 +106,7 @@ impl Stats {
}
}
pub fn add_buffer(&mut self, lateness: Duration, interval: Duration) {
pub fn add_buffer(&mut self, lateness: i64, interval: i64) {
if !self.is_active() {
return;
}
@ -114,7 +115,7 @@ impl Stats {
self.buffer_count_delta += 1.0;
// Lateness
let lateness_f32 = lateness.as_nanos() as f32;
let lateness_f32 = lateness as f32;
let lateness_square = lateness_f32.powi(2);
self.lateness_sum += lateness_f32;
@ -128,7 +129,7 @@ impl Stats {
self.lateness_max_delta = self.lateness_max_delta.max(lateness);
// Interval
let interval_f32 = interval.as_nanos() as f32;
let interval_f32 = interval as f32;
let interval_square = interval_f32.powi(2);
self.interval_sum += interval_f32;
@ -166,10 +167,10 @@ impl Stats {
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
SignedDuration(interval_mean as i64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min_delta,
self.interval_max_delta,
SignedDuration(self.interval_min_delta),
SignedDuration(self.interval_max_delta),
);
if self.interval_late_count_delta > f32::EPSILON {
@ -182,8 +183,8 @@ impl Stats {
self.interval_sum_delta = 0.0;
self.interval_square_sum_delta = 0.0;
self.interval_min_delta = Duration::MAX;
self.interval_max_delta = Duration::ZERO;
self.interval_min_delta = i64::MAX;
self.interval_max_delta = i64::MIN;
self.interval_late_count_delta = 0.0;
let lateness_mean = self.lateness_sum_delta / self.buffer_count_delta;
@ -194,16 +195,16 @@ impl Stats {
gst::info!(
CAT,
"o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(lateness_mean as u64),
SignedDuration(lateness_mean as i64),
Duration::from_nanos(lateness_std_dev as u64),
self.lateness_min_delta,
self.lateness_max_delta,
SignedDuration(self.lateness_min_delta),
SignedDuration(self.lateness_max_delta),
);
self.lateness_sum_delta = 0.0;
self.lateness_square_sum_delta = 0.0;
self.lateness_min_delta = Duration::MAX;
self.lateness_max_delta = Duration::ZERO;
self.lateness_min_delta = i64::MAX;
self.lateness_max_delta = i64::MIN;
self.buffer_count_delta = 0.0;
}
@ -240,10 +241,10 @@ impl Stats {
gst::info!(
CAT,
"o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(interval_mean as u64),
SignedDuration(interval_mean as i64),
Duration::from_nanos(interval_std_dev as u64),
self.interval_min,
self.interval_max,
SignedDuration(self.interval_min),
SignedDuration(self.interval_max),
);
if self.interval_late_count > f32::EPSILON {
@ -261,10 +262,24 @@ impl Stats {
gst::info!(
CAT,
"o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
Duration::from_nanos(lateness_mean as u64),
SignedDuration(lateness_mean as i64),
Duration::from_nanos(lateness_std_dev as u64),
self.lateness_min,
self.lateness_max,
SignedDuration(self.lateness_min),
SignedDuration(self.lateness_max),
);
}
}
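// Lateness (and interval) are now tracked as signed nanoseconds: a buffer can
// be handled ahead of its running time, making the lateness negative. This
// wrapper formats a signed nanosecond count as a `Duration` with an explicit
// sign.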
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash)]
struct SignedDuration(i64);
impl fmt::Debug for SignedDuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.0.is_negative() {
f.write_char('-')?;
Duration::from_nanos(i64::abs(self.0) as u64).fmt(f)
} else {
Duration::from_nanos(self.0 as u64).fmt(f)
}
}
}


@ -16,7 +16,6 @@ use std::sync::LazyLock;
use gstthreadshare::runtime::{prelude::*, PadSink};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use super::super::{Settings, Stats, CAT};
@ -25,7 +24,7 @@ struct PadSinkHandlerInner {
is_flushing: bool,
is_main_elem: bool,
last_ts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
segment: Option<gst::FormattedSegment<gst::format::Time>>,
stats: Option<Box<Stats>>,
}
@ -48,17 +47,17 @@ impl PadSinkHandlerInner {
debug_or_trace!(CAT, self.is_main_elem, obj = elem, "Received {buffer:?}");
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("ts before Segment start");
let ts = self
.segment
.as_ref()
.expect("Buffer without Time Segment")
.to_running_time(buffer.dts_or_pts().expect("Buffer without ts"))
.unwrap();
if let Some(last_ts) = self.last_ts {
let rt = elem.current_running_time().unwrap();
let lateness: Duration = (rt - ts).into();
let interval: Duration = (ts - last_ts).into();
let lateness = rt.nseconds() as i64 - ts.nseconds() as i64;
let interval = ts.nseconds() as i64 - last_ts.nseconds() as i64;
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(lateness, interval);
@ -130,9 +129,8 @@ impl PadSinkHandler for SyncPadSinkHandler {
self.0.lock().unwrap().is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().unwrap().segment_start = time_seg.start();
}
self.0.lock().unwrap().segment =
evt.segment().downcast_ref::<gst::ClockTime>().cloned();
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());


@ -18,7 +18,6 @@ use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, Task};
use std::sync::Mutex;
use std::time::Duration;
use super::super::{Settings, Stats, CAT};
@ -97,7 +96,7 @@ struct TaskSinkTask {
item_receiver: flume::Receiver<StreamItem>,
is_main_elem: bool,
last_ts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
segment: Option<gst::FormattedSegment<gst::format::Time>>,
stats: Option<Box<Stats>>,
}
@ -114,7 +113,7 @@ impl TaskSinkTask {
is_main_elem,
last_ts: None,
stats,
segment_start: None,
segment: None,
}
}
@ -161,17 +160,17 @@ impl TaskImpl for TaskSinkTask {
match item {
StreamItem::Buffer(buffer) => {
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
let ts = self
.segment
.as_ref()
.expect("Buffer without Time Segment")
.to_running_time(buffer.dts_or_pts().expect("Buffer without ts"))
.unwrap();
if let Some(last_ts) = self.last_ts {
let rt = self.elem.current_running_time().unwrap();
let lateness: Duration = (rt - ts).into();
let interval: Duration = (ts - last_ts).into();
let lateness = rt.nseconds() as i64 - ts.nseconds() as i64;
let interval = ts.nseconds() as i64 - last_ts.nseconds() as i64;
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(lateness, interval);
@ -197,9 +196,7 @@ impl TaskImpl for TaskSinkTask {
}
StreamItem::Event(evt) => {
if let EventView::Segment(evt) = evt.view() {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.segment_start = time_seg.start();
}
self.segment = evt.segment().downcast_ref::<gst::ClockTime>().cloned();
}
}
}


@ -9,6 +9,7 @@ glib::wrapper! {
pub struct TestSrc(ObjectSubclass<imp::TestSrc>) @extends gst::Element, gst::Object;
}
#[allow(unused)]
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),