utils/streamproducer: add ConsumptionLink

Having an explicit link object makes it much easier to build complex
producers/consumers dynamics graphs.
It provides helper to switch producers and prevent accidentally keeping
broken links around as they are disconnected on Drop.
This commit is contained in:
Guillaume Desmottes 2022-05-12 15:41:54 +02:00 committed by Sebastian Dröge
parent f68efd0cab
commit 5349822962
3 changed files with 62 additions and 9 deletions

View file

@ -20,6 +20,7 @@ gst = { package = "gstreamer", path = "../gstreamer", features = ["v1_20"] }
gst_app = { package = "gstreamer-app", path = "../gstreamer-app", features = ["v1_20"] } gst_app = { package = "gstreamer-app", path = "../gstreamer-app", features = ["v1_20"] }
gst_video = { package = "gstreamer-video", path = "../gstreamer-video", features = ["v1_20"] } gst_video = { package = "gstreamer-video", path = "../gstreamer-video", features = ["v1_20"] }
once_cell = "1" once_cell = "1"
thiserror = "1"
[dev-dependencies] [dev-dependencies]
futures = { version = "0.3", features = ["executor"] } futures = { version = "0.3", features = ["executor"] }

View file

@ -1,3 +1,3 @@
mod streamproducer; mod streamproducer;
pub use crate::streamproducer::StreamProducer; pub use crate::streamproducer::{ConsumptionLink, StreamProducer};

View file

@ -3,6 +3,7 @@ use std::mem;
use std::sync::{atomic, Arc, Mutex}; use std::sync::{atomic, Arc, Mutex};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use thiserror::Error;
use gst::{glib, prelude::*}; use gst::{glib, prelude::*};
@ -35,6 +36,47 @@ impl PartialEq for StreamProducer {
impl Eq for StreamProducer {} impl Eq for StreamProducer {}
/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
/// The producer and consumer will stay alive while the link is.
#[derive(Debug)]
pub struct ConsumptionLink {
consumer: gst_app::AppSrc,
producer: Option<StreamProducer>,
}
impl ConsumptionLink {
/// Replace the producer by a new one, keeping the existing consumer.
pub fn change_producer(
&mut self,
new_producer: &StreamProducer,
) -> Result<(), AddConsumerError> {
self.disconnect();
*self = new_producer.add_consumer(&self.consumer)?;
Ok(())
}
/// Disconnect the consumer from the producer
pub fn disconnect(&mut self) {
if let Some(producer) = self.producer.take() {
producer.remove_consumer(&self.consumer);
}
}
}
impl Drop for ConsumptionLink {
fn drop(&mut self) {
self.disconnect();
}
}
#[derive(Debug, Error)]
/// Error type returned when adding consumers to producers.
pub enum AddConsumerError {
#[error("Consumer already added")]
/// Consumer has already been added to this producer.
AlreadyAdded,
}
impl StreamProducer { impl StreamProducer {
/// Configure a consumer `appsrc` for later use in a `StreamProducer` /// Configure a consumer `appsrc` for later use in a `StreamProducer`
/// ///
@ -53,12 +95,17 @@ impl StreamProducer {
consumer.set_automatic_eos(false); consumer.set_automatic_eos(false);
} }
/// Add an appsrc to dispatch data to /// Add an appsrc to dispatch data to.
pub fn add_consumer(&self, consumer: &gst_app::AppSrc) { ///
/// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
pub fn add_consumer(
&self,
consumer: &gst_app::AppSrc,
) -> Result<ConsumptionLink, AddConsumerError> {
let mut consumers = self.consumers.lock().unwrap(); let mut consumers = self.consumers.lock().unwrap();
if consumers.consumers.contains_key(consumer) { if consumers.consumers.contains_key(consumer) {
gst::error!(CAT, obj: &self.appsink, "Consumer {} ({:?}) already added", consumer.name(), consumer); gst::error!(CAT, obj: &self.appsink, "Consumer {} ({:?}) already added", consumer.name(), consumer);
return; return Err(AddConsumerError::AlreadyAdded);
} }
gst::debug!(CAT, obj: &self.appsink, "Adding consumer {} ({:?})", consumer.name(), consumer); gst::debug!(CAT, obj: &self.appsink, "Adding consumer {} ({:?})", consumer.name(), consumer);
@ -88,6 +135,11 @@ impl StreamProducer {
consumer.clone(), consumer.clone(),
StreamConsumer::new(consumer, fku_probe_id), StreamConsumer::new(consumer, fku_probe_id),
); );
Ok(ConsumptionLink {
consumer: consumer.clone(),
producer: Some(self.clone()),
})
} }
/// Remove a consumer appsrc by id /// Remove a consumer appsrc by id
@ -396,7 +448,7 @@ mod tests {
use futures::{channel::mpsc::Receiver, SinkExt, StreamExt}; use futures::{channel::mpsc::Receiver, SinkExt, StreamExt};
use gst::prelude::*; use gst::prelude::*;
use crate::StreamProducer; use crate::{ConsumptionLink, StreamProducer};
fn create_producer() -> ( fn create_producer() -> (
gst::Pipeline, gst::Pipeline,
@ -482,13 +534,13 @@ mod tests {
} }
} }
fn connect(&self, producer: &StreamProducer) { fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
{ {
let mut connected = self.connected.lock().unwrap(); let mut connected = self.connected.lock().unwrap();
*connected = true; *connected = true;
} }
producer.add_consumer(&self.src); producer.add_consumer(&self.src).unwrap()
} }
fn disconnect(&self, producer: &StreamProducer) { fn disconnect(&self, producer: &StreamProducer) {
@ -513,7 +565,7 @@ mod tests {
let mut consumers: Vec<Consumer> = Vec::new(); let mut consumers: Vec<Consumer> = Vec::new();
let consumer = Consumer::new("consumer1"); let consumer = Consumer::new("consumer1");
consumer.connect(&producer); let _link = consumer.connect(&producer);
consumer consumer
.pipeline .pipeline
.set_state(gst::State::Playing) .set_state(gst::State::Playing)
@ -521,7 +573,7 @@ mod tests {
consumers.push(consumer); consumers.push(consumer);
let consumer = Consumer::new("consumer2"); let consumer = Consumer::new("consumer2");
consumer.connect(&producer); let _link = consumer.connect(&producer);
consumer consumer
.pipeline .pipeline
.set_state(gst::State::Playing) .set_state(gst::State::Playing)