threadshare: standalone examples: more improvements

* Only set `num-buffers` on the main source (the one that logs statistics).
  Previously, all sources would push `num-buffers` buffers, so the last 5s had
  to be excluded from the stats: some elements would start stopping before the
  main source, which caused outliers. All sources now keep pushing buffers
  until the main source has logged the global stats and requested the pipeline
  to shut down.
* Log when all sinks have received the Segment event. This helps identify
  deadlocks in some branches. When EOS is received, a warning is logged if not
  all sinks have notified that they received the Segment event (see the
  sketch below).
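
A minimal sketch of both mechanisms, for reference (illustrative only:
`make_src`, `handle_msg`, `N_STREAMS` and the "ts-standalone-src" factory
name are assumptions, not the examples' exact code; `property_if` and the
"ts-standalone-sink/*" application messages are what this commit uses):

    use gst::prelude::*;

    const N_STREAMS: u32 = 4; // illustrative stream count

    // Only the main source (i == 0) gets a finite `num-buffers`;
    // the other sources keep pushing buffers until shutdown.
    fn make_src(i: u32, num_buffers: i32) -> gst::Element {
        gst::ElementFactory::make("ts-standalone-src") // assumed factory name
            .property("main-elem", i == 0)
            .property_if("num-buffers", num_buffers, i == 0)
            .build()
            .unwrap()
    }

    // Count `streaming` notifications (one per sink that received the
    // Segment event) and check the count when the main sink reports EOS.
    // Returns true when the main loop should quit.
    fn handle_msg(msg: &gst::Message, streaming_count: &mut u32) -> bool {
        if let gst::MessageView::Application(app_msg) = msg.view() {
            match app_msg.structure().unwrap().name().as_str() {
                "ts-standalone-sink/streaming" => {
                    *streaming_count += 1;
                    if *streaming_count == N_STREAMS {
                        println!("all sinks received the Segment event");
                    }
                }
                "ts-standalone-sink/eos" => {
                    if *streaming_count != N_STREAMS {
                        eprintln!(
                            "got {streaming_count} streaming notifications, expected {N_STREAMS}"
                        );
                    }
                    return true;
                }
                _ => (),
            }
        }
        false
    }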

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2518>
François Laignel 2025-08-28 18:16:08 +02:00
parent d976d2e376
commit f553bd6495
8 changed files with 137 additions and 153 deletions


@@ -68,7 +68,8 @@ fn main() {
             .property("context", &ctx_name)
             .property("context-wait", args.wait)
             .property("push-period", args.push_period)
-            .property("num-buffers", args.num_buffers)
+            .property("main-elem", i == 0)
+            .property_if("num-buffers", args.num_buffers, i == 0)
             .build()
             .unwrap();
@@ -90,21 +91,15 @@ fn main() {
             .unwrap();

         if i == 0 {
-            src.set_property("main-elem", true);
             sink.set_property("main-elem", true);

             if !args.disable_stats_log {
-                // Don't use the last 5 secs in stats
-                // otherwise we get outliers when reaching EOS.
-                // Note that stats don't start before the 20 first seconds
+                // Stats don't start before the 20 first seconds
                 // and we get 50 buffers per sec.
                 const BUFFERS_BEFORE_LOGS: i32 = 20 * 50;
-                const BUFFERS_TO_SKIP: i32 = BUFFERS_BEFORE_LOGS + 5 * 50;
-                if args.num_buffers > BUFFERS_TO_SKIP {
+                if args.num_buffers > BUFFERS_BEFORE_LOGS {
                     sink.set_property("push-period", args.push_period);
                     sink.set_property("logs-stats", true);
-                    let max_buffers = args.num_buffers - BUFFERS_TO_SKIP;
-                    sink.set_property("max-buffers", max_buffers);
                 } else {
                     gst::warning!(CAT, "Not enough buffers to log, disabling stats");
                 }
@@ -135,27 +130,33 @@ fn main() {
         };

         match msg.view() {
-            Eos(_) => {
-                // Actually, we don't post EOS (see sinks impl).
-                gst::info!(CAT, "Received eos");
-                l_clone.quit();
-                break;
-            }
-            Error(msg) => {
-                if let gst::MessageView::Error(msg) = msg.message().view() {
-                    if msg.error().matches(gst::LibraryError::Shutdown) {
+            Application(app_msg) => {
+                let s = app_msg.structure().unwrap();
+                match s.name().as_str() {
+                    "ts-standalone-sink/streaming" => {
                         if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
-                            gst::info!(CAT, "Received all shutdown requests");
-                            l_clone.quit();
-                            break;
-                        } else {
-                            continue;
+                            gst::info!(CAT, "Received streaming notification from all sinks");
                         }
                     }
-                }
+                    "ts-standalone-sink/eos" => {
+                        gst::info!(CAT, "Received eos");
+                        let notifs = terminated_count.load(Ordering::SeqCst);
+                        if notifs != args.streams {
+                            gst::warning!(
+                                CAT,
+                                "Got {notifs} streaming notifications, expected {}",
+                                args.streams
+                            );
+                        }
+                        l_clone.quit();
+                        break;
+                    }
+                    _ => gst::warning!(CAT, "Unknown {msg:?}"),
+                }
+            }
             Error(msg) => {
                 gst::error!(
                     CAT,
                     "Error from {:?}: {} ({:?})",


@@ -71,7 +71,7 @@ fn main() {
             .property("is-live", true)
             .property("context", &ctx_name)
             .property("context-wait", args.wait)
-            .property("num-buffers", args.num_buffers)
+            .property_if("num-buffers", args.num_buffers, i == 0)
             .build()
             .unwrap();
@@ -141,19 +141,14 @@ fn main() {
             sink.set_property("main-elem", true);

             if !args.disable_stats_log {
-                // Don't use the last 5 secs in stats
-                // otherwise we get outliers when reaching EOS.
-                // Note that stats don't start before the 20 first seconds
+                // Stats don't start before the 20 first seconds
                 // and we get 50 buffers per sec.
                 const BUFFERS_BEFORE_LOGS: i32 = 20 * 50;
-                const BUFFERS_TO_SKIP: i32 = BUFFERS_BEFORE_LOGS + 5 * 50;
                 let expected_buffers =
                     (args.num_buffers as f32 * (1.0f32 - DROP_PROBABILITY)) as i32;
-                if expected_buffers > BUFFERS_TO_SKIP {
+                if expected_buffers > BUFFERS_BEFORE_LOGS {
                     sink.set_property("push-period", args.push_period);
                     sink.set_property("logs-stats", true);
-                    let max_buffers = expected_buffers - BUFFERS_TO_SKIP;
-                    sink.set_property("max-buffers", max_buffers);
                 } else {
                     gst::warning!(CAT, "Not enough buffers to log, disabling stats");
                 }
@@ -200,27 +195,33 @@ fn main() {
         };

         match msg.view() {
-            Eos(_) => {
-                // Actually, we don't post EOS (see sinks impl).
-                gst::info!(CAT, "Received eos");
-                l_clone.quit();
-                break;
-            }
-            Error(msg) => {
-                if let gst::MessageView::Error(msg) = msg.message().view() {
-                    if msg.error().matches(gst::LibraryError::Shutdown) {
+            Application(app_msg) => {
+                let s = app_msg.structure().unwrap();
+                match s.name().as_str() {
+                    "ts-standalone-sink/streaming" => {
                         if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
-                            gst::info!(CAT, "Received all shutdown requests");
-                            l_clone.quit();
-                            break;
-                        } else {
-                            continue;
+                            gst::info!(CAT, "Received streaming notification from all sinks");
                         }
                     }
-                }
+                    "ts-standalone-sink/eos" => {
+                        gst::info!(CAT, "Received eos");
+                        let notifs = terminated_count.load(Ordering::SeqCst);
+                        if notifs != args.streams {
+                            gst::warning!(
+                                CAT,
+                                "Got {notifs} streaming notifications, expected {}",
+                                args.streams
+                            );
+                        }
+                        l_clone.quit();
+                        break;
+                    }
+                    _ => gst::warning!(CAT, "Unknown {msg:?}"),
+                }
+            }
             Error(msg) => {
                 gst::error!(
                     CAT,
                     "Error from {:?}: {} ({:?})",
@@ -250,25 +251,15 @@ fn main() {
         }
     });

-    gst::info!(CAT, "Switching to Ready");
-    let start = Instant::now();
-    pipeline.set_state(gst::State::Ready).unwrap();
-    gst::info!(CAT, "Switching to Ready took {:.2?}", start.elapsed());
-
     gst::info!(CAT, "Switching to Playing");
     let start = Instant::now();
     pipeline.set_state(gst::State::Playing).unwrap();
     gst::info!(CAT, "Switching to Playing took {:.2?}", start.elapsed());

     l.run();

-    gst::info!(CAT, "Switching to Ready");
-    let stop = Instant::now();
-    pipeline.set_state(gst::State::Ready).unwrap();
-    gst::info!(CAT, "Switching to Ready took {:.2?}", stop.elapsed());
-
     gst::info!(CAT, "Shutting down");
     let stop = Instant::now();
     pipeline.set_state(gst::State::Null).unwrap();
     gst::info!(CAT, "Shutting down took {:.2?}", stop.elapsed());
 }


@@ -113,18 +113,17 @@ impl PadSinkHandler for AsyncPadSinkHandler {
     ) -> bool {
         match event.view() {
             EventView::Eos(_) => {
-                {
-                    let mut inner = self.0.lock().await;
-                    debug_or_trace!(CAT, inner.is_main_elem, obj = elem, "EOS");
-                    inner.is_flushing = true;
+                let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem;
+                if is_main_elem {
+                    gst::info!(CAT, obj = elem, "EOS");
+                    let _ = elem.post_message(
+                        gst::message::Application::builder(gst::Structure::new_empty(
+                            "ts-standalone-sink/eos",
+                        ))
+                        .src(&elem)
+                        .build(),
+                    );
                 }
-
-                // When each element sends its own EOS message,
-                // it takes ages for the pipeline to process all of them.
-                // Let's just post an error message and let main shuts down
-                // after all streams have posted this message.
-                let _ =
-                    elem.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
             }
             EventView::FlushStop(_) => {
                 self.0.lock().await.is_flushing = false;
@@ -132,6 +131,13 @@ impl PadSinkHandler for AsyncPadSinkHandler {
             EventView::Segment(evt) => {
                 self.0.lock().await.segment =
                     evt.segment().downcast_ref::<gst::ClockTime>().cloned();
+                let _ = elem.post_message(
+                    gst::message::Application::builder(gst::Structure::new_empty(
+                        "ts-standalone-sink/streaming",
+                    ))
+                    .src(&elem)
+                    .build(),
+                );
             }
             EventView::SinkMessage(evt) => {
                 let _ = elem.post_message(evt.message());
@@ -176,6 +182,9 @@ impl AsyncPadSinkHandler {
     fn stop(&self) {
         futures::executor::block_on(async move {
             let mut inner = self.0.lock().await;
+            if let Some(ref mut stats) = inner.stats {
+                stats.log_global();
+            }
             inner.is_flushing = true;
         });
     }
@@ -193,10 +202,7 @@ impl AsyncMutexSink {
         let settings = self.settings.lock().unwrap();
         debug_or_trace!(CAT, settings.is_main_elem, imp = self, "Preparing");
         let stats = if settings.logs_stats {
-            Some(Stats::new(
-                settings.max_buffers,
-                settings.push_period + settings.context_wait / 2,
-            ))
+            Some(Stats::new(settings.push_period + settings.context_wait / 2))
         } else {
             None
         };


@@ -6,7 +6,6 @@ use std::time::Duration;
 const DEFAULT_CONTEXT: &str = "";
 const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_millis(20);
 const DEFAULT_PUSH_PERIOD: Duration = Duration::from_millis(20);
-const DEFAULT_MAX_BUFFERS: i32 = 50 * (100 - 25);

 #[derive(Debug, Clone)]
 pub struct Settings {
@@ -15,7 +14,6 @@ pub struct Settings {
     pub is_main_elem: bool,
     pub logs_stats: bool,
     pub push_period: Duration,
-    pub max_buffers: Option<u32>,
 }

 impl Default for Settings {
@@ -26,7 +24,6 @@ impl Default for Settings {
             is_main_elem: false,
             logs_stats: false,
             push_period: DEFAULT_PUSH_PERIOD,
-            max_buffers: Some(DEFAULT_MAX_BUFFERS as u32),
         }
     }
 }
@@ -60,12 +57,6 @@ impl Settings {
                 .blurb("Push period used by `src` element (used for stats warnings)")
                 .default_value(DEFAULT_PUSH_PERIOD.as_millis() as u32)
                 .build(),
-            glib::ParamSpecInt::builder("max-buffers")
-                .nick("Max Buffers")
-                .blurb("Number of buffers to count before stopping stats (-1 = unlimited)")
-                .minimum(-1i32)
-                .default_value(DEFAULT_MAX_BUFFERS)
-                .build(),
         ]
     }
@@ -90,10 +81,6 @@ impl Settings {
             "push-period" => {
                 self.push_period = Duration::from_millis(value.get::<u32>().unwrap().into());
             }
-            "max-buffers" => {
-                let value = value.get::<i32>().unwrap();
-                self.max_buffers = if value > 0 { Some(value as u32) } else { None };
-            }
             _ => unimplemented!(),
         }
     }
@@ -104,11 +91,6 @@ impl Settings {
             "context-wait" => (self.context_wait.as_millis() as u32).to_value(),
             "main-elem" => self.is_main_elem.to_value(),
             "push-period" => (self.push_period.as_millis() as u32).to_value(),
-            "max-buffers" => self
-                .max_buffers
-                .and_then(|val| val.try_into().ok())
-                .unwrap_or(-1i32)
-                .to_value(),
             _ => unimplemented!(),
         }
     }


@@ -1,4 +1,3 @@
-use gst::prelude::*;
 use std::fmt::{self, Write};
 use std::time::{Duration, Instant};
@@ -14,7 +13,6 @@ pub struct Stats {
     ramp_up_instant: Option<Instant>,
     log_start_instant: Option<Instant>,
     last_delta_instant: Option<Instant>,
-    max_buffers: Option<f32>,
     buffer_count: f32,
     buffer_count_delta: f32,
     lateness_sum: f32,
@@ -41,9 +39,8 @@ pub struct Stats {
 }

 impl Stats {
-    pub fn new(max_buffers: Option<u32>, interval_late_warn: Duration) -> Self {
+    pub fn new(interval_late_warn: Duration) -> Self {
         Stats {
-            max_buffers: max_buffers.map(|max_buffers| max_buffers as f32),
             interval_late_warn: interval_late_warn.as_nanos() as i64,
             ..Default::default()
         }
@@ -94,16 +91,7 @@ impl Stats {
             }
         }

-        use std::cmp::Ordering::*;
-        match self.max_buffers.opt_cmp(self.buffer_count) {
-            Some(Equal) => {
-                self.log_global();
-                self.buffer_count += 1.0;
-                false
-            }
-            Some(Less) => false,
-            _ => true,
-        }
+        true
     }

     pub fn add_buffer(&mut self, lateness: i64, interval: i64) {
@@ -166,7 +154,7 @@ impl Stats {
         gst::info!(
             CAT,
-            "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            "* interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             SignedDuration(interval_mean as i64),
             Duration::from_nanos(interval_std_dev as u64),
             SignedDuration(self.interval_min_delta),
@@ -176,7 +164,7 @@ impl Stats {
         if self.interval_late_count_delta > f32::EPSILON {
             gst::warning!(
                 CAT,
-                "o {:5.2}% late buffers",
+                "* {:5.2}% late buffers",
                 100f32 * self.interval_late_count_delta / self.buffer_count_delta
             );
         }
@@ -194,7 +182,7 @@ impl Stats {
         gst::info!(
             CAT,
-            "o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            "* lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             SignedDuration(lateness_mean as i64),
             Duration::from_nanos(lateness_std_dev as u64),
             SignedDuration(self.lateness_min_delta),
@@ -229,7 +217,7 @@ impl Stats {
             Context::current().unwrap().parked_duration() - self.parked_duration_init;
         gst::info!(
             CAT,
-            "o parked: {parked_duration:4.2?} ({:5.2?}%)",
+            "* parked: {parked_duration:4.2?} ({:5.2?}%)",
             (parked_duration.as_nanos() as f32 * 100.0 / duration.as_nanos() as f32)
         );
     }
@@ -240,7 +228,7 @@ impl Stats {
         gst::info!(
             CAT,
-            "o interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            "* interval: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             SignedDuration(interval_mean as i64),
             Duration::from_nanos(interval_std_dev as u64),
             SignedDuration(self.interval_min),
@@ -250,7 +238,7 @@ impl Stats {
         if self.interval_late_count > f32::EPSILON {
             gst::warning!(
                 CAT,
-                "o {:5.2}% late buffers",
+                "* {:5.2}% late buffers",
                 100f32 * self.interval_late_count / self.buffer_count
             );
         }
@@ -261,7 +249,7 @@ impl Stats {
         gst::info!(
             CAT,
-            "o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
+            "* lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
             SignedDuration(lateness_mean as i64),
             Duration::from_nanos(lateness_std_dev as u64),
             SignedDuration(self.lateness_min),


@@ -112,18 +112,17 @@ impl PadSinkHandler for SyncPadSinkHandler {
     ) -> bool {
         match event.view() {
             EventView::Eos(_) => {
-                {
-                    let mut inner = self.0.lock().unwrap();
-                    debug_or_trace!(CAT, inner.is_main_elem, obj = elem, "EOS");
-                    inner.is_flushing = true;
+                let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem;
+                if is_main_elem {
+                    gst::info!(CAT, obj = elem, "EOS");
+                    let _ = elem.post_message(
+                        gst::message::Application::builder(gst::Structure::new_empty(
+                            "ts-standalone-sink/eos",
+                        ))
+                        .src(&elem)
+                        .build(),
+                    );
                 }
-
-                // When each element sends its own EOS message,
-                // it takes ages for the pipeline to process all of them.
-                // Let's just post an error message and let main shuts down
-                // after all streams have posted this message.
-                let _ =
-                    elem.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
             }
             EventView::FlushStop(_) => {
                 self.0.lock().unwrap().is_flushing = false;
@@ -131,6 +130,13 @@ impl PadSinkHandler for SyncPadSinkHandler {
             EventView::Segment(evt) => {
                 self.0.lock().unwrap().segment =
                     evt.segment().downcast_ref::<gst::ClockTime>().cloned();
+                let _ = elem.post_message(
+                    gst::message::Application::builder(gst::Structure::new_empty(
+                        "ts-standalone-sink/streaming",
+                    ))
+                    .src(&elem)
+                    .build(),
+                );
             }
             EventView::SinkMessage(evt) => {
                 let _ = elem.post_message(evt.message());
@@ -170,6 +176,9 @@ impl SyncPadSinkHandler {
     fn stop(&self) {
         let mut inner = self.0.lock().unwrap();
+        if let Some(ref mut stats) = inner.stats {
+            stats.log_global();
+        }
         inner.is_flushing = true;
     }
 }
@@ -186,10 +195,7 @@ impl DirectSink {
         let settings = self.settings.lock().unwrap();
         debug_or_trace!(CAT, settings.is_main_elem, imp = self, "Preparing");
         let stats = if settings.logs_stats {
-            Some(Stats::new(
-                settings.max_buffers,
-                settings.push_period + settings.context_wait / 2,
-            ))
+            Some(Stats::new(settings.push_period + settings.context_wait / 2))
         } else {
             None
         };


@@ -55,24 +55,34 @@ impl PadSinkHandler for TaskPadSinkHandler {
     ) -> bool {
         let sender = elem.imp().clone_item_sender();

         match event.view() {
-            EventView::Segment(_) => {
-                let _ = sender.send_async(StreamItem::Event(event)).await;
-            }
             EventView::Eos(_) => {
                 let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem;
-                debug_or_trace!(CAT, is_main_elem, obj = elem, "EOS");
-
-                // When each element sends its own EOS message,
-                // it takes ages for the pipeline to process all of them.
-                // Let's just post an error message and let main shuts down
-                // after all streams have posted this message.
-                let _ =
-                    elem.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
+                if is_main_elem {
+                    gst::info!(CAT, obj = elem, "EOS");
+                    let _ = elem.post_message(
+                        gst::message::Application::builder(gst::Structure::new_empty(
+                            "ts-standalone-sink/eos",
+                        ))
+                        .src(&elem)
+                        .build(),
+                    );
+                }
             }
             EventView::FlushStop(_) => {
                 let imp = elem.imp();
                 return imp.task.flush_stop().block_on_or_add_subtask(&pad).is_ok();
             }
+            EventView::Segment(_) => {
+                let _ = sender.send_async(StreamItem::Event(event)).await;
+                let _ = elem.post_message(
+                    gst::message::Application::builder(gst::Structure::new_empty(
+                        "ts-standalone-sink/streaming",
+                    ))
+                    .src(&elem)
+                    .build(),
+                );
+            }
             EventView::SinkMessage(evt) => {
                 let _ = elem.post_message(evt.message());
             }
@@ -119,7 +129,7 @@ impl TaskSinkTask {
     fn flush(&mut self) {
         // Purge the channel
-        while !self.item_receiver.is_empty() {}
+        while self.item_receiver.try_recv().is_ok() {}
     }
 }
@@ -147,6 +157,9 @@ impl TaskImpl for TaskSinkTask {
     async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
         log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Stopping Task");
+        if let Some(ref mut stats) = self.stats {
+            stats.log_global();
+        }
         self.flush();
         Ok(())
     }
@@ -223,7 +236,6 @@ impl TaskSink {
         let settings = self.settings.lock().unwrap();
         let stats = if settings.logs_stats {
             Some(Box::new(Stats::new(
-                settings.max_buffers,
                 settings.push_period + settings.context_wait / 2,
             )))
         } else {


@@ -204,7 +204,14 @@ impl TaskImpl for SrcTask {
         self.buffer_count += 1;

-        if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
+        if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
+            gst::debug!(CAT, obj = self.elem, "Pushing EOS");
+
+            let imp = self.elem.imp();
+            if !imp.src_pad.push_event(gst::event::Eos::new()).await {
+                gst::error!(CAT, imp = imp, "Error pushing EOS");
+            }
+
             return Err(gst::FlowError::Eos);
         }
@@ -213,16 +220,7 @@ impl TaskImpl for SrcTask {
     async fn handle_loop_error(&mut self, err: gst::FlowError) -> task::Trigger {
         match err {
-            gst::FlowError::Eos => {
-                debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Pushing EOS");
-
-                let imp = self.elem.imp();
-                if !imp.src_pad.push_event(gst::event::Eos::new()).await {
-                    gst::error!(CAT, imp = imp, "Error pushing EOS");
-                }
-
-                task::Trigger::Stop
-            }
+            gst::FlowError::Eos => task::Trigger::Stop,
             gst::FlowError::Flushing => {
                 debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Flushing");