From 0e17878bede8806e1e0512820999609a954787e7 Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Tue, 5 Jul 2022 12:35:01 +0200 Subject: [PATCH] utils: streamproducer: add API to retrieve pushed and dropped buffers This may be used by applications to compute statistics about the overall pipeline health, like the ratio of buffers dropped. --- gstreamer-utils/src/streamproducer.rs | 48 +++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index fca5118c9..9bceb89db 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -42,6 +42,10 @@ impl Eq for StreamProducer {} pub struct ConsumptionLink { consumer: gst_app::AppSrc, producer: Option, + /// number of buffers dropped because `consumer` internal queue was full + dropped: Arc, + /// number of buffers pushed through `consumer` + pushed: Arc, } impl ConsumptionLink { @@ -61,6 +65,16 @@ impl ConsumptionLink { producer.remove_consumer(&self.consumer); } } + + /// number of dropped buffers because the consumer internal queue was full + pub fn dropped(&self) -> u64 { + self.dropped.load(atomic::Ordering::SeqCst) + } + + /// number of buffers pushed through this link + pub fn pushed(&self) -> u64 { + self.pushed.load(atomic::Ordering::SeqCst) + } } impl Drop for ConsumptionLink { @@ -131,14 +145,19 @@ impl StreamProducer { ) .unwrap(); - consumers.consumers.insert( - consumer.clone(), - StreamConsumer::new(consumer, fku_probe_id), - ); + let stream_consumer = StreamConsumer::new(consumer, fku_probe_id); + let dropped = stream_consumer.dropped.clone(); + let pushed = stream_consumer.pushed.clone(); + + consumers + .consumers + .insert(consumer.clone(), stream_consumer); Ok(ConsumptionLink { consumer: consumer.clone(), producer: Some(self.clone()), + dropped, + pushed, }) } @@ -271,11 +290,14 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { needs_keyframe_request = true; } + consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst); + gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe", consumer.appsrc.name()); None } else { consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst); + consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst); Some(consumer.appsrc.clone()) } @@ -375,6 +397,10 @@ struct StreamConsumer { /// whether a new key unit should be requested. Only useful for encoded /// streams. needs_keyframe: Arc, + /// number of buffers dropped because `appsrc` internal queue was full + dropped: Arc, + /// number of buffers pushed through `appsrc` + pushed: Arc, } impl StreamConsumer { @@ -382,6 +408,8 @@ impl StreamConsumer { fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self { let needs_keyframe = Arc::new(atomic::AtomicBool::new(true)); let needs_keyframe_clone = needs_keyframe.clone(); + let dropped = Arc::new(atomic::AtomicU64::new(0)); + let dropped_clone = dropped.clone(); appsrc.set_callbacks( gst_app::AppSrcCallbacks::builder() @@ -395,6 +423,7 @@ impl StreamConsumer { ); needs_keyframe_clone.store(true, atomic::Ordering::SeqCst); + dropped_clone.fetch_add(1, atomic::Ordering::SeqCst); }) .build(), ); @@ -404,6 +433,8 @@ impl StreamConsumer { fku_probe_id: Some(fku_probe_id), forwarded_latency: atomic::AtomicBool::new(false), needs_keyframe, + dropped, + pushed: Arc::new(atomic::AtomicU64::new(0)), } } } @@ -565,7 +596,7 @@ mod tests { let mut consumers: Vec = Vec::new(); let consumer = Consumer::new("consumer1"); - let _link = consumer.connect(&producer); + let link1 = consumer.connect(&producer); consumer .pipeline .set_state(gst::State::Playing) @@ -573,7 +604,7 @@ mod tests { consumers.push(consumer); let consumer = Consumer::new("consumer2"); - let _link = consumer.connect(&producer); + let link2 = consumer.connect(&producer); consumer .pipeline .set_state(gst::State::Playing) @@ -607,5 +638,10 @@ mod tests { consumers.get(0).unwrap().disconnect(&producer); } } + + assert_eq!(link1.pushed(), 6); + assert_eq!(link1.dropped(), 0); + assert_eq!(link2.pushed(), 10); + assert_eq!(link2.dropped(), 0); } }