utils/streamproducer: Request keyframes without a lock held and also after an appsrc has dropped some data

This commit is contained in:
Sebastian Dröge 2022-05-12 10:44:59 +03:00
parent d6095900e9
commit e263bd8945

View file

@ -159,7 +159,8 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
let latency = consumers.current_latency; let latency = consumers.current_latency;
let latency_updated = mem::replace(&mut consumers.latency_updated, false); let latency_updated = mem::replace(&mut consumers.latency_updated, false);
let mut requested_keyframe = false;
let mut needs_keyframe_request = false;
let current_consumers = consumers let current_consumers = consumers
.consumers .consumers
@ -180,7 +181,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
} }
} }
if is_discont { if is_discont && !is_keyframe {
// Whenever we have a discontinuity, we need a new keyframe // Whenever we have a discontinuity, we need a new keyframe
consumer.needs_keyframe.store( consumer.needs_keyframe.store(
true, true,
@ -191,15 +192,9 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst)
{ {
// If we need a keyframe (and this one isn't) request a keyframe upstream // If we need a keyframe (and this one isn't) request a keyframe upstream
if !requested_keyframe { if !needs_keyframe_request {
gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer"); gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer");
needs_keyframe_request = true;
appsink.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
);
requested_keyframe = true;
} }
gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe", gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe",
@ -214,6 +209,14 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
drop(consumers); drop(consumers);
if needs_keyframe_request {
appsink.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
);
}
for consumer in current_consumers { for consumer in current_consumers {
if let Err(err) = consumer.push_sample(&sample) { if let Err(err) = consumer.push_sample(&sample) {
gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err); gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err);
@ -287,12 +290,15 @@ struct StreamConsumer {
/// Whether a first buffer has made it through, used to determine /// Whether a first buffer has made it through, used to determine
/// whether a new key unit should be requested. Only useful for encoded /// whether a new key unit should be requested. Only useful for encoded
/// streams. /// streams.
needs_keyframe: atomic::AtomicBool, needs_keyframe: Arc<atomic::AtomicBool>,
} }
impl StreamConsumer { impl StreamConsumer {
/// Create a new consumer /// Create a new consumer
fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self { fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self {
let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));
let needs_keyframe_clone = needs_keyframe.clone();
appsrc.set_callbacks( appsrc.set_callbacks(
gst_app::AppSrcCallbacks::builder() gst_app::AppSrcCallbacks::builder()
.enough_data(move |appsrc| { .enough_data(move |appsrc| {
@ -303,6 +309,8 @@ impl StreamConsumer {
appsrc.name(), appsrc.name(),
appsrc, appsrc,
); );
needs_keyframe_clone.store(true, atomic::Ordering::SeqCst);
}) })
.build(), .build(),
); );
@ -311,7 +319,7 @@ impl StreamConsumer {
appsrc: appsrc.clone(), appsrc: appsrc.clone(),
fku_probe_id: Some(fku_probe_id), fku_probe_id: Some(fku_probe_id),
forwarded_latency: atomic::AtomicBool::new(false), forwarded_latency: atomic::AtomicBool::new(false),
needs_keyframe: atomic::AtomicBool::new(true), needs_keyframe,
} }
} }
} }