diff --git a/generic/threadshare/src/inter/sink/imp.rs b/generic/threadshare/src/inter/sink/imp.rs index 989e2f135..c7f661402 100644 --- a/generic/threadshare/src/inter/sink/imp.rs +++ b/generic/threadshare/src/inter/sink/imp.rs @@ -35,7 +35,10 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; -use std::sync::{LazyLock, Mutex}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + LazyLock, Mutex, +}; use crate::runtime::executor::{block_on, block_on_or_add_sub_task}; use crate::runtime::prelude::*; @@ -212,6 +215,7 @@ pub struct InterSink { sinkpad: PadSink, sink_ctx: Mutex>, upstream_latency: Mutex>, + got_first_buffer: AtomicBool, settings: Mutex, } @@ -222,6 +226,16 @@ impl InterSink { } async fn enqueue_item(&self, item: DataQueueItem) -> Result { + if !self.got_first_buffer.load(Ordering::SeqCst) + && matches!( + item, + DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) + ) + { + self.got_first_buffer.store(true, Ordering::SeqCst); + let _ = self.post_message(gst::message::Latency::new()); + } + let shared_ctx = self.shared_ctx(); let shared_ctx = shared_ctx.read().await; @@ -275,6 +289,7 @@ impl InterSink { fn stop(&self) { gst::debug!(CAT, imp = self, "Stopped"); + self.got_first_buffer.store(false, Ordering::SeqCst); *self.upstream_latency.lock().unwrap() = gst::ClockTime::NONE; gst::debug!(CAT, imp = self, "Stopped"); } @@ -294,6 +309,7 @@ impl ObjectSubclass for InterSink { ), sink_ctx: Mutex::new(None), upstream_latency: Mutex::new(gst::ClockTime::NONE), + got_first_buffer: AtomicBool::new(false), settings: Mutex::new(Settings::default()), } } @@ -360,6 +376,13 @@ impl ElementImpl for InterSink { Some(&*ELEMENT_METADATA) } + fn query(&self, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, imp = self, "Got {query:?}"); + let res = self.parent_query(query); + gst::log!(CAT, imp = self, "Parent returned {res}, {query:?}"); + res + } + fn send_event(&self, event: gst::Event) -> bool { gst::log!(CAT, imp = self, "Got {event:?}");