streamproducer: Drop pad probe and callbacks from appsink when releasing the StreamProducer

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1704>
This commit is contained in:
Sebastian Dröge 2025-04-07 11:53:30 +03:00
parent 521f0fe168
commit 96ce95db9d

View file

@ -73,21 +73,38 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
/// A producer is essentially a GStreamer `appsink` whose output /// A producer is essentially a GStreamer `appsink` whose output
/// is sent to a set of consumers, who are essentially `appsrc` wrappers /// is sent to a set of consumers, who are essentially `appsrc` wrappers
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StreamProducer { pub struct StreamProducer(Arc<StreamProducerInner>);
/// The appsink to dispatch data for
appsink: gst_app::AppSink,
/// The consumers to dispatch data to
consumers: Arc<Mutex<StreamConsumers>>,
}
impl PartialEq for StreamProducer { impl PartialEq for StreamProducer {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.appsink.eq(&other.appsink) self.0.appsink.eq(&other.0.appsink)
} }
} }
impl Eq for StreamProducer {} impl Eq for StreamProducer {}
#[derive(Debug)]
struct StreamProducerInner {
/// The appsink to dispatch data for
appsink: gst_app::AppSink,
/// The pad probe on the appsink=
appsink_probe_id: Option<gst::PadProbeId>,
/// The consumers to dispatch data to
consumers: Arc<Mutex<StreamConsumers>>,
}
impl Drop for StreamProducerInner {
fn drop(&mut self) {
if let Some(probe_id) = self.appsink_probe_id.take() {
let pad = self.appsink.static_pad("sink").unwrap();
pad.remove_probe(probe_id);
}
self.appsink
.set_callbacks(gst_app::AppSinkCallbacks::builder().build());
}
}
/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`. /// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
/// The producer and consumer will stay alive while the link is. /// The producer and consumer will stay alive while the link is.
#[derive(Debug)] #[derive(Debug)]
@ -254,11 +271,11 @@ impl StreamProducer {
discard: Arc<atomic::AtomicBool>, discard: Arc<atomic::AtomicBool>,
wait_for_keyframe: Arc<atomic::AtomicBool>, wait_for_keyframe: Arc<atomic::AtomicBool>,
) -> Result<(), AddConsumerError> { ) -> Result<(), AddConsumerError> {
let mut consumers = self.consumers.lock().unwrap(); let mut consumers = self.0.consumers.lock().unwrap();
if consumers.consumers.contains_key(consumer) { if consumers.consumers.contains_key(consumer) {
gst::error!( gst::error!(
CAT, CAT,
obj = &self.appsink, obj = &self.0.appsink,
"Consumer {} ({:?}) already added", "Consumer {} ({:?}) already added",
consumer.name(), consumer.name(),
consumer consumer
@ -268,7 +285,7 @@ impl StreamProducer {
gst::debug!( gst::debug!(
CAT, CAT,
obj = &self.appsink, obj = &self.0.appsink,
"Adding consumer {} ({:?})", "Adding consumer {} ({:?})",
consumer.name(), consumer.name(),
consumer consumer
@ -278,13 +295,12 @@ impl StreamProducer {
// Forward force-keyunit events upstream to the appsink // Forward force-keyunit events upstream to the appsink
let srcpad = consumer.static_pad("src").unwrap(); let srcpad = consumer.static_pad("src").unwrap();
let appsink = &self.appsink;
let fku_probe_id = srcpad let fku_probe_id = srcpad
.add_probe( .add_probe(
gst::PadProbeType::EVENT_UPSTREAM, gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!( glib::clone!(
#[weak] #[weak(rename_to = appsink)]
appsink, self.0.appsink,
#[upgrade_or_panic] #[upgrade_or_panic]
move |_pad, info| { move |_pad, info| {
let Some(event) = info.event() else { let Some(event) = info.event() else {
@ -323,10 +339,15 @@ impl StreamProducer {
// drop the lock before sending events // drop the lock before sending events
drop(consumers); drop(consumers);
let appsink_pad = self.appsink.static_pad("sink").unwrap(); let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
appsink_pad.sticky_events_foreach(|event| { appsink_pad.sticky_events_foreach(|event| {
if events_to_forward.contains(&event.type_()) { if events_to_forward.contains(&event.type_()) {
gst::debug!(CAT, obj = &self.appsink, "forward sticky event {:?}", event); gst::debug!(
CAT,
obj = &self.0.appsink,
"forward sticky event {:?}",
event
);
consumer.send_event(event.clone()); consumer.send_event(event.clone());
} }
@ -453,6 +474,7 @@ impl StreamProducer {
pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) { pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
let name = consumer.name(); let name = consumer.name();
if self if self
.0
.consumers .consumers
.lock() .lock()
.unwrap() .unwrap()
@ -462,7 +484,7 @@ impl StreamProducer {
{ {
gst::debug!( gst::debug!(
CAT, CAT,
obj = &self.appsink, obj = &self.0.appsink,
"Removed consumer {} ({:?})", "Removed consumer {} ({:?})",
name, name,
consumer consumer
@ -471,7 +493,7 @@ impl StreamProducer {
} else { } else {
gst::debug!( gst::debug!(
CAT, CAT,
obj = &self.appsink, obj = &self.0.appsink,
"Consumer {} ({:?}) not found", "Consumer {} ({:?}) not found",
name, name,
consumer consumer
@ -481,27 +503,28 @@ impl StreamProducer {
/// configure event types the appsink should forward to all its consumers (default: `Eos`). /// configure event types the appsink should forward to all its consumers (default: `Eos`).
pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) { pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect(); self.0.consumers.lock().unwrap().events_to_forward =
events_to_forward.into_iter().collect();
} }
/// get event types the appsink should forward to all its consumers /// get event types the appsink should forward to all its consumers
pub fn get_forwarded_events(&self) -> Vec<gst::EventType> { pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
self.consumers.lock().unwrap().events_to_forward.clone() self.0.consumers.lock().unwrap().events_to_forward.clone()
} }
/// configure whether the preroll sample should be forwarded (default: `true`) /// configure whether the preroll sample should be forwarded (default: `true`)
pub fn set_forward_preroll(&self, forward_preroll: bool) { pub fn set_forward_preroll(&self, forward_preroll: bool) {
self.consumers.lock().unwrap().forward_preroll = forward_preroll; self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
} }
/// Get the GStreamer `appsink` wrapped by this producer /// Get the GStreamer `appsink` wrapped by this producer
pub fn appsink(&self) -> &gst_app::AppSink { pub fn appsink(&self) -> &gst_app::AppSink {
&self.appsink &self.0.appsink
} }
/// Signals an error on all consumers /// Signals an error on all consumers
pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) { pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
let consumers = self.consumers.lock().unwrap(); let consumers = self.0.consumers.lock().unwrap();
for consumer in consumers.consumers.keys() { for consumer in consumers.consumers.keys() {
let mut msg_builder = let mut msg_builder =
@ -516,7 +539,7 @@ impl StreamProducer {
/// The last sample produced by this producer. /// The last sample produced by this producer.
pub fn last_sample(&self) -> Option<gst::Sample> { pub fn last_sample(&self) -> Option<gst::Sample> {
self.appsink.property("last-sample") self.0.appsink.property("last-sample")
} }
} }
@ -649,34 +672,37 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
); );
let sinkpad = appsink.static_pad("sink").unwrap(); let sinkpad = appsink.static_pad("sink").unwrap();
sinkpad.add_probe( let appsink_probe_id = sinkpad
gst::PadProbeType::EVENT_UPSTREAM, .add_probe(
glib::clone!( gst::PadProbeType::EVENT_UPSTREAM,
#[strong] glib::clone!(
consumers, #[strong]
move |_pad, info| { consumers,
let Some(event) = info.event() else { move |_pad, info| {
return gst::PadProbeReturn::Ok; let Some(event) = info.event() else {
}; return gst::PadProbeReturn::Ok;
};
let gst::EventView::Latency(event) = event.view() else { let gst::EventView::Latency(event) = event.view() else {
return gst::PadProbeReturn::Ok; return gst::PadProbeReturn::Ok;
}; };
let latency = event.latency(); let latency = event.latency();
let mut consumers = consumers.lock().unwrap(); let mut consumers = consumers.lock().unwrap();
consumers.current_latency = Some(latency); consumers.current_latency = Some(latency);
consumers.latency_updated = true; consumers.latency_updated = true;
gst::PadProbeReturn::Ok gst::PadProbeReturn::Ok
} }
), ),
); )
.unwrap();
StreamProducer { StreamProducer(Arc::new(StreamProducerInner {
appsink: appsink.clone(), appsink: appsink.clone(),
appsink_probe_id: Some(appsink_probe_id),
consumers, consumers,
} }))
} }
} }