ts-intersink: post Latency message on first buffer

Push a Latency message on first buffer, so as to make sure latency is properly
configured even with dynamic pipelines.

Future improvement: implement Prerolling & async state change.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2387>
This commit is contained in:
François Laignel 2025-07-17 14:01:08 +02:00
parent 3a53d857b8
commit 10ec242460

View file

@ -35,7 +35,10 @@ use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use std::sync::{LazyLock, Mutex}; use std::sync::{
atomic::{AtomicBool, Ordering},
LazyLock, Mutex,
};
use crate::runtime::executor::{block_on, block_on_or_add_sub_task}; use crate::runtime::executor::{block_on, block_on_or_add_sub_task};
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
@ -212,6 +215,7 @@ pub struct InterSink {
sinkpad: PadSink, sinkpad: PadSink,
sink_ctx: Mutex<Option<InterContextSink>>, sink_ctx: Mutex<Option<InterContextSink>>,
upstream_latency: Mutex<Option<gst::ClockTime>>, upstream_latency: Mutex<Option<gst::ClockTime>>,
got_first_buffer: AtomicBool,
settings: Mutex<Settings>, settings: Mutex<Settings>,
} }
@ -222,6 +226,16 @@ impl InterSink {
} }
async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> { async fn enqueue_item(&self, item: DataQueueItem) -> Result<gst::FlowSuccess, gst::FlowError> {
if !self.got_first_buffer.load(Ordering::SeqCst)
&& matches!(
item,
DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_)
)
{
self.got_first_buffer.store(true, Ordering::SeqCst);
let _ = self.post_message(gst::message::Latency::new());
}
let shared_ctx = self.shared_ctx(); let shared_ctx = self.shared_ctx();
let shared_ctx = shared_ctx.read().await; let shared_ctx = shared_ctx.read().await;
@ -275,6 +289,7 @@ impl InterSink {
fn stop(&self) { fn stop(&self) {
gst::debug!(CAT, imp = self, "Stopped"); gst::debug!(CAT, imp = self, "Stopped");
self.got_first_buffer.store(false, Ordering::SeqCst);
*self.upstream_latency.lock().unwrap() = gst::ClockTime::NONE; *self.upstream_latency.lock().unwrap() = gst::ClockTime::NONE;
gst::debug!(CAT, imp = self, "Stopped"); gst::debug!(CAT, imp = self, "Stopped");
} }
@ -294,6 +309,7 @@ impl ObjectSubclass for InterSink {
), ),
sink_ctx: Mutex::new(None), sink_ctx: Mutex::new(None),
upstream_latency: Mutex::new(gst::ClockTime::NONE), upstream_latency: Mutex::new(gst::ClockTime::NONE),
got_first_buffer: AtomicBool::new(false),
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
} }
} }
@ -360,6 +376,13 @@ impl ElementImpl for InterSink {
Some(&*ELEMENT_METADATA) Some(&*ELEMENT_METADATA)
} }
fn query(&self, query: &mut gst::QueryRef) -> bool {
gst::log!(CAT, imp = self, "Got {query:?}");
let res = self.parent_query(query);
gst::log!(CAT, imp = self, "Parent returned {res}, {query:?}");
res
}
fn send_event(&self, event: gst::Event) -> bool { fn send_event(&self, event: gst::Event) -> bool {
gst::log!(CAT, imp = self, "Got {event:?}"); gst::log!(CAT, imp = self, "Got {event:?}");