From 85e427345e0261b917e6c55f085e5249e9836315 Mon Sep 17 00:00:00 2001 From: Jimmi Holst Christensen Date: Tue, 24 May 2022 12:12:50 +0200 Subject: [PATCH] Forward tag events to consumers in StreamProducer Fix #386 Part-of: --- gstreamer-utils/src/streamproducer.rs | 40 ++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index 41128954f..23b917cef 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -235,9 +235,9 @@ impl StreamProducer { } } - /// configure if EOS from appsrc should be forwarded to all the consumers (default: `true`) - pub fn set_forward_eos(&self, forward_eos: bool) { - self.consumers.lock().unwrap().forward_eos = forward_eos; + /// configure event types the appsrc should forward to all consumers (default: `Eos`). + pub fn set_forward_events(&self, events_to_forward: impl IntoIterator) { + self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect(); } /// Get the GStreamer `appsink` wrapped by this producer @@ -272,7 +272,9 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { current_latency: None, latency_updated: false, consumers: HashMap::new(), - forward_eos: true, + // it would make sense to automatically forward more events such as Tag but that would break + // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297 + events_to_forward: vec![gst::EventType::Eos], })); appsink.set_callbacks( @@ -375,12 +377,36 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { Ok(gst::FlowSuccess::Ok) })) + .new_event(glib::clone!(@strong consumers => move |appsink| { + match appsink.pull_object().map(|obj| obj.downcast::()) { + Ok(Ok(event)) => { + let (events_to_forward, appsrcs) = { + // clone so we don't keep the lock while pushing events + let consumers = consumers.lock().unwrap(); + let events = consumers.events_to_forward.clone(); + let appsrcs = consumers.consumers.keys().cloned().collect::>(); + + (events, appsrcs) + }; + + if events_to_forward.contains(&event.type_()){ + for appsrc in appsrcs { + appsrc.send_event(event.clone()); + } + } + } + Ok(Err(_)) => {}, // pulled another unsupported object type, ignore + Err(_err) => gst::warning!(CAT, obj: appsink, "Failed to pull event"), + } + + false + })) .eos(glib::clone!(@strong consumers => move |appsink| { let stream_consumers = consumers .lock() .unwrap(); - if stream_consumers.forward_eos { + if stream_consumers.events_to_forward.contains(&gst::EventType::Eos) { let current_consumers = stream_consumers .consumers .values() @@ -432,8 +458,8 @@ struct StreamConsumers { latency_updated: bool, /// The consumers, AppSrc pointer value -> consumer consumers: HashMap, - /// Whether appsrc EOS should be forwarded to consumers - forward_eos: bool, + /// What events should be forwarded to consumers + events_to_forward: Vec, } /// Wrapper around a consumer's `appsrc`