diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index 1c92d5a37..e4a1980bc 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, mem, - sync::{atomic, Arc, Mutex}, + sync::{atomic, Arc, Mutex, MutexGuard}, }; use glib::once_cell::sync::Lazy; @@ -220,7 +220,7 @@ impl StreamProducer { fn process_sample( sample: gst::Sample, appsink: &gst_app::AppSink, - consumers: &mut StreamConsumers, + mut consumers: MutexGuard, ) -> Result { let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() { let flags = buf.flags(); @@ -233,12 +233,7 @@ impl StreamProducer { (false, true) }; - gst::trace!( - CAT, - obj: appsink, - "processing preroll {:?}", - sample.buffer() - ); + gst::trace!(CAT, obj: appsink, "processing sample {:?}", sample.buffer()); let latency = consumers.current_latency; let latency_updated = mem::replace(&mut consumers.latency_updated, false); @@ -382,6 +377,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { // it would make sense to automatically forward more events such as Tag but that would break // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297 events_to_forward: vec![gst::EventType::Eos], + just_forwarded_preroll: false, })); appsink.set_callbacks( @@ -397,7 +393,28 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { } }; - StreamProducer::process_sample(sample, appsink, &mut consumers) + let just_forwarded_preroll = mem::replace(&mut consumers.just_forwarded_preroll, false); + + if just_forwarded_preroll { + return Ok(gst::FlowSuccess::Ok); + } + + StreamProducer::process_sample(sample, appsink, consumers) + })) + .new_preroll(glib::clone!(@strong consumers => move |appsink| { + let mut consumers = consumers.lock().unwrap(); + + let sample = match appsink.pull_preroll() { + Ok(sample) => sample, + Err(_err) => { + gst::debug!(CAT, obj: appsink, "Failed to pull preroll"); + return Err(gst::FlowError::Flushing); + } + }; + + consumers.just_forwarded_preroll = true; + + StreamProducer::process_sample(sample, appsink, consumers) })) .new_event(glib::clone!(@strong consumers => move |appsink| { match appsink.pull_object().map(|obj| obj.downcast::()) { @@ -482,6 +499,10 @@ struct StreamConsumers { consumers: HashMap, /// What events should be forwarded to consumers events_to_forward: Vec, + /// Whether we just forwarded the preroll sample. When we did we want to + /// discard the next sample from on_new_sample as it would cause us to + /// otherwise push out the same sample twice to consumers. + just_forwarded_preroll: bool, } /// Wrapper around a consumer's `appsrc`