From 93bc5c9324fce9da8bc55f0b486e334b9dbf77e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 13 May 2020 22:13:11 +0300 Subject: [PATCH] gstreamer/gstreamer-app: Don't store strong references in futures Stream/Sink adapters This applies to the ones of the appsink, appsrc and bus. If we would store a strong reference then they would keep alive the underlying object forever even if their pipeline disappeared in the meantime. Like this e.g. the bus stream would start returning None once the bus was destroyed, similar to how other channels are working in Rust. --- gstreamer-app/src/app_sink.rs | 19 +++++++++++++------ gstreamer-app/src/app_src.rs | 33 +++++++++++++++++++++++++-------- gstreamer/src/bus.rs | 13 +++++++++---- 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/gstreamer-app/src/app_sink.rs b/gstreamer-app/src/app_sink.rs index ee7f363bb..715684ce2 100644 --- a/gstreamer-app/src/app_sink.rs +++ b/gstreamer-app/src/app_sink.rs @@ -26,6 +26,7 @@ use AppSink; #[cfg(any(feature = "v1_10"))] use { futures_core::Stream, + glib::prelude::*, std::{ pin::Pin, sync::{Arc, Mutex}, @@ -353,7 +354,7 @@ unsafe extern "C" fn new_preroll_trampoline< #[cfg(any(feature = "v1_10"))] #[derive(Debug)] pub struct AppSinkStream { - app_sink: AppSink, + app_sink: glib::WeakRef, waker_reference: Arc>>, } @@ -362,7 +363,6 @@ impl AppSinkStream { fn new(app_sink: &AppSink) -> Self { skip_assert_initialized!(); - let app_sink = app_sink.clone(); let waker_reference = Arc::new(Mutex::new(None as Option)); app_sink.set_callbacks( @@ -391,7 +391,7 @@ impl AppSinkStream { ); Self { - app_sink, + app_sink: app_sink.downgrade(), waker_reference, } } @@ -403,7 +403,9 @@ impl Drop for AppSinkStream { // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 if gst::version() >= (1, 16, 3, 0) { - self.app_sink.set_callbacks(AppSinkCallbacks::new().build()); + if let Some(app_sink) = self.app_sink.upgrade() { + app_sink.set_callbacks(AppSinkCallbacks::new().build()); + } } } } @@ -415,11 +417,16 @@ impl Stream for AppSinkStream { fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll> { let mut waker = self.waker_reference.lock().unwrap(); - self.app_sink + let app_sink = match self.app_sink.upgrade() { + Some(app_sink) => app_sink, + None => return Poll::Ready(None), + }; + + app_sink .try_pull_sample(gst::ClockTime::from_mseconds(0)) .map(|sample| Poll::Ready(Some(sample))) .unwrap_or_else(|| { - if self.app_sink.is_eos() { + if app_sink.is_eos() { return Poll::Ready(None); } diff --git a/gstreamer-app/src/app_src.rs b/gstreamer-app/src/app_src.rs index b7819c3c0..c3836016e 100644 --- a/gstreamer-app/src/app_src.rs +++ b/gstreamer-app/src/app_src.rs @@ -7,6 +7,7 @@ // except according to those terms. use futures_sink::Sink; +use glib::prelude::*; use glib::translate::*; use glib_sys::{gboolean, gpointer}; use gst; @@ -324,7 +325,7 @@ impl AppSrc { #[derive(Debug)] pub struct AppSrcSink { - app_src: AppSrc, + app_src: glib::WeakRef, waker_reference: Arc>>, } @@ -332,7 +333,6 @@ impl AppSrcSink { fn new(app_src: &AppSrc) -> Self { skip_assert_initialized!(); - let app_src = app_src.clone(); let waker_reference = Arc::new(Mutex::new(None as Option)); app_src.set_callbacks( @@ -350,7 +350,7 @@ impl AppSrcSink { ); Self { - app_src, + app_src: app_src.downgrade(), waker_reference, } } @@ -361,7 +361,9 @@ impl Drop for AppSrcSink { // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 if gst::version() >= (1, 16, 3, 0) { - self.app_src.set_callbacks(AppSrcCallbacks::new().build()); + if let Some(app_src) = self.app_src.upgrade() { + app_src.set_callbacks(AppSrcCallbacks::new().build()); + } } } } @@ -372,8 +374,13 @@ impl Sink for AppSrcSink { fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll> { let mut waker = self.waker_reference.lock().unwrap(); - let current_level_bytes = self.app_src.get_current_level_bytes(); - let max_bytes = self.app_src.get_max_bytes(); + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Poll::Ready(Err(gst::FlowError::Eos)), + }; + + let current_level_bytes = app_src.get_current_level_bytes(); + let max_bytes = app_src.get_max_bytes(); if current_level_bytes >= max_bytes && max_bytes != 0 { waker.replace(context.waker().to_owned()); @@ -385,7 +392,12 @@ impl Sink for AppSrcSink { } fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> { - self.app_src.push_sample(&sample)?; + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Err(gst::FlowError::Eos), + }; + + app_src.push_sample(&sample)?; Ok(()) } @@ -395,7 +407,12 @@ impl Sink for AppSrcSink { } fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - self.app_src.end_of_stream()?; + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Poll::Ready(Ok(())), + }; + + app_src.end_of_stream()?; Poll::Ready(Ok(())) } diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 9fdbb80ea..062e9d145 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -10,6 +10,7 @@ use futures_channel::mpsc::{self, UnboundedReceiver}; use futures_core::Stream; use futures_util::{future, StreamExt}; use glib; +use glib::prelude::*; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; use glib_sys; @@ -283,7 +284,7 @@ impl<'a> Iterator for Iter<'a> { #[derive(Debug)] pub struct BusStream { - bus: Bus, + bus: glib::WeakRef, receiver: UnboundedReceiver, } @@ -291,7 +292,6 @@ impl BusStream { fn new(bus: &Bus) -> Self { skip_assert_initialized!(); - let bus = bus.clone(); let (sender, receiver) = mpsc::unbounded(); bus.set_sync_handler(move |_, message| { @@ -300,13 +300,18 @@ impl BusStream { BusSyncReply::Drop }); - Self { bus, receiver } + Self { + bus: bus.downgrade(), + receiver, + } } } impl Drop for BusStream { fn drop(&mut self) { - self.bus.unset_sync_handler(); + if let Some(bus) = self.bus.upgrade() { + bus.unset_sync_handler(); + } } }