diff --git a/generic/threadshare/examples/standalone/main.rs b/generic/threadshare/examples/standalone/main.rs index 35de8483d..bc7551416 100644 --- a/generic/threadshare/examples/standalone/main.rs +++ b/generic/threadshare/examples/standalone/main.rs @@ -104,46 +104,64 @@ fn main() { let l = glib::MainLoop::new(None, false); - let bus = pipeline.bus().unwrap(); - let terminated_count = Arc::new(AtomicU32::new(0)); - let l_clone = l.clone(); - let _bus_watch = bus - .add_watch(move |_, msg| { - use gst::MessageView; - match msg.view() { - MessageView::Eos(_) => { - // Actually, we don't post EOS (see sinks impl). - gst::info!(CAT, "Received eos"); - l_clone.quit(); + let _bus_watch = pipeline + .bus() + .unwrap() + .add_watch({ + let terminated_count = Arc::new(AtomicU32::new(0)); + let l = l.clone(); + let pipeline = pipeline.clone(); + move |_, msg| { + use gst::MessageView::*; + match msg.view() { + Eos(_) => { + // Actually, we don't post EOS (see sinks impl). + gst::info!(CAT, "Received eos"); + l.quit(); - glib::ControlFlow::Break - } - MessageView::Error(msg) => { - if let gst::MessageView::Error(msg) = msg.message().view() { - if msg.error().matches(gst::LibraryError::Shutdown) { - if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 { + return glib::ControlFlow::Break; + } + Error(msg) => { + if let gst::MessageView::Error(msg) = msg.message().view() { + if msg.error().matches(gst::LibraryError::Shutdown) + && terminated_count.fetch_add(1, Ordering::SeqCst) + == args.streams - 1 + { gst::info!(CAT, "Received all shutdown requests"); - l_clone.quit(); + l.quit(); return glib::ControlFlow::Break; - } else { - return glib::ControlFlow::Continue; } } + + gst::error!( + CAT, + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + msg.error(), + msg.debug() + ); + l.quit(); + + return glib::ControlFlow::Break; } - - gst::error!( - CAT, - "Error from {:?}: {} ({:?})", - msg.src().map(|s| s.path_string()), - msg.error(), - msg.debug() - ); - l_clone.quit(); - - glib::ControlFlow::Break + Latency(msg) => { + gst::info!( + CAT, + "Latency requirements have changed for element {}", + msg.src() + .map(|src| src.name()) + .as_deref() + .unwrap_or("UNKNOWN"), + ); + if let Err(err) = pipeline.recalculate_latency() { + gst::error!(CAT, "Error recalculating latency: {err}"); + } + } + _ => (), } - _ => glib::ControlFlow::Continue, + + glib::ControlFlow::Continue } }) .expect("Failed to add bus watch"); diff --git a/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs index d65643d6d..cad59d220 100644 --- a/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs +++ b/generic/threadshare/examples/standalone/sink/async_mutex/imp.rs @@ -28,7 +28,7 @@ use super::super::{Settings, Stats, CAT}; struct PadSinkHandlerInner { is_flushing: bool, is_main_elem: bool, - last_dts: Option, + last_ts: Option, segment_start: Option, stats: Option>, } @@ -52,16 +52,17 @@ impl PadSinkHandlerInner { debug_or_trace!(CAT, self.is_main_elem, obj = elem, "Received {buffer:?}"); - let dts = buffer - .dts() - .expect("Buffer without dts") + let ts = buffer + .dts_or_pts() + .expect("Buffer without ts") + // FIXME do proper segment to running time .checked_sub(self.segment_start.expect("Buffer without Time Segment")) - .expect("dts before Segment start"); + .expect("ts before Segment start"); - if let Some(last_dts) = self.last_dts { + if let Some(last_ts) = self.last_ts { let cur_ts = elem.current_running_time().unwrap(); - let latency: Duration = (cur_ts - dts).into(); - let interval: Duration = (dts - last_dts).into(); + let latency: Duration = (cur_ts - ts).into(); + let interval: Duration = (ts - last_ts).into(); if let Some(stats) = self.stats.as_mut() { stats.add_buffer(latency, interval); @@ -81,7 +82,7 @@ impl PadSinkHandlerInner { ); } - self.last_dts = Some(dts); + self.last_ts = Some(ts); log_or_trace!(CAT, self.is_main_elem, obj = elem, "Buffer processed"); @@ -175,7 +176,7 @@ impl AsyncPadSinkHandler { let mut inner = self.0.lock().await; inner.is_flushing = false; - inner.last_dts = None; + inner.last_ts = None; if let Some(stats) = inner.stats.as_mut() { stats.start(); diff --git a/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs index 7c6edec6c..341c5b2b0 100644 --- a/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs +++ b/generic/threadshare/examples/standalone/sink/sync_mutex/imp.rs @@ -27,7 +27,7 @@ use super::super::{Settings, Stats, CAT}; struct PadSinkHandlerInner { is_flushing: bool, is_main_elem: bool, - last_dts: Option, + last_ts: Option, segment_start: Option, stats: Option>, } @@ -51,16 +51,17 @@ impl PadSinkHandlerInner { debug_or_trace!(CAT, self.is_main_elem, obj = elem, "Received {buffer:?}"); - let dts = buffer - .dts() - .expect("Buffer without dts") + let ts = buffer + .dts_or_pts() + .expect("Buffer without ts") + // FIXME do proper segment to running time .checked_sub(self.segment_start.expect("Buffer without Time Segment")) - .expect("dts before Segment start"); + .expect("ts before Segment start"); - if let Some(last_dts) = self.last_dts { + if let Some(last_ts) = self.last_ts { let cur_ts = elem.current_running_time().unwrap(); - let latency: Duration = (cur_ts - dts).into(); - let interval: Duration = (dts - last_dts).into(); + let latency: Duration = (cur_ts - ts).into(); + let interval: Duration = (ts - last_ts).into(); if let Some(stats) = self.stats.as_mut() { stats.add_buffer(latency, interval); @@ -80,7 +81,7 @@ impl PadSinkHandlerInner { ); } - self.last_dts = Some(dts); + self.last_ts = Some(ts); log_or_trace!(CAT, self.is_main_elem, obj = elem, "Buffer processed"); @@ -171,7 +172,7 @@ impl SyncPadSinkHandler { let mut inner = self.0.lock().unwrap(); inner.is_flushing = false; - inner.last_dts = None; + inner.last_ts = None; if let Some(stats) = inner.stats.as_mut() { stats.start(); diff --git a/generic/threadshare/examples/standalone/sink/task/imp.rs b/generic/threadshare/examples/standalone/sink/task/imp.rs index 425617b60..61f119997 100644 --- a/generic/threadshare/examples/standalone/sink/task/imp.rs +++ b/generic/threadshare/examples/standalone/sink/task/imp.rs @@ -105,7 +105,7 @@ struct TaskSinkTask { elem: super::TaskSink, item_receiver: flume::Receiver, is_main_elem: bool, - last_dts: Option, + last_ts: Option, segment_start: Option, stats: Option>, } @@ -121,7 +121,7 @@ impl TaskSinkTask { elem: elem.clone(), item_receiver, is_main_elem, - last_dts: None, + last_ts: None, stats, segment_start: None, } @@ -144,7 +144,7 @@ impl TaskImpl for TaskSinkTask { fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async { log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Starting Task"); - self.last_dts = None; + self.last_ts = None; if let Some(stats) = self.stats.as_mut() { stats.start(); } @@ -176,16 +176,17 @@ impl TaskImpl for TaskSinkTask { match item { StreamItem::Buffer(buffer) => { - let dts = buffer - .dts() - .expect("Buffer without dts") + let ts = buffer + .dts_or_pts() + .expect("Buffer without ts") + // FIXME do proper segment to running time .checked_sub(self.segment_start.expect("Buffer without Time Segment")) .expect("dts before Segment start"); - if let Some(last_dts) = self.last_dts { + if let Some(last_ts) = self.last_ts { let cur_ts = self.elem.current_running_time().unwrap(); - let latency: Duration = (cur_ts - dts).into(); - let interval: Duration = (dts - last_dts).into(); + let latency: Duration = (cur_ts - ts).into(); + let interval: Duration = (ts - last_ts).into(); if let Some(stats) = self.stats.as_mut() { stats.add_buffer(latency, interval); @@ -205,7 +206,7 @@ impl TaskImpl for TaskSinkTask { ); } - self.last_dts = Some(dts); + self.last_ts = Some(ts); log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Buffer processed"); } diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs index 35c5839d7..ab46cbc60 100644 --- a/generic/threadshare/examples/standalone/src/imp.rs +++ b/generic/threadshare/examples/standalone/src/imp.rs @@ -190,7 +190,7 @@ impl TaskImpl for SrcTask { { let buffer = buffer.get_mut().unwrap(); let rtime = self.elem.current_running_time().unwrap(); - buffer.set_dts(rtime); + buffer.set_pts(rtime); } buffer }) diff --git a/generic/threadshare/src/audiotestsrc/imp.rs b/generic/threadshare/src/audiotestsrc/imp.rs index d86dcce75..548d66ab3 100644 --- a/generic/threadshare/src/audiotestsrc/imp.rs +++ b/generic/threadshare/src/audiotestsrc/imp.rs @@ -369,7 +369,7 @@ impl TaskImpl for AudioTestSrcTask { } if self.do_timestamp { - buffer_mut.set_dts(start); + buffer_mut.set_pts(start); buffer_mut.set_duration(self.buffer_duration); }