utils: streamproducer: Persist ConsumptionLink state when changing producers

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1189>
This commit is contained in:
Sebastian Dröge 2023-01-16 14:56:33 +02:00
parent 094d74e391
commit c2e7abd128

View file

@ -68,9 +68,20 @@ impl ConsumptionLink {
pub fn change_producer( pub fn change_producer(
&mut self, &mut self,
new_producer: &StreamProducer, new_producer: &StreamProducer,
reset_stats: bool,
) -> Result<(), AddConsumerError> { ) -> Result<(), AddConsumerError> {
self.disconnect(); self.disconnect();
*self = new_producer.add_consumer(&self.consumer)?; if reset_stats {
self.dropped.store(0, atomic::Ordering::SeqCst);
self.pushed.store(0, atomic::Ordering::SeqCst);
}
new_producer.add_consumer_internal(
&self.consumer,
self.dropped.clone(),
self.pushed.clone(),
self.discard.clone(),
)?;
self.producer = Some(new_producer.clone());
Ok(()) Ok(())
} }
@ -146,6 +157,28 @@ impl StreamProducer {
&self, &self,
consumer: &gst_app::AppSrc, consumer: &gst_app::AppSrc,
) -> Result<ConsumptionLink, AddConsumerError> { ) -> Result<ConsumptionLink, AddConsumerError> {
let dropped = Arc::new(atomic::AtomicU64::new(0));
let pushed = Arc::new(atomic::AtomicU64::new(0));
let discard = Arc::new(atomic::AtomicBool::new(false));
self.add_consumer_internal(consumer, dropped.clone(), pushed.clone(), discard.clone())?;
Ok(ConsumptionLink {
consumer: consumer.clone(),
producer: Some(self.clone()),
dropped,
pushed,
discard,
})
}
fn add_consumer_internal(
&self,
consumer: &gst_app::AppSrc,
dropped: Arc<atomic::AtomicU64>,
pushed: Arc<atomic::AtomicU64>,
discard: Arc<atomic::AtomicBool>,
) -> Result<(), AddConsumerError> {
let mut consumers = self.consumers.lock().unwrap(); let mut consumers = self.consumers.lock().unwrap();
if consumers.consumers.contains_key(consumer) { if consumers.consumers.contains_key(consumer) {
gst::error!(CAT, obj: &self.appsink, "Consumer {} ({:?}) already added", consumer.name(), consumer); gst::error!(CAT, obj: &self.appsink, "Consumer {} ({:?}) already added", consumer.name(), consumer);
@ -175,22 +208,13 @@ impl StreamProducer {
) )
.unwrap(); .unwrap();
let stream_consumer = StreamConsumer::new(consumer, fku_probe_id); let stream_consumer = StreamConsumer::new(consumer, fku_probe_id, dropped, pushed, discard);
let dropped = stream_consumer.dropped.clone();
let pushed = stream_consumer.pushed.clone();
let discard = stream_consumer.discard.clone();
consumers consumers
.consumers .consumers
.insert(consumer.clone(), stream_consumer); .insert(consumer.clone(), stream_consumer);
Ok(ConsumptionLink { Ok(())
consumer: consumer.clone(),
producer: Some(self.clone()),
dropped,
pushed,
discard,
})
} }
/// Remove a consumer appsrc by id /// Remove a consumer appsrc by id
@ -435,10 +459,15 @@ struct StreamConsumer {
impl StreamConsumer { impl StreamConsumer {
/// Create a new consumer /// Create a new consumer
fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self { fn new(
appsrc: &gst_app::AppSrc,
fku_probe_id: gst::PadProbeId,
dropped: Arc<atomic::AtomicU64>,
pushed: Arc<atomic::AtomicU64>,
discard: Arc<atomic::AtomicBool>,
) -> Self {
let needs_keyframe = Arc::new(atomic::AtomicBool::new(true)); let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));
let needs_keyframe_clone = needs_keyframe.clone(); let needs_keyframe_clone = needs_keyframe.clone();
let dropped = Arc::new(atomic::AtomicU64::new(0));
let dropped_clone = dropped.clone(); let dropped_clone = dropped.clone();
appsrc.set_callbacks( appsrc.set_callbacks(
@ -464,8 +493,8 @@ impl StreamConsumer {
forwarded_latency: atomic::AtomicBool::new(false), forwarded_latency: atomic::AtomicBool::new(false),
needs_keyframe, needs_keyframe,
dropped, dropped,
pushed: Arc::new(atomic::AtomicU64::new(0)), pushed,
discard: Arc::new(atomic::AtomicBool::new(false)), discard,
} }
} }
} }