diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index e0b86b636..abb8a9f93 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -68,9 +68,20 @@ impl ConsumptionLink { pub fn change_producer( &mut self, new_producer: &StreamProducer, + reset_stats: bool, ) -> Result<(), AddConsumerError> { self.disconnect(); - *self = new_producer.add_consumer(&self.consumer)?; + if reset_stats { + self.dropped.store(0, atomic::Ordering::SeqCst); + self.pushed.store(0, atomic::Ordering::SeqCst); + } + new_producer.add_consumer_internal( + &self.consumer, + self.dropped.clone(), + self.pushed.clone(), + self.discard.clone(), + )?; + self.producer = Some(new_producer.clone()); Ok(()) } @@ -146,6 +157,28 @@ impl StreamProducer { &self, consumer: &gst_app::AppSrc, ) -> Result { + let dropped = Arc::new(atomic::AtomicU64::new(0)); + let pushed = Arc::new(atomic::AtomicU64::new(0)); + let discard = Arc::new(atomic::AtomicBool::new(false)); + + self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?; + + Ok(ConsumptionLink { + consumer: consumer.clone(), + producer: Some(self.clone()), + dropped, + pushed, + discard, + }) + } + + fn add_consumer_internal( + &self, + consumer: &gst_app::AppSrc, + dropped: Arc, + pushed: Arc, + discard: Arc, + ) -> Result<(), AddConsumerError> { let mut consumers = self.consumers.lock().unwrap(); if consumers.consumers.contains_key(consumer) { gst::error!(CAT, obj: &self.appsink, "Consumer {} ({:?}) already added", consumer.name(), consumer); @@ -175,22 +208,13 @@ impl StreamProducer { ) .unwrap(); - let stream_consumer = StreamConsumer::new(consumer, fku_probe_id); - let dropped = stream_consumer.dropped.clone(); - let pushed = stream_consumer.pushed.clone(); - let discard = stream_consumer.discard.clone(); + let stream_consumer = StreamConsumer::new(consumer, fku_probe_id, dropped, pushed, discard); consumers .consumers .insert(consumer.clone(), stream_consumer); - Ok(ConsumptionLink { - consumer: consumer.clone(), - producer: Some(self.clone()), - dropped, - pushed, - discard, - }) + Ok(()) } /// Remove a consumer appsrc by id @@ -435,10 +459,15 @@ struct StreamConsumer { impl StreamConsumer { /// Create a new consumer - fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self { + fn new( + appsrc: &gst_app::AppSrc, + fku_probe_id: gst::PadProbeId, + dropped: Arc, + pushed: Arc, + discard: Arc, + ) -> Self { let needs_keyframe = Arc::new(atomic::AtomicBool::new(true)); let needs_keyframe_clone = needs_keyframe.clone(); - let dropped = Arc::new(atomic::AtomicU64::new(0)); let dropped_clone = dropped.clone(); appsrc.set_callbacks( @@ -464,8 +493,8 @@ impl StreamConsumer { forwarded_latency: atomic::AtomicBool::new(false), needs_keyframe, dropped, - pushed: Arc::new(atomic::AtomicU64::new(0)), - discard: Arc::new(atomic::AtomicBool::new(false)), + pushed, + discard, } } }