diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index d00a984d7..05a17471d 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -115,6 +115,11 @@ impl StreamProducer { self.consumers.lock().unwrap().discard = false; } + /// configure if EOS from appsrc should be forwarded to all the consumers + pub fn set_forward_eos(&self, forward_eos: bool) { + self.consumers.lock().unwrap().forward_eos = forward_eos; + } + /// Get the GStreamer `appsink` wrapped by this producer pub fn appsink(&self) -> &gst_app::AppSink { &self.appsink @@ -143,6 +148,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { latency_updated: false, consumers: HashMap::new(), discard: true, + forward_eos: true, })); appsink.set_callbacks( @@ -241,17 +247,25 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { Ok(gst::FlowSuccess::Ok) })) - .eos(glib::clone!(@strong consumers => move |_| { - let current_consumers = consumers + .eos(glib::clone!(@strong consumers => move |appsink| { + let stream_consumers = consumers .lock() - .unwrap() - .consumers - .values() - .map(|c| c.appsrc.clone()) - .collect::>(); + .unwrap(); - for consumer in current_consumers { - let _ = consumer.end_of_stream(); + if stream_consumers.forward_eos { + let current_consumers = stream_consumers + .consumers + .values() + .map(|c| c.appsrc.clone()) + .collect::>(); + drop(stream_consumers); + + for consumer in current_consumers { + gst::debug!(CAT, obj: appsink, "set EOS on consumer {}", consumer.name()); + let _ = consumer.end_of_stream(); + } + } else { + gst::debug!(CAT, obj: appsink, "don't forward EOS to consumers"); } })) .build(), @@ -292,6 +306,8 @@ struct StreamConsumers { consumers: HashMap, /// Whether appsrc samples should be forwarded to consumers yet discard: bool, + /// Whether appsrc EOS should be forwarded to consumers + forward_eos: bool, } /// Wrapper around a consumer's `appsrc`