From e263bd894543d8e62a932bf392742c1acbd91e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 12 May 2022 10:44:59 +0300 Subject: [PATCH] utils/streamproducer: Request keyframes without a lock held and also after an appsrc has dropped some data --- gstreamer-utils/src/streamproducer.rs | 32 +++++++++++++++++---------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index c9e408cb9..7088db0ee 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -159,7 +159,8 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { let latency = consumers.current_latency; let latency_updated = mem::replace(&mut consumers.latency_updated, false); - let mut requested_keyframe = false; + + let mut needs_keyframe_request = false; let current_consumers = consumers .consumers @@ -180,7 +181,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { } } - if is_discont { + if is_discont && !is_keyframe { // Whenever we have a discontinuity, we need a new keyframe consumer.needs_keyframe.store( true, @@ -191,15 +192,9 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) { // If we need a keyframe (and this one isn't) request a keyframe upstream - if !requested_keyframe { + if !needs_keyframe_request { gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer"); - - appsink.send_event( - gst_video::UpstreamForceKeyUnitEvent::builder() - .all_headers(true) - .build(), - ); - requested_keyframe = true; + needs_keyframe_request = true; } gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe", @@ -214,6 +209,14 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { .collect::>(); drop(consumers); + if needs_keyframe_request { + appsink.send_event( + gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(), + ); + } + for consumer in current_consumers { if let Err(err) = consumer.push_sample(&sample) { gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err); @@ -287,12 +290,15 @@ struct StreamConsumer { /// Whether a first buffer has made it through, used to determine /// whether a new key unit should be requested. Only useful for encoded /// streams. - needs_keyframe: atomic::AtomicBool, + needs_keyframe: Arc, } impl StreamConsumer { /// Create a new consumer fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self { + let needs_keyframe = Arc::new(atomic::AtomicBool::new(true)); + let needs_keyframe_clone = needs_keyframe.clone(); + appsrc.set_callbacks( gst_app::AppSrcCallbacks::builder() .enough_data(move |appsrc| { @@ -303,6 +309,8 @@ impl StreamConsumer { appsrc.name(), appsrc, ); + + needs_keyframe_clone.store(true, atomic::Ordering::SeqCst); }) .build(), ); @@ -311,7 +319,7 @@ impl StreamConsumer { appsrc: appsrc.clone(), fku_probe_id: Some(fku_probe_id), forwarded_latency: atomic::AtomicBool::new(false), - needs_keyframe: atomic::AtomicBool::new(true), + needs_keyframe, } } }