diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index ce8c4b84a..4301c102b 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -48,6 +48,8 @@ pub struct ConsumptionLink { dropped: Arc, /// number of buffers pushed through `consumer` pushed: Arc, + /// if buffers should not be pushed to the `consumer` right now + discard: Arc, } impl ConsumptionLink { @@ -77,6 +79,16 @@ impl ConsumptionLink { pub fn pushed(&self) -> u64 { self.pushed.load(atomic::Ordering::SeqCst) } + + /// if buffers are currently pushed through this link + pub fn discard(&self) -> bool { + self.discard.load(atomic::Ordering::SeqCst) + } + + /// If set to `true` then no buffers will be pushed through this link + pub fn set_discard(&self, discard: bool) { + self.discard.store(discard, atomic::Ordering::SeqCst) + } } impl Drop for ConsumptionLink { @@ -150,6 +162,7 @@ impl StreamProducer { let stream_consumer = StreamConsumer::new(consumer, fku_probe_id); let dropped = stream_consumer.dropped.clone(); let pushed = stream_consumer.pushed.clone(); + let discard = stream_consumer.discard.clone(); consumers .consumers @@ -160,6 +173,7 @@ impl StreamProducer { producer: Some(self.clone()), dropped, pushed, + discard, }) } @@ -269,6 +283,11 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { } } + if consumer.discard.load(atomic::Ordering::SeqCst) { + consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst); + return None; + } + if is_discont && !is_keyframe { // Whenever we have a discontinuity, we need a new keyframe consumer.needs_keyframe.store( @@ -394,6 +413,8 @@ struct StreamConsumer { dropped: Arc, /// number of buffers pushed through `appsrc` pushed: Arc, + /// if buffers should not be pushed to the `appsrc` right now + discard: Arc, } impl StreamConsumer { @@ -428,6 +449,7 @@ impl StreamConsumer { needs_keyframe, dropped, pushed: Arc::new(atomic::AtomicU64::new(0)), + discard: Arc::new(atomic::AtomicBool::new(false)), } } }