streamproducer: add wait_for_keyframe configuration option

By default, StreamProducer will wait for keyframes after a DISCONT.

This is not always desirable, and this commit exposes a method for
controlling that behavior.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1697>
This commit is contained in:
Mathieu Duponchelle 2025-04-03 13:10:31 +02:00
parent 86d998336a
commit 2865bdfc92

View file

@ -101,6 +101,8 @@ pub struct ConsumptionLink {
pushed: Arc<WrappedAtomicU64>, pushed: Arc<WrappedAtomicU64>,
/// if buffers should not be pushed to the `consumer` right now /// if buffers should not be pushed to the `consumer` right now
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
/// whether the link will drop delta frames until next keyframe on discont
wait_for_keyframe: Arc<atomic::AtomicBool>,
} }
impl ConsumptionLink { impl ConsumptionLink {
@ -112,6 +114,7 @@ impl ConsumptionLink {
dropped: Arc::new(WrappedAtomicU64::new(0)), dropped: Arc::new(WrappedAtomicU64::new(0)),
pushed: Arc::new(WrappedAtomicU64::new(0)), pushed: Arc::new(WrappedAtomicU64::new(0)),
discard: Arc::new(atomic::AtomicBool::new(false)), discard: Arc::new(atomic::AtomicBool::new(false)),
wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
} }
} }
@ -131,6 +134,7 @@ impl ConsumptionLink {
self.dropped.clone(), self.dropped.clone(),
self.pushed.clone(), self.pushed.clone(),
self.discard.clone(), self.discard.clone(),
self.wait_for_keyframe.clone(),
)?; )?;
self.producer = Some(new_producer.clone()); self.producer = Some(new_producer.clone());
Ok(()) Ok(())
@ -163,6 +167,17 @@ impl ConsumptionLink {
self.discard.store(discard, atomic::Ordering::SeqCst) self.discard.store(discard, atomic::Ordering::SeqCst)
} }
/// if the link will drop frames until the next keyframe on discont
pub fn wait_for_keyframe(&self) -> bool {
self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
}
/// If set to `true` then the link will drop delta-frames until the next
/// keyframe on discont (default behavior).
pub fn set_wait_for_keyframe(&self, wait: bool) {
self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
}
/// Get the GStreamer `appsrc` wrapped by this link /// Get the GStreamer `appsrc` wrapped by this link
pub fn appsrc(&self) -> &gst_app::AppSrc { pub fn appsrc(&self) -> &gst_app::AppSrc {
&self.consumer &self.consumer
@ -211,8 +226,15 @@ impl StreamProducer {
let dropped = Arc::new(WrappedAtomicU64::new(0)); let dropped = Arc::new(WrappedAtomicU64::new(0));
let pushed = Arc::new(WrappedAtomicU64::new(0)); let pushed = Arc::new(WrappedAtomicU64::new(0));
let discard = Arc::new(atomic::AtomicBool::new(false)); let discard = Arc::new(atomic::AtomicBool::new(false));
let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?; self.add_consumer_internal(
consumer,
dropped.clone(),
pushed.clone(),
discard.clone(),
wait_for_keyframe.clone(),
)?;
Ok(ConsumptionLink { Ok(ConsumptionLink {
consumer: consumer.clone(), consumer: consumer.clone(),
@ -220,6 +242,7 @@ impl StreamProducer {
dropped, dropped,
pushed, pushed,
discard, discard,
wait_for_keyframe,
}) })
} }
@ -229,6 +252,7 @@ impl StreamProducer {
dropped: Arc<WrappedAtomicU64>, dropped: Arc<WrappedAtomicU64>,
pushed: Arc<WrappedAtomicU64>, pushed: Arc<WrappedAtomicU64>,
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
wait_for_keyframe: Arc<atomic::AtomicBool>,
) -> Result<(), AddConsumerError> { ) -> Result<(), AddConsumerError> {
let mut consumers = self.consumers.lock().unwrap(); let mut consumers = self.consumers.lock().unwrap();
if consumers.consumers.contains_key(consumer) { if consumers.consumers.contains_key(consumer) {
@ -280,7 +304,14 @@ impl StreamProducer {
) )
.unwrap(); .unwrap();
let stream_consumer = StreamConsumer::new(consumer, fku_probe_id, dropped, pushed, discard); let stream_consumer = StreamConsumer::new(
consumer,
fku_probe_id,
dropped,
pushed,
discard,
wait_for_keyframe,
);
consumers consumers
.consumers .consumers
@ -361,7 +392,10 @@ impl StreamProducer {
return None; return None;
} }
if is_discont && !is_keyframe { if is_discont
&& !is_keyframe
&& consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
{
// Whenever we have a discontinuity, we need a new keyframe // Whenever we have a discontinuity, we need a new keyframe
consumer consumer
.needs_keyframe .needs_keyframe
@ -377,7 +411,7 @@ impl StreamProducer {
consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst); consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
gst::debug!( gst::error!(
CAT, CAT,
obj = appsink, obj = appsink,
"Ignoring frame for {} while waiting for a keyframe", "Ignoring frame for {} while waiting for a keyframe",
@ -685,6 +719,8 @@ struct StreamConsumer {
pushed: Arc<WrappedAtomicU64>, pushed: Arc<WrappedAtomicU64>,
/// if buffers should not be pushed to the `appsrc` right now /// if buffers should not be pushed to the `appsrc` right now
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
/// whether the consumer should drop delta frames until next keyframe on discont
wait_for_keyframe: Arc<atomic::AtomicBool>,
} }
impl StreamConsumer { impl StreamConsumer {
@ -695,9 +731,13 @@ impl StreamConsumer {
dropped: Arc<WrappedAtomicU64>, dropped: Arc<WrappedAtomicU64>,
pushed: Arc<WrappedAtomicU64>, pushed: Arc<WrappedAtomicU64>,
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
wait_for_keyframe: Arc<atomic::AtomicBool>,
) -> Self { ) -> Self {
let needs_keyframe = Arc::new(atomic::AtomicBool::new(true)); let needs_keyframe = Arc::new(atomic::AtomicBool::new(
wait_for_keyframe.load(atomic::Ordering::SeqCst),
));
let needs_keyframe_clone = needs_keyframe.clone(); let needs_keyframe_clone = needs_keyframe.clone();
let wait_for_keyframe_clone = wait_for_keyframe.clone();
let dropped_clone = dropped.clone(); let dropped_clone = dropped.clone();
appsrc.set_callbacks( appsrc.set_callbacks(
@ -711,7 +751,7 @@ impl StreamConsumer {
appsrc, appsrc,
); );
needs_keyframe_clone.store(true, atomic::Ordering::SeqCst); needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
dropped_clone.fetch_add(1, atomic::Ordering::SeqCst); dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
}) })
.build(), .build(),
@ -725,6 +765,7 @@ impl StreamConsumer {
dropped, dropped,
pushed, pushed,
discard, discard,
wait_for_keyframe,
} }
} }
} }