ts: standalone example: minor fixes

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2278>
This commit is contained in:
François Laignel 2025-06-04 20:21:26 +02:00 committed by GStreamer Marge Bot
parent 18723217fc
commit 7ed24eac0a
6 changed files with 85 additions and 64 deletions

View file

@ -104,46 +104,64 @@ fn main() {
let l = glib::MainLoop::new(None, false);
let bus = pipeline.bus().unwrap();
let terminated_count = Arc::new(AtomicU32::new(0));
let l_clone = l.clone();
let _bus_watch = bus
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(_) => {
// Actually, we don't post EOS (see sinks impl).
gst::info!(CAT, "Received eos");
l_clone.quit();
let _bus_watch = pipeline
.bus()
.unwrap()
.add_watch({
let terminated_count = Arc::new(AtomicU32::new(0));
let l = l.clone();
let pipeline = pipeline.clone();
move |_, msg| {
use gst::MessageView::*;
match msg.view() {
Eos(_) => {
// Actually, we don't post EOS (see sinks impl).
gst::info!(CAT, "Received eos");
l.quit();
glib::ControlFlow::Break
}
MessageView::Error(msg) => {
if let gst::MessageView::Error(msg) = msg.message().view() {
if msg.error().matches(gst::LibraryError::Shutdown) {
if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
return glib::ControlFlow::Break;
}
Error(msg) => {
if let gst::MessageView::Error(msg) = msg.message().view() {
if msg.error().matches(gst::LibraryError::Shutdown)
&& terminated_count.fetch_add(1, Ordering::SeqCst)
== args.streams - 1
{
gst::info!(CAT, "Received all shutdown requests");
l_clone.quit();
l.quit();
return glib::ControlFlow::Break;
} else {
return glib::ControlFlow::Continue;
}
}
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l.quit();
return glib::ControlFlow::Break;
}
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l_clone.quit();
glib::ControlFlow::Break
Latency(msg) => {
gst::info!(
CAT,
"Latency requirements have changed for element {}",
msg.src()
.map(|src| src.name())
.as_deref()
.unwrap_or("UNKNOWN"),
);
if let Err(err) = pipeline.recalculate_latency() {
gst::error!(CAT, "Error recalculating latency: {err}");
}
}
_ => (),
}
_ => glib::ControlFlow::Continue,
glib::ControlFlow::Continue
}
})
.expect("Failed to add bus watch");

View file

@ -28,7 +28,7 @@ use super::super::{Settings, Stats, CAT};
struct PadSinkHandlerInner {
is_flushing: bool,
is_main_elem: bool,
last_dts: Option<gst::ClockTime>,
last_ts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
stats: Option<Box<Stats>>,
}
@ -52,16 +52,17 @@ impl PadSinkHandlerInner {
debug_or_trace!(CAT, self.is_main_elem, obj = elem, "Received {buffer:?}");
let dts = buffer
.dts()
.expect("Buffer without dts")
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
.expect("ts before Segment start");
if let Some(last_dts) = self.last_dts {
if let Some(last_ts) = self.last_ts {
let cur_ts = elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
let latency: Duration = (cur_ts - ts).into();
let interval: Duration = (ts - last_ts).into();
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
@ -81,7 +82,7 @@ impl PadSinkHandlerInner {
);
}
self.last_dts = Some(dts);
self.last_ts = Some(ts);
log_or_trace!(CAT, self.is_main_elem, obj = elem, "Buffer processed");
@ -175,7 +176,7 @@ impl AsyncPadSinkHandler {
let mut inner = self.0.lock().await;
inner.is_flushing = false;
inner.last_dts = None;
inner.last_ts = None;
if let Some(stats) = inner.stats.as_mut() {
stats.start();

View file

@ -27,7 +27,7 @@ use super::super::{Settings, Stats, CAT};
struct PadSinkHandlerInner {
is_flushing: bool,
is_main_elem: bool,
last_dts: Option<gst::ClockTime>,
last_ts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
stats: Option<Box<Stats>>,
}
@ -51,16 +51,17 @@ impl PadSinkHandlerInner {
debug_or_trace!(CAT, self.is_main_elem, obj = elem, "Received {buffer:?}");
let dts = buffer
.dts()
.expect("Buffer without dts")
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
.expect("ts before Segment start");
if let Some(last_dts) = self.last_dts {
if let Some(last_ts) = self.last_ts {
let cur_ts = elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
let latency: Duration = (cur_ts - ts).into();
let interval: Duration = (ts - last_ts).into();
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
@ -80,7 +81,7 @@ impl PadSinkHandlerInner {
);
}
self.last_dts = Some(dts);
self.last_ts = Some(ts);
log_or_trace!(CAT, self.is_main_elem, obj = elem, "Buffer processed");
@ -171,7 +172,7 @@ impl SyncPadSinkHandler {
let mut inner = self.0.lock().unwrap();
inner.is_flushing = false;
inner.last_dts = None;
inner.last_ts = None;
if let Some(stats) = inner.stats.as_mut() {
stats.start();

View file

@ -105,7 +105,7 @@ struct TaskSinkTask {
elem: super::TaskSink,
item_receiver: flume::Receiver<StreamItem>,
is_main_elem: bool,
last_dts: Option<gst::ClockTime>,
last_ts: Option<gst::ClockTime>,
segment_start: Option<gst::ClockTime>,
stats: Option<Box<Stats>>,
}
@ -121,7 +121,7 @@ impl TaskSinkTask {
elem: elem.clone(),
item_receiver,
is_main_elem,
last_dts: None,
last_ts: None,
stats,
segment_start: None,
}
@ -144,7 +144,7 @@ impl TaskImpl for TaskSinkTask {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Starting Task");
self.last_dts = None;
self.last_ts = None;
if let Some(stats) = self.stats.as_mut() {
stats.start();
}
@ -176,16 +176,17 @@ impl TaskImpl for TaskSinkTask {
match item {
StreamItem::Buffer(buffer) => {
let dts = buffer
.dts()
.expect("Buffer without dts")
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
if let Some(last_dts) = self.last_dts {
if let Some(last_ts) = self.last_ts {
let cur_ts = self.elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - dts).into();
let interval: Duration = (dts - last_dts).into();
let latency: Duration = (cur_ts - ts).into();
let interval: Duration = (ts - last_ts).into();
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
@ -205,7 +206,7 @@ impl TaskImpl for TaskSinkTask {
);
}
self.last_dts = Some(dts);
self.last_ts = Some(ts);
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Buffer processed");
}

View file

@ -190,7 +190,7 @@ impl TaskImpl for SrcTask {
{
let buffer = buffer.get_mut().unwrap();
let rtime = self.elem.current_running_time().unwrap();
buffer.set_dts(rtime);
buffer.set_pts(rtime);
}
buffer
})

View file

@ -369,7 +369,7 @@ impl TaskImpl for AudioTestSrcTask {
}
if self.do_timestamp {
buffer_mut.set_dts(start);
buffer_mut.set_pts(start);
buffer_mut.set_duration(self.buffer_duration);
}