From 5349822962bbc19c049e807aade0c3b3ba554537 Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Thu, 12 May 2022 15:41:54 +0200 Subject: [PATCH] utils/streamproducer: add ConsumptionLink Having an explicit link object makes it much easier to build complex producers/consumers dynamics graphs. It provides helper to switch producers and prevent accidentally keeping broken links around as they are disconnected on Drop. --- gstreamer-utils/Cargo.toml | 1 + gstreamer-utils/src/lib.rs | 2 +- gstreamer-utils/src/streamproducer.rs | 68 +++++++++++++++++++++++---- 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/gstreamer-utils/Cargo.toml b/gstreamer-utils/Cargo.toml index 58ec22c9d..f1154ca0b 100644 --- a/gstreamer-utils/Cargo.toml +++ b/gstreamer-utils/Cargo.toml @@ -20,6 +20,7 @@ gst = { package = "gstreamer", path = "../gstreamer", features = ["v1_20"] } gst_app = { package = "gstreamer-app", path = "../gstreamer-app", features = ["v1_20"] } gst_video = { package = "gstreamer-video", path = "../gstreamer-video", features = ["v1_20"] } once_cell = "1" +thiserror = "1" [dev-dependencies] futures = { version = "0.3", features = ["executor"] } diff --git a/gstreamer-utils/src/lib.rs b/gstreamer-utils/src/lib.rs index fe94a8ebd..6f774f68a 100644 --- a/gstreamer-utils/src/lib.rs +++ b/gstreamer-utils/src/lib.rs @@ -1,3 +1,3 @@ mod streamproducer; -pub use crate::streamproducer::StreamProducer; +pub use crate::streamproducer::{ConsumptionLink, StreamProducer}; diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index 05a17471d..fca5118c9 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -3,6 +3,7 @@ use std::mem; use std::sync::{atomic, Arc, Mutex}; use once_cell::sync::Lazy; +use thiserror::Error; use gst::{glib, prelude::*}; @@ -35,6 +36,47 @@ impl PartialEq for StreamProducer { impl Eq for StreamProducer {} +/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`. +/// The producer and consumer will stay alive while the link is. +#[derive(Debug)] +pub struct ConsumptionLink { + consumer: gst_app::AppSrc, + producer: Option, +} + +impl ConsumptionLink { + /// Replace the producer by a new one, keeping the existing consumer. + pub fn change_producer( + &mut self, + new_producer: &StreamProducer, + ) -> Result<(), AddConsumerError> { + self.disconnect(); + *self = new_producer.add_consumer(&self.consumer)?; + Ok(()) + } + + /// Disconnect the consumer from the producer + pub fn disconnect(&mut self) { + if let Some(producer) = self.producer.take() { + producer.remove_consumer(&self.consumer); + } + } +} + +impl Drop for ConsumptionLink { + fn drop(&mut self) { + self.disconnect(); + } +} + +#[derive(Debug, Error)] +/// Error type returned when adding consumers to producers. +pub enum AddConsumerError { + #[error("Consumer already added")] + /// Consumer has already been added to this producer. + AlreadyAdded, +} + impl StreamProducer { /// Configure a consumer `appsrc` for later use in a `StreamProducer` /// @@ -53,12 +95,17 @@ impl StreamProducer { consumer.set_automatic_eos(false); } - /// Add an appsrc to dispatch data to - pub fn add_consumer(&self, consumer: &gst_app::AppSrc) { + /// Add an appsrc to dispatch data to. + /// + /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer. + pub fn add_consumer( + &self, + consumer: &gst_app::AppSrc, + ) -> Result { let mut consumers = self.consumers.lock().unwrap(); if consumers.consumers.contains_key(consumer) { gst::error!(CAT, obj: &self.appsink, "Consumer {} ({:?}) already added", consumer.name(), consumer); - return; + return Err(AddConsumerError::AlreadyAdded); } gst::debug!(CAT, obj: &self.appsink, "Adding consumer {} ({:?})", consumer.name(), consumer); @@ -88,6 +135,11 @@ impl StreamProducer { consumer.clone(), StreamConsumer::new(consumer, fku_probe_id), ); + + Ok(ConsumptionLink { + consumer: consumer.clone(), + producer: Some(self.clone()), + }) } /// Remove a consumer appsrc by id @@ -396,7 +448,7 @@ mod tests { use futures::{channel::mpsc::Receiver, SinkExt, StreamExt}; use gst::prelude::*; - use crate::StreamProducer; + use crate::{ConsumptionLink, StreamProducer}; fn create_producer() -> ( gst::Pipeline, @@ -482,13 +534,13 @@ mod tests { } } - fn connect(&self, producer: &StreamProducer) { + fn connect(&self, producer: &StreamProducer) -> ConsumptionLink { { let mut connected = self.connected.lock().unwrap(); *connected = true; } - producer.add_consumer(&self.src); + producer.add_consumer(&self.src).unwrap() } fn disconnect(&self, producer: &StreamProducer) { @@ -513,7 +565,7 @@ mod tests { let mut consumers: Vec = Vec::new(); let consumer = Consumer::new("consumer1"); - consumer.connect(&producer); + let _link = consumer.connect(&producer); consumer .pipeline .set_state(gst::State::Playing) @@ -521,7 +573,7 @@ mod tests { consumers.push(consumer); let consumer = Consumer::new("consumer2"); - consumer.connect(&producer); + let _link = consumer.connect(&producer); consumer .pipeline .set_state(gst::State::Playing)