diff --git a/gstreamer-app/src/app_sink.rs b/gstreamer-app/src/app_sink.rs index e57e330f2..3549a5b88 100644 --- a/gstreamer-app/src/app_sink.rs +++ b/gstreamer-app/src/app_sink.rs @@ -26,6 +26,7 @@ use AppSink; #[cfg(any(feature = "v1_10"))] use { futures_core::Stream, + glib::prelude::*, std::{ pin::Pin, sync::{Arc, Mutex}, @@ -349,7 +350,7 @@ unsafe extern "C" fn new_preroll_trampoline< #[cfg(any(feature = "v1_10"))] #[derive(Debug)] pub struct AppSinkStream { - app_sink: AppSink, + app_sink: glib::WeakRef, waker_reference: Arc>>, } @@ -358,7 +359,6 @@ impl AppSinkStream { fn new(app_sink: &AppSink) -> Self { skip_assert_initialized!(); - let app_sink = app_sink.clone(); let waker_reference = Arc::new(Mutex::new(None as Option)); app_sink.set_callbacks( @@ -387,7 +387,7 @@ impl AppSinkStream { ); Self { - app_sink, + app_sink: app_sink.downgrade(), waker_reference, } } @@ -399,7 +399,9 @@ impl Drop for AppSinkStream { // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 if gst::version() >= (1, 16, 3, 0) { - self.app_sink.set_callbacks(AppSinkCallbacks::new().build()); + if let Some(app_sink) = self.app_sink.upgrade() { + app_sink.set_callbacks(AppSinkCallbacks::new().build()); + } } } } @@ -411,11 +413,16 @@ impl Stream for AppSinkStream { fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll> { let mut waker = self.waker_reference.lock().unwrap(); - self.app_sink + let app_sink = match self.app_sink.upgrade() { + Some(app_sink) => app_sink, + None => return Poll::Ready(None), + }; + + app_sink .try_pull_sample(gst::ClockTime::from_mseconds(0)) .map(|sample| Poll::Ready(Some(sample))) .unwrap_or_else(|| { - if self.app_sink.is_eos() { + if app_sink.is_eos() { return Poll::Ready(None); } diff --git a/gstreamer-app/src/app_src.rs b/gstreamer-app/src/app_src.rs index e4f3d07a4..fafc5c585 100644 --- a/gstreamer-app/src/app_src.rs +++ b/gstreamer-app/src/app_src.rs @@ -7,6 +7,7 @@ // except according to those terms. use futures_sink::Sink; +use glib::prelude::*; use glib::translate::*; use glib_sys::{gboolean, gpointer}; use gst; @@ -324,7 +325,7 @@ impl AppSrc { #[derive(Debug)] pub struct AppSrcSink { - app_src: AppSrc, + app_src: glib::WeakRef, waker_reference: Arc>>, } @@ -332,7 +333,6 @@ impl AppSrcSink { fn new(app_src: &AppSrc) -> Self { skip_assert_initialized!(); - let app_src = app_src.clone(); let waker_reference = Arc::new(Mutex::new(None as Option)); app_src.set_callbacks( @@ -350,7 +350,7 @@ impl AppSrcSink { ); Self { - app_src, + app_src: app_src.downgrade(), waker_reference, } } @@ -361,7 +361,9 @@ impl Drop for AppSrcSink { // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 if gst::version() >= (1, 16, 3, 0) { - self.app_src.set_callbacks(AppSrcCallbacks::new().build()); + if let Some(app_src) = self.app_src.upgrade() { + app_src.set_callbacks(AppSrcCallbacks::new().build()); + } } } } @@ -372,8 +374,13 @@ impl Sink for AppSrcSink { fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll> { let mut waker = self.waker_reference.lock().unwrap(); - let current_level_bytes = self.app_src.get_current_level_bytes(); - let max_bytes = self.app_src.get_max_bytes(); + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Poll::Ready(Err(gst::FlowError::Eos)), + }; + + let current_level_bytes = app_src.get_current_level_bytes(); + let max_bytes = app_src.get_max_bytes(); if current_level_bytes >= max_bytes && max_bytes != 0 { waker.replace(context.waker().to_owned()); @@ -385,7 +392,12 @@ impl Sink for AppSrcSink { } fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> { - self.app_src.push_sample(&sample)?; + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Err(gst::FlowError::Eos), + }; + + app_src.push_sample(&sample)?; Ok(()) } @@ -395,7 +407,12 @@ impl Sink for AppSrcSink { } fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - self.app_src.end_of_stream()?; + let app_src = match self.app_src.upgrade() { + Some(app_src) => app_src, + None => return Poll::Ready(Ok(())), + }; + + app_src.end_of_stream()?; Poll::Ready(Ok(())) } diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 8a774f84b..7d91cc6db 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -10,6 +10,7 @@ use futures_channel::mpsc::{self, UnboundedReceiver}; use futures_core::Stream; use futures_util::{future, StreamExt}; use glib; +use glib::prelude::*; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; use glib_sys; @@ -280,7 +281,7 @@ impl<'a> Iterator for Iter<'a> { #[derive(Debug)] pub struct BusStream { - bus: Bus, + bus: glib::WeakRef, receiver: UnboundedReceiver, } @@ -288,7 +289,6 @@ impl BusStream { pub fn new(bus: &Bus) -> Self { skip_assert_initialized!(); - let bus = bus.clone(); let (sender, receiver) = mpsc::unbounded(); bus.set_sync_handler(move |_, message| { @@ -297,13 +297,18 @@ impl BusStream { BusSyncReply::Drop }); - Self { bus, receiver } + Self { + bus: bus.downgrade(), + receiver, + } } } impl Drop for BusStream { fn drop(&mut self) { - self.bus.unset_sync_handler(); + if let Some(bus) = self.bus.upgrade() { + bus.unset_sync_handler(); + } } }