threadshare: fix ts-inter test one_to_one_up_first

The test has been seen failing on CI a couple of times, yet I ran it 5000 times
locally without a single failure. This commit attempts to prevent the CI failures
by waiting for the latency event before starting to count 10 buffers, and only
then checking the resulting latency.

Also fix confusing naming for the latency field in `InterSrc`.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2497>
François Laignel 2025-08-26 22:04:38 +02:00
parent 3fec94a234
commit c1e9d034bc
2 changed files with 34 additions and 14 deletions
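
For readability, the test-side gating logic from the diff below is assembled here
into one hypothetical helper. This is a sketch only: the `install_callbacks` name
does not exist in the test, and the `gst`, `gst_app` and `futures` crate aliases
are assumed to be set up as in the gst-plugins-rs test suite.

// Sketch of the gating pattern described in the commit message: don't start
// counting buffers until a non-zero latency has been announced downstream.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use futures::channel::oneshot;
use gst::prelude::*;

fn install_callbacks(appsink: &gst_app::AppSink) -> oneshot::Receiver<()> {
    let got_latency_evt = Arc::new(AtomicBool::new(false));
    let (n_buf_tx, n_buf_rx) = oneshot::channel::<()>();

    appsink.set_callbacks(
        gst_app::AppSinkCallbacks::builder()
            .new_sample({
                let got_latency_evt = got_latency_evt.clone();
                let mut samples = 0;
                let mut n_buf_tx = Some(n_buf_tx);
                move |appsink| {
                    let _ = appsink.pull_sample().unwrap();
                    // Only count buffers once the latency event has been seen.
                    if got_latency_evt.load(Ordering::SeqCst) {
                        samples += 1;
                        if samples == 10 {
                            n_buf_tx.take().unwrap().send(()).unwrap();
                        }
                    }
                    Ok(gst::FlowSuccess::Ok)
                }
            })
            .build(),
    );

    // Flag the moment a LATENCY event with a non-zero latency travels
    // upstream through the appsink's sink pad.
    appsink.static_pad("sink").unwrap().add_probe(
        gst::PadProbeType::EVENT_UPSTREAM,
        move |_, info| {
            if let Some(gst::PadProbeData::Event(ref evt)) = info.data {
                if let gst::EventView::Latency(evt) = evt.view() {
                    if evt.latency() > gst::ClockTime::ZERO {
                        got_latency_evt.store(true, Ordering::SeqCst);
                    }
                }
            }
            gst::PadProbeReturn::Ok
        },
    );

    n_buf_rx
}

Once both pipelines are playing, the test presumably blocks on `n_buf_rx` and only
queries the resulting latency after the 10 gated buffers have been received, which
is what the commit message describes.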

@@ -278,7 +278,7 @@ impl PadSrcHandler for InterSrcPadHandler {
QueryViewMut::Latency(q) => {
let (_, q_min, q_max) = q.result();
-let Some(upstream_latency) = *imp.upstream_latency.lock().unwrap() else {
+let Some(upstream_latency) = *imp.our_latency.lock().unwrap() else {
gst::debug!(
CAT,
obj = pad,
@@ -355,7 +355,7 @@ impl InterSrcTask {
async fn maybe_get_upstream_latency(&self) -> Result<(), gst::FlowError> {
let imp = self.elem.imp();
-if imp.upstream_latency.lock().unwrap().is_some() {
+if imp.our_latency.lock().unwrap().is_some() {
return Ok(());
}
@@ -511,7 +511,7 @@ pub struct InterSrc {
src_ctx: Mutex<Option<InterContextSrc>>,
ts_ctx: Mutex<Option<Context>>,
dataqueue: Mutex<Option<DataQueue>>,
-upstream_latency: Mutex<Option<gst::ClockTime>>,
+our_latency: Mutex<Option<gst::ClockTime>>,
settings: Mutex<Settings>,
}
@@ -590,7 +590,7 @@ impl InterSrc {
// Remove the InterContextSrc from the InterContext
drop(self.src_ctx.lock().unwrap().take());
-*self.upstream_latency.lock().unwrap() = None;
+*self.our_latency.lock().unwrap() = None;
let dataqueue = self
.dataqueue
@@ -647,13 +647,13 @@ impl InterSrc {
);
{
-let mut upstream_latency = self.upstream_latency.lock().unwrap();
-if let Some(upstream_latency) = *upstream_latency {
-if upstream_latency == new_latency {
+let mut our_latency = self.our_latency.lock().unwrap();
+if let Some(our_latency) = *our_latency {
+if our_latency == new_latency {
return;
}
}
-*upstream_latency = Some(new_latency);
+*our_latency = Some(new_latency);
}
gst::debug!(
@@ -737,7 +737,7 @@ impl InterSrc {
self.task
.stop()
.block_on_or_add_subtask_then(self.obj(), |elem, res| {
-*elem.imp().upstream_latency.lock().unwrap() = gst::ClockTime::NONE;
+*elem.imp().our_latency.lock().unwrap() = gst::ClockTime::NONE;
if res.is_ok() {
gst::debug!(CAT, obj = elem, "Stopped");
@@ -812,7 +812,7 @@ impl ObjectSubclass for InterSrc {
src_ctx: Mutex::new(None),
ts_ctx: Mutex::new(None),
dataqueue: Mutex::new(None),
-upstream_latency: Mutex::new(gst::ClockTime::NONE),
+our_latency: Mutex::new(gst::ClockTime::NONE),
settings: Mutex::new(Settings::default()),
}
}

@@ -204,17 +204,21 @@ fn one_to_one_up_first() {
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
+let got_latency_evt = Arc::new(AtomicBool::new(false));
let (n_buf_tx, mut n_buf_rx) = oneshot::channel::<()>();
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
+let got_latency_evt = got_latency_evt.clone();
let mut samples = 0;
-let mut eos_tx = Some(n_buf_tx);
+let mut n_buf_tx = Some(n_buf_tx);
move |appsink| {
let _ = appsink.pull_sample().unwrap();
-samples += 1;
-if samples == 10 {
-eos_tx.take().unwrap().send(()).unwrap();
+if got_latency_evt.load(Ordering::SeqCst) {
+samples += 1;
+if samples == 10 {
+n_buf_tx.take().unwrap().send(()).unwrap();
+}
}
Ok(gst::FlowSuccess::Ok)
@@ -223,6 +227,22 @@ fn one_to_one_up_first() {
.build(),
);
+appsink.static_pad("sink").unwrap().add_probe(
+gst::PadProbeType::EVENT_UPSTREAM,
+move |_, info| {
+let Some(gst::PadProbeData::Event(ref evt)) = info.data else {
+unreachable!();
+};
+if let gst::EventView::Latency(evt) = evt.view() {
+if evt.latency() > gst::ClockTime::ZERO {
+got_latency_evt.store(true, Ordering::SeqCst);
+}
+}
+gst::PadProbeReturn::Ok
+},
+);
// Starting upstream first
pipe_up.set_state(gst::State::Playing).unwrap();
pipe_down.set_state(gst::State::Playing).unwrap();