utils: streamproducer: Add ConsumptionLink::set_discard() to stop forwarding buffers for a while

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1189>
This commit is contained in:
Sebastian Dröge 2023-01-14 19:15:15 +02:00
parent a68c37e4eb
commit dd284a80ea

View file

@ -48,6 +48,8 @@ pub struct ConsumptionLink {
dropped: Arc<atomic::AtomicU64>, dropped: Arc<atomic::AtomicU64>,
/// number of buffers pushed through `consumer` /// number of buffers pushed through `consumer`
pushed: Arc<atomic::AtomicU64>, pushed: Arc<atomic::AtomicU64>,
/// if buffers should not be pushed to the `consumer` right now
discard: Arc<atomic::AtomicBool>,
} }
impl ConsumptionLink { impl ConsumptionLink {
@ -77,6 +79,16 @@ impl ConsumptionLink {
pub fn pushed(&self) -> u64 { pub fn pushed(&self) -> u64 {
self.pushed.load(atomic::Ordering::SeqCst) self.pushed.load(atomic::Ordering::SeqCst)
} }
/// if buffers are currently pushed through this link
pub fn discard(&self) -> bool {
self.discard.load(atomic::Ordering::SeqCst)
}
/// If set to `true` then no buffers will be pushed through this link
pub fn set_discard(&self, discard: bool) {
self.discard.store(discard, atomic::Ordering::SeqCst)
}
} }
impl Drop for ConsumptionLink { impl Drop for ConsumptionLink {
@ -150,6 +162,7 @@ impl StreamProducer {
let stream_consumer = StreamConsumer::new(consumer, fku_probe_id); let stream_consumer = StreamConsumer::new(consumer, fku_probe_id);
let dropped = stream_consumer.dropped.clone(); let dropped = stream_consumer.dropped.clone();
let pushed = stream_consumer.pushed.clone(); let pushed = stream_consumer.pushed.clone();
let discard = stream_consumer.discard.clone();
consumers consumers
.consumers .consumers
@ -160,6 +173,7 @@ impl StreamProducer {
producer: Some(self.clone()), producer: Some(self.clone()),
dropped, dropped,
pushed, pushed,
discard,
}) })
} }
@ -269,6 +283,11 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
} }
} }
if consumer.discard.load(atomic::Ordering::SeqCst) {
consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst);
return None;
}
if is_discont && !is_keyframe { if is_discont && !is_keyframe {
// Whenever we have a discontinuity, we need a new keyframe // Whenever we have a discontinuity, we need a new keyframe
consumer.needs_keyframe.store( consumer.needs_keyframe.store(
@ -394,6 +413,8 @@ struct StreamConsumer {
dropped: Arc<atomic::AtomicU64>, dropped: Arc<atomic::AtomicU64>,
/// number of buffers pushed through `appsrc` /// number of buffers pushed through `appsrc`
pushed: Arc<atomic::AtomicU64>, pushed: Arc<atomic::AtomicU64>,
/// if buffers should not be pushed to the `appsrc` right now
discard: Arc<atomic::AtomicBool>,
} }
impl StreamConsumer { impl StreamConsumer {
@ -428,6 +449,7 @@ impl StreamConsumer {
needs_keyframe, needs_keyframe,
dropped, dropped,
pushed: Arc::new(atomic::AtomicU64::new(0)), pushed: Arc::new(atomic::AtomicU64::new(0)),
discard: Arc::new(atomic::AtomicBool::new(false)),
} }
} }
} }