From 122ccafd1ac3c5c700265f6e42c842177eb20174 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 13 May 2020 22:13:11 +0300 Subject: [PATCH] gstreamer/gstreamer-app: Don't store strong references in futures Stream/Sink adapters This applies to the ones of the appsink, appsrc and bus. If we would store a strong reference then they would keep alive the underlying object forever even if their pipeline disappeared in the meantime. Like this e.g. the bus stream would start returning None once the bus was destroyed, similar to how other channels are working in Rust. --- gstreamer-app/src/app_sink.rs | 19 +++++++++++++------ gstreamer-app/src/app_src.rs | 33 +++++++++++++++++++++++++-------- gstreamer/src/bus.rs | 13 +++++++++---- 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/gstreamer-app/src/app_sink.rs b/gstreamer-app/src/app_sink.rs index e57e330f2..3549a5b88 100644 --- a/gstreamer-app/src/app_sink.rs +++ b/gstreamer-app/src/app_sink.rs @@ -26,6 +26,7 @@ use AppSink; #[cfg(any(feature = "v1_10"))] use { futures_core::Stream, + glib::prelude::*, std::{ pin::Pin, sync::{Arc, Mutex}, @@ -349,7 +350,7 @@ unsafe extern "C" fn new_preroll_trampoline< #[cfg(any(feature = "v1_10"))] #[derive(Debug)] pub struct AppSinkStream { - app_sink: AppSink, + app_sink: glib::WeakRef, waker_reference: Arc>>, } @@ -358,7 +359,6 @@ impl AppSinkStream { fn new(app_sink: &AppSink) -> Self { skip_assert_initialized!(); - let app_sink = app_sink.clone(); let waker_reference = Arc::new(Mutex::new(None as Option)); app_sink.set_callbacks( @@ -387,7 +387,7 @@ impl AppSinkStream { ); Self { - app_sink, + app_sink: app_sink.downgrade(), waker_reference, } } @@ -399,7 +399,9 @@ impl Drop for AppSinkStream { // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 if gst::version() >= (1, 16, 3, 0) { - self.app_sink.set_callbacks(AppSinkCallbacks::new().build()); + if let Some(app_sink) = self.app_sink.upgrade() { + app_sink.set_callbacks(AppSinkCallbacks::new().build()); + } } } } @@ -411,11 +413,16 @@ impl Stream for AppSinkStream { fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll> { let mut waker = self.waker_reference.lock().unwrap(); - self.app_sink + let app_sink = match self.app_sink.upgrade() { + Some(app_sink) => app_sink, + None => return Poll::Ready(None), + }; + + app_sink .try_pull_sample(gst::ClockTime::from_mseconds(0)) .map(|sample| Poll::Ready(Some(sample))) .unwrap_or_else(|| { - if self.app_sink.is_eos() { + if app_sink.is_eos() { return Poll::Ready(None); } diff --git a/gstreamer-app/src/app_src.rs b/gstreamer-app/src/app_src.rs index e4f3d07a4..fafc5c585 100644 --- a/gstreamer-app/src/app_src.rs +++ b/gstreamer-app/src/app_src.rs @@ -7,6 +7,7 @@ // except according to those terms. use futures_sink::Sink; +use glib::prelude::*; use glib::translate::*; use glib_sys::{gboolean, gpointer}; use gst; @@ -324,7 +325,7 @@ impl AppSrc { #[derive(Debug)] pub struct AppSrcSink { - app_src: AppSrc, + app_src: glib::WeakRef, waker_reference: Arc>>, } @@ -332,7 +333,6 @@ impl AppSrcSink { fn new(app_src: &AppSrc) -> Self { skip_assert_initialized!(); - let app_src = app_src.clone(); let waker_reference = Arc::new(Mutex::new(None as Option)); app_src.set_callbacks( @@ -350,7 +350,7 @@ impl AppSrcSink { ); Self { - app_src, + app_src: app_src.downgrade(), waker_reference, } } @@ -361,7 +361,9 @@ impl Drop for AppSrcSink { // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 if gst::version() >= (1, 16, 3, 0) { - self.app_src.set_callbacks(AppSrcCallbacks::new().build()); + if let Some(app_src) = self.app_src.upgrade() { + app_src.set_callbacks(AppSrcCallbacks::new().build()); + } } } } @@ -372,8 +374,13 @@ impl Sink for AppSrcSink { fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll> { let mut waker = self.waker_reference.lock().unwrap(); - let current_level_bytes = self.app_src.get_current_level_bytes(); - let max_bytes = self.app_src.get_max_bytes(); + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Poll::Ready(Err(gst::FlowError::Eos)), + }; + + let current_level_bytes = app_src.get_current_level_bytes(); + let max_bytes = app_src.get_max_bytes(); if current_level_bytes >= max_bytes && max_bytes != 0 { waker.replace(context.waker().to_owned()); @@ -385,7 +392,12 @@ impl Sink for AppSrcSink { } fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> { - self.app_src.push_sample(&sample)?; + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Err(gst::FlowError::Eos), + }; + + app_src.push_sample(&sample)?; Ok(()) } @@ -395,7 +407,12 @@ impl Sink for AppSrcSink { } fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - self.app_src.end_of_stream()?; + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Poll::Ready(Ok(())), + }; + + app_src.end_of_stream()?; Poll::Ready(Ok(())) } diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 8a774f84b..7d91cc6db 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -10,6 +10,7 @@ use futures_channel::mpsc::{self, UnboundedReceiver}; use futures_core::Stream; use futures_util::{future, StreamExt}; use glib; +use glib::prelude::*; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; use glib_sys; @@ -280,7 +281,7 @@ impl<'a> Iterator for Iter<'a> { #[derive(Debug)] pub struct BusStream { - bus: Bus, + bus: glib::WeakRef, receiver: UnboundedReceiver, } @@ -288,7 +289,6 @@ impl BusStream { pub fn new(bus: &Bus) -> Self { skip_assert_initialized!(); - let bus = bus.clone(); let (sender, receiver) = mpsc::unbounded(); bus.set_sync_handler(move |_, message| { @@ -297,13 +297,18 @@ impl BusStream { BusSyncReply::Drop }); - Self { bus, receiver } + Self { + bus: bus.downgrade(), + receiver, + } } } impl Drop for BusStream { fn drop(&mut self) { - self.bus.unset_sync_handler(); + if let Some(bus) = self.bus.upgrade() { + bus.unset_sync_handler(); + } } }