threadshare: inter: store upstream latency in InterContext

... instead of retrieving the element following the sink pad.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2465>
This commit is contained in:
François Laignel 2025-08-12 18:39:52 +02:00 committed by GStreamer Marge Bot
parent c9334b218f
commit 458e59979f
3 changed files with 14 additions and 25 deletions

View file

@ -49,6 +49,7 @@ struct InterContextInner {
dataqueues: Slab<DataQueue>,
sources: Slab<src::InterSrc>,
sinkpad: Option<gst::Pad>,
upstream_latency: Option<gst::ClockTime>,
}
impl InterContextInner {
@ -58,6 +59,7 @@ impl InterContextInner {
dataqueues: Slab::new(),
sources: Slab::new(),
sinkpad: None,
upstream_latency: None,
}
}
}

View file

@ -214,7 +214,6 @@ impl PadSinkHandler for InterSinkPadHandler {
pub struct InterSink {
sinkpad: PadSink,
sink_ctx: Mutex<Option<InterContextSink>>,
upstream_latency: Mutex<Option<gst::ClockTime>>,
got_first_buffer: AtomicBool,
settings: Mutex<Settings>,
}
@ -248,10 +247,6 @@ impl InterSink {
Ok(gst::FlowSuccess::Ok)
}
pub fn latency(&self) -> Option<gst::ClockTime> {
*self.upstream_latency.lock().unwrap()
}
fn prepare(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Preparing");
@ -288,9 +283,15 @@ impl InterSink {
}
fn stop(&self) {
gst::debug!(CAT, imp = self, "Stopped");
gst::debug!(CAT, imp = self, "Stopping");
self.got_first_buffer.store(false, Ordering::SeqCst);
*self.upstream_latency.lock().unwrap() = gst::ClockTime::NONE;
let shared_ctx = self.shared_ctx();
block_on(async move {
shared_ctx.write().await.upstream_latency = None;
});
gst::debug!(CAT, imp = self, "Stopped");
}
}
@ -308,7 +309,6 @@ impl ObjectSubclass for InterSink {
InterSinkPadHandler,
),
sink_ctx: Mutex::new(None),
upstream_latency: Mutex::new(gst::ClockTime::NONE),
got_first_buffer: AtomicBool::new(false),
settings: Mutex::new(Settings::default()),
}
@ -388,13 +388,14 @@ impl ElementImpl for InterSink {
if let gst::EventView::Latency(lat_evt) = event.view() {
let latency = lat_evt.latency();
*self.upstream_latency.lock().unwrap() = Some(latency);
let obj = self.obj().clone();
let shared_ctx = self.shared_ctx();
let _ = block_on_or_add_sub_task(async move {
let shared_ctx = shared_ctx.read().await;
let mut shared_ctx = shared_ctx.write().await;
shared_ctx.upstream_latency = Some(latency);
if shared_ctx.sources.is_empty() {
gst::info!(CAT, obj = obj, "No sources to set upstream latency");
} else {

View file

@ -362,21 +362,7 @@ impl InterSrcTask {
let shared_ctx = imp.shared_ctx();
let shared_ctx = shared_ctx.read().await;
let Some(ref sinkpad) = shared_ctx.sinkpad else {
gst::info!(
CAT,
imp = imp,
"sinkpad is gone before we could get latency"
);
return Err(gst::FlowError::Error);
};
let sinkpad_parent = sinkpad.parent().expect("sinkpad should have a parent");
let intersink = sinkpad_parent
.downcast_ref::<crate::inter::sink::InterSink>()
.expect("sinkpad parent should be a ts-intersink");
if let Some(latency) = intersink.imp().latency() {
if let Some(latency) = shared_ctx.upstream_latency {
imp.set_upstream_latency_priv(latency);
} else {
gst::log!(CAT, imp = imp, "Upstream latency is still unknown");