streamproducer: forward preroll sample from new_preroll callback

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1280>
This commit is contained in:
Mathieu Duponchelle 2023-06-24 00:30:42 +02:00
parent a4247d5199
commit 88c21505d2

View file

@ -1,7 +1,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
mem, mem,
sync::{atomic, Arc, Mutex}, sync::{atomic, Arc, Mutex, MutexGuard},
}; };
use glib::once_cell::sync::Lazy; use glib::once_cell::sync::Lazy;
@ -220,7 +220,7 @@ impl StreamProducer {
fn process_sample( fn process_sample(
sample: gst::Sample, sample: gst::Sample,
appsink: &gst_app::AppSink, appsink: &gst_app::AppSink,
consumers: &mut StreamConsumers, mut consumers: MutexGuard<StreamConsumers>,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() { let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
let flags = buf.flags(); let flags = buf.flags();
@ -233,12 +233,7 @@ impl StreamProducer {
(false, true) (false, true)
}; };
gst::trace!( gst::trace!(CAT, obj: appsink, "processing sample {:?}", sample.buffer());
CAT,
obj: appsink,
"processing preroll {:?}",
sample.buffer()
);
let latency = consumers.current_latency; let latency = consumers.current_latency;
let latency_updated = mem::replace(&mut consumers.latency_updated, false); let latency_updated = mem::replace(&mut consumers.latency_updated, false);
@ -382,6 +377,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
// it would make sense to automatically forward more events such as Tag but that would break // it would make sense to automatically forward more events such as Tag but that would break
// with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297 // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
events_to_forward: vec![gst::EventType::Eos], events_to_forward: vec![gst::EventType::Eos],
just_forwarded_preroll: false,
})); }));
appsink.set_callbacks( appsink.set_callbacks(
@ -397,7 +393,28 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
} }
}; };
StreamProducer::process_sample(sample, appsink, &mut consumers) let just_forwarded_preroll = mem::replace(&mut consumers.just_forwarded_preroll, false);
if just_forwarded_preroll {
return Ok(gst::FlowSuccess::Ok);
}
StreamProducer::process_sample(sample, appsink, consumers)
}))
.new_preroll(glib::clone!(@strong consumers => move |appsink| {
let mut consumers = consumers.lock().unwrap();
let sample = match appsink.pull_preroll() {
Ok(sample) => sample,
Err(_err) => {
gst::debug!(CAT, obj: appsink, "Failed to pull preroll");
return Err(gst::FlowError::Flushing);
}
};
consumers.just_forwarded_preroll = true;
StreamProducer::process_sample(sample, appsink, consumers)
})) }))
.new_event(glib::clone!(@strong consumers => move |appsink| { .new_event(glib::clone!(@strong consumers => move |appsink| {
match appsink.pull_object().map(|obj| obj.downcast::<gst::Event>()) { match appsink.pull_object().map(|obj| obj.downcast::<gst::Event>()) {
@ -482,6 +499,10 @@ struct StreamConsumers {
consumers: HashMap<gst_app::AppSrc, StreamConsumer>, consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
/// What events should be forwarded to consumers /// What events should be forwarded to consumers
events_to_forward: Vec<gst::EventType>, events_to_forward: Vec<gst::EventType>,
/// Whether we just forwarded the preroll sample. When we did we want to
/// discard the next sample from on_new_sample as it would cause us to
/// otherwise push out the same sample twice to consumers.
just_forwarded_preroll: bool,
} }
/// Wrapper around a consumer's `appsrc` /// Wrapper around a consumer's `appsrc`