Merge branch 'streamproducer-drop-callbacks' into 'main'

streamproducer: Drop pad probe and callbacks from appsink when releasing the StreamProducer

See merge request gstreamer/gstreamer-rs!1704
This commit is contained in:
Sebastian Dröge 2025-04-07 08:55:19 +00:00
commit 571e6ac39a

View file

@ -73,21 +73,38 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
/// A producer is essentially a GStreamer `appsink` whose output
/// is sent to a set of consumers, who are essentially `appsrc` wrappers
#[derive(Debug, Clone)]
pub struct StreamProducer {
/// The appsink to dispatch data for
appsink: gst_app::AppSink,
/// The consumers to dispatch data to
consumers: Arc<Mutex<StreamConsumers>>,
}
pub struct StreamProducer(Arc<StreamProducerInner>);
impl PartialEq for StreamProducer {
fn eq(&self, other: &Self) -> bool {
self.appsink.eq(&other.appsink)
self.0.appsink.eq(&other.0.appsink)
}
}
impl Eq for StreamProducer {}
#[derive(Debug)]
struct StreamProducerInner {
/// The appsink to dispatch data for
appsink: gst_app::AppSink,
/// The pad probe on the appsink=
appsink_probe_id: Option<gst::PadProbeId>,
/// The consumers to dispatch data to
consumers: Arc<Mutex<StreamConsumers>>,
}
impl Drop for StreamProducerInner {
fn drop(&mut self) {
if let Some(probe_id) = self.appsink_probe_id.take() {
let pad = self.appsink.static_pad("sink").unwrap();
pad.remove_probe(probe_id);
}
self.appsink
.set_callbacks(gst_app::AppSinkCallbacks::builder().build());
}
}
/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
/// The producer and consumer will stay alive while the link is.
#[derive(Debug)]
@ -254,11 +271,11 @@ impl StreamProducer {
discard: Arc<atomic::AtomicBool>,
wait_for_keyframe: Arc<atomic::AtomicBool>,
) -> Result<(), AddConsumerError> {
let mut consumers = self.consumers.lock().unwrap();
let mut consumers = self.0.consumers.lock().unwrap();
if consumers.consumers.contains_key(consumer) {
gst::error!(
CAT,
obj = &self.appsink,
obj = &self.0.appsink,
"Consumer {} ({:?}) already added",
consumer.name(),
consumer
@ -268,7 +285,7 @@ impl StreamProducer {
gst::debug!(
CAT,
obj = &self.appsink,
obj = &self.0.appsink,
"Adding consumer {} ({:?})",
consumer.name(),
consumer
@ -278,13 +295,12 @@ impl StreamProducer {
// Forward force-keyunit events upstream to the appsink
let srcpad = consumer.static_pad("src").unwrap();
let appsink = &self.appsink;
let fku_probe_id = srcpad
.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(
#[weak]
appsink,
#[weak(rename_to = appsink)]
self.0.appsink,
#[upgrade_or_panic]
move |_pad, info| {
let Some(event) = info.event() else {
@ -323,10 +339,15 @@ impl StreamProducer {
// drop the lock before sending events
drop(consumers);
let appsink_pad = self.appsink.static_pad("sink").unwrap();
let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
appsink_pad.sticky_events_foreach(|event| {
if events_to_forward.contains(&event.type_()) {
gst::debug!(CAT, obj = &self.appsink, "forward sticky event {:?}", event);
gst::debug!(
CAT,
obj = &self.0.appsink,
"forward sticky event {:?}",
event
);
consumer.send_event(event.clone());
}
@ -453,6 +474,7 @@ impl StreamProducer {
pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
let name = consumer.name();
if self
.0
.consumers
.lock()
.unwrap()
@ -462,7 +484,7 @@ impl StreamProducer {
{
gst::debug!(
CAT,
obj = &self.appsink,
obj = &self.0.appsink,
"Removed consumer {} ({:?})",
name,
consumer
@ -471,7 +493,7 @@ impl StreamProducer {
} else {
gst::debug!(
CAT,
obj = &self.appsink,
obj = &self.0.appsink,
"Consumer {} ({:?}) not found",
name,
consumer
@ -481,27 +503,28 @@ impl StreamProducer {
/// configure event types the appsink should forward to all its consumers (default: `Eos`).
pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect();
self.0.consumers.lock().unwrap().events_to_forward =
events_to_forward.into_iter().collect();
}
/// get event types the appsink should forward to all its consumers
pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
self.consumers.lock().unwrap().events_to_forward.clone()
self.0.consumers.lock().unwrap().events_to_forward.clone()
}
/// configure whether the preroll sample should be forwarded (default: `true`)
pub fn set_forward_preroll(&self, forward_preroll: bool) {
self.consumers.lock().unwrap().forward_preroll = forward_preroll;
self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
}
/// Get the GStreamer `appsink` wrapped by this producer
pub fn appsink(&self) -> &gst_app::AppSink {
&self.appsink
&self.0.appsink
}
/// Signals an error on all consumers
pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
let consumers = self.consumers.lock().unwrap();
let consumers = self.0.consumers.lock().unwrap();
for consumer in consumers.consumers.keys() {
let mut msg_builder =
@ -516,7 +539,7 @@ impl StreamProducer {
/// The last sample produced by this producer.
pub fn last_sample(&self) -> Option<gst::Sample> {
self.appsink.property("last-sample")
self.0.appsink.property("last-sample")
}
}
@ -649,34 +672,37 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
);
let sinkpad = appsink.static_pad("sink").unwrap();
sinkpad.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(
#[strong]
consumers,
move |_pad, info| {
let Some(event) = info.event() else {
return gst::PadProbeReturn::Ok;
};
let appsink_probe_id = sinkpad
.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(
#[strong]
consumers,
move |_pad, info| {
let Some(event) = info.event() else {
return gst::PadProbeReturn::Ok;
};
let gst::EventView::Latency(event) = event.view() else {
return gst::PadProbeReturn::Ok;
};
let gst::EventView::Latency(event) = event.view() else {
return gst::PadProbeReturn::Ok;
};
let latency = event.latency();
let mut consumers = consumers.lock().unwrap();
consumers.current_latency = Some(latency);
consumers.latency_updated = true;
let latency = event.latency();
let mut consumers = consumers.lock().unwrap();
consumers.current_latency = Some(latency);
consumers.latency_updated = true;
gst::PadProbeReturn::Ok
}
),
);
gst::PadProbeReturn::Ok
}
),
)
.unwrap();
StreamProducer {
StreamProducer(Arc::new(StreamProducerInner {
appsink: appsink.clone(),
appsink_probe_id: Some(appsink_probe_id),
consumers,
}
}))
}
}