Forward tag events to consumers in StreamProducer

Fix #386

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1045>
This commit is contained in:
Jimmi Holst Christensen 2022-05-24 12:12:50 +02:00 committed by GStreamer Marge Bot
parent 0ab72911ee
commit 85e427345e

View file

@ -235,9 +235,9 @@ impl StreamProducer {
} }
} }
/// configure if EOS from appsrc should be forwarded to all the consumers (default: `true`) /// configure event types the appsrc should forward to all consumers (default: `Eos`).
pub fn set_forward_eos(&self, forward_eos: bool) { pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
self.consumers.lock().unwrap().forward_eos = forward_eos; self.consumers.lock().unwrap().events_to_forward = events_to_forward.into_iter().collect();
} }
/// Get the GStreamer `appsink` wrapped by this producer /// Get the GStreamer `appsink` wrapped by this producer
@ -272,7 +272,9 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
current_latency: None, current_latency: None,
latency_updated: false, latency_updated: false,
consumers: HashMap::new(), consumers: HashMap::new(),
forward_eos: true, // it would make sense to automatically forward more events such as Tag but that would break
// with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
events_to_forward: vec![gst::EventType::Eos],
})); }));
appsink.set_callbacks( appsink.set_callbacks(
@ -375,12 +377,36 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
})) }))
.new_event(glib::clone!(@strong consumers => move |appsink| {
match appsink.pull_object().map(|obj| obj.downcast::<gst::Event>()) {
Ok(Ok(event)) => {
let (events_to_forward, appsrcs) = {
// clone so we don't keep the lock while pushing events
let consumers = consumers.lock().unwrap();
let events = consumers.events_to_forward.clone();
let appsrcs = consumers.consumers.keys().cloned().collect::<Vec<_>>();
(events, appsrcs)
};
if events_to_forward.contains(&event.type_()){
for appsrc in appsrcs {
appsrc.send_event(event.clone());
}
}
}
Ok(Err(_)) => {}, // pulled another unsupported object type, ignore
Err(_err) => gst::warning!(CAT, obj: appsink, "Failed to pull event"),
}
false
}))
.eos(glib::clone!(@strong consumers => move |appsink| { .eos(glib::clone!(@strong consumers => move |appsink| {
let stream_consumers = consumers let stream_consumers = consumers
.lock() .lock()
.unwrap(); .unwrap();
if stream_consumers.forward_eos { if stream_consumers.events_to_forward.contains(&gst::EventType::Eos) {
let current_consumers = stream_consumers let current_consumers = stream_consumers
.consumers .consumers
.values() .values()
@ -432,8 +458,8 @@ struct StreamConsumers {
latency_updated: bool, latency_updated: bool,
/// The consumers, AppSrc pointer value -> consumer /// The consumers, AppSrc pointer value -> consumer
consumers: HashMap<gst_app::AppSrc, StreamConsumer>, consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
/// Whether appsrc EOS should be forwarded to consumers /// What events should be forwarded to consumers
forward_eos: bool, events_to_forward: Vec<gst::EventType>,
} }
/// Wrapper around a consumer's `appsrc` /// Wrapper around a consumer's `appsrc`