From a4247d5199460ba48f561bcf189079220478d25b Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Sat, 24 Jun 2023 00:29:11 +0200 Subject: [PATCH] streamproducer: extract process_sample function Part-of: --- gstreamer-utils/src/streamproducer.rs | 194 ++++++++++++++------------ 1 file changed, 108 insertions(+), 86 deletions(-) diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index a5a5ba9ce..1c92d5a37 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -217,6 +217,113 @@ impl StreamProducer { Ok(()) } + fn process_sample( + sample: gst::Sample, + appsink: &gst_app::AppSink, + consumers: &mut StreamConsumers, + ) -> Result { + let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() { + let flags = buf.flags(); + + ( + flags.contains(gst::BufferFlags::DISCONT), + !flags.contains(gst::BufferFlags::DELTA_UNIT), + ) + } else { + (false, true) + }; + + gst::trace!( + CAT, + obj: appsink, + "processing preroll {:?}", + sample.buffer() + ); + + let latency = consumers.current_latency; + let latency_updated = mem::replace(&mut consumers.latency_updated, false); + + let mut needs_keyframe_request = false; + + let current_consumers = consumers + .consumers + .values() + .filter_map(|consumer| { + if let Some(latency) = latency { + if consumer + .forwarded_latency + .compare_exchange( + false, + true, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ) + .is_ok() + || latency_updated + { + consumer.appsrc.set_latency(latency, gst::ClockTime::NONE); + } + } + + if consumer.discard.load(atomic::Ordering::SeqCst) { + consumer + .needs_keyframe + .store(false, atomic::Ordering::SeqCst); + return None; + } + + if is_discont && !is_keyframe { + // Whenever we have a discontinuity, we need a new keyframe + consumer + .needs_keyframe + .store(true, atomic::Ordering::SeqCst); + } + + if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) { + // If we need a keyframe (and this one isn't) request a keyframe upstream + if !needs_keyframe_request { + gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer"); + needs_keyframe_request = true; + } + + consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst); + + gst::debug!( + CAT, + obj: appsink, + "Ignoring frame for {} while waiting for a keyframe", + consumer.appsrc.name() + ); + None + } else { + consumer + .needs_keyframe + .store(false, atomic::Ordering::SeqCst); + consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst); + + Some(consumer.appsrc.clone()) + } + }) + .collect::>(); + + drop(consumers); + + if needs_keyframe_request { + appsink.send_event( + gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(true) + .build(), + ); + } + + for consumer in current_consumers { + if let Err(err) = consumer.push_sample(&sample) { + gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err); + } + } + Ok(gst::FlowSuccess::Ok) + } + /// Remove a consumer appsrc by id pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) { let name = consumer.name(); @@ -290,92 +397,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { } }; - let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() { - let flags = buf.flags(); - - (flags.contains(gst::BufferFlags::DISCONT), - !flags.contains(gst::BufferFlags::DELTA_UNIT)) - } else { - (false, true) - }; - - gst::trace!(CAT, obj: appsink, "processing sample"); - - let latency = consumers.current_latency; - let latency_updated = mem::replace(&mut consumers.latency_updated, false); - - let mut needs_keyframe_request = false; - - let current_consumers = consumers - .consumers - .values() - .filter_map(|consumer| { - if let Some(latency) = latency { - if consumer.forwarded_latency - .compare_exchange( - false, - true, - atomic::Ordering::SeqCst, - atomic::Ordering::SeqCst, - ) - .is_ok() - || latency_updated - { - consumer.appsrc.set_latency(latency, gst::ClockTime::NONE); - } - } - - if consumer.discard.load(atomic::Ordering::SeqCst) { - consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst); - return None; - } - - if is_discont && !is_keyframe { - // Whenever we have a discontinuity, we need a new keyframe - consumer.needs_keyframe.store( - true, - atomic::Ordering::SeqCst, - ); - } - - if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) - { - // If we need a keyframe (and this one isn't) request a keyframe upstream - if !needs_keyframe_request { - gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer"); - needs_keyframe_request = true; - } - - consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst); - - gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe", - consumer.appsrc.name()); - None - } else { - consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst); - consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst); - - Some(consumer.appsrc.clone()) - } - }) - .collect::>(); - drop(consumers); - - if needs_keyframe_request { - appsink.send_event( - gst_video::UpstreamForceKeyUnitEvent::builder() - .all_headers(true) - .build(), - ); - } - - for consumer in current_consumers { - if let Err(err) = consumer.push_sample(&sample) { - gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err); - } - } - - Ok(gst::FlowSuccess::Ok) + StreamProducer::process_sample(sample, appsink, &mut consumers) })) .new_event(glib::clone!(@strong consumers => move |appsink| { match appsink.pull_object().map(|obj| obj.downcast::()) {