gstreamer/gstreamer-app: Don't store strong references in futures Stream/Sink adapters

This applies to the ones of the appsink, appsrc and bus. If we would
store a strong reference then they would keep alive the underlying
object forever even if their pipeline disappeared in the meantime.

Like this e.g. the bus stream would start returning None once the bus
was destroyed, similar to how other channels are working in Rust.
This commit is contained in:
Sebastian Dröge 2020-05-13 22:13:11 +03:00
parent 3f495ce1da
commit 93bc5c9324
3 changed files with 47 additions and 18 deletions

View file

@ -26,6 +26,7 @@ use AppSink;
#[cfg(any(feature = "v1_10"))]
use {
futures_core::Stream,
glib::prelude::*,
std::{
pin::Pin,
sync::{Arc, Mutex},
@ -353,7 +354,7 @@ unsafe extern "C" fn new_preroll_trampoline<
#[cfg(any(feature = "v1_10"))]
#[derive(Debug)]
pub struct AppSinkStream {
app_sink: AppSink,
app_sink: glib::WeakRef<AppSink>,
waker_reference: Arc<Mutex<Option<Waker>>>,
}
@ -362,7 +363,6 @@ impl AppSinkStream {
fn new(app_sink: &AppSink) -> Self {
skip_assert_initialized!();
let app_sink = app_sink.clone();
let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
app_sink.set_callbacks(
@ -391,7 +391,7 @@ impl AppSinkStream {
);
Self {
app_sink,
app_sink: app_sink.downgrade(),
waker_reference,
}
}
@ -403,7 +403,9 @@ impl Drop for AppSinkStream {
// This is not thread-safe before 1.16.3, see
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
if gst::version() >= (1, 16, 3, 0) {
self.app_sink.set_callbacks(AppSinkCallbacks::new().build());
if let Some(app_sink) = self.app_sink.upgrade() {
app_sink.set_callbacks(AppSinkCallbacks::new().build());
}
}
}
}
@ -415,11 +417,16 @@ impl Stream for AppSinkStream {
fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
let mut waker = self.waker_reference.lock().unwrap();
self.app_sink
let app_sink = match self.app_sink.upgrade() {
Some(app_sink) => app_sink,
None => return Poll::Ready(None),
};
app_sink
.try_pull_sample(gst::ClockTime::from_mseconds(0))
.map(|sample| Poll::Ready(Some(sample)))
.unwrap_or_else(|| {
if self.app_sink.is_eos() {
if app_sink.is_eos() {
return Poll::Ready(None);
}

View file

@ -7,6 +7,7 @@
// except according to those terms.
use futures_sink::Sink;
use glib::prelude::*;
use glib::translate::*;
use glib_sys::{gboolean, gpointer};
use gst;
@ -324,7 +325,7 @@ impl AppSrc {
#[derive(Debug)]
pub struct AppSrcSink {
app_src: AppSrc,
app_src: glib::WeakRef<AppSrc>,
waker_reference: Arc<Mutex<Option<Waker>>>,
}
@ -332,7 +333,6 @@ impl AppSrcSink {
fn new(app_src: &AppSrc) -> Self {
skip_assert_initialized!();
let app_src = app_src.clone();
let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
app_src.set_callbacks(
@ -350,7 +350,7 @@ impl AppSrcSink {
);
Self {
app_src,
app_src: app_src.downgrade(),
waker_reference,
}
}
@ -361,7 +361,9 @@ impl Drop for AppSrcSink {
// This is not thread-safe before 1.16.3, see
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
if gst::version() >= (1, 16, 3, 0) {
self.app_src.set_callbacks(AppSrcCallbacks::new().build());
if let Some(app_src) = self.app_src.upgrade() {
app_src.set_callbacks(AppSrcCallbacks::new().build());
}
}
}
}
@ -372,8 +374,13 @@ impl Sink<gst::Sample> for AppSrcSink {
fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut waker = self.waker_reference.lock().unwrap();
let current_level_bytes = self.app_src.get_current_level_bytes();
let max_bytes = self.app_src.get_max_bytes();
let app_src = match self.app_src.upgrade() {
Some(app_src) => app_src,
None => return Poll::Ready(Err(gst::FlowError::Eos)),
};
let current_level_bytes = app_src.get_current_level_bytes();
let max_bytes = app_src.get_max_bytes();
if current_level_bytes >= max_bytes && max_bytes != 0 {
waker.replace(context.waker().to_owned());
@ -385,7 +392,12 @@ impl Sink<gst::Sample> for AppSrcSink {
}
fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
self.app_src.push_sample(&sample)?;
let app_src = match self.app_src.upgrade() {
Some(app_src) => app_src,
None => return Err(gst::FlowError::Eos),
};
app_src.push_sample(&sample)?;
Ok(())
}
@ -395,7 +407,12 @@ impl Sink<gst::Sample> for AppSrcSink {
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
self.app_src.end_of_stream()?;
let app_src = match self.app_src.upgrade() {
Some(app_src) => app_src,
None => return Poll::Ready(Ok(())),
};
app_src.end_of_stream()?;
Poll::Ready(Ok(()))
}

View file

@ -10,6 +10,7 @@ use futures_channel::mpsc::{self, UnboundedReceiver};
use futures_core::Stream;
use futures_util::{future, StreamExt};
use glib;
use glib::prelude::*;
use glib::source::{Continue, Priority, SourceId};
use glib::translate::*;
use glib_sys;
@ -283,7 +284,7 @@ impl<'a> Iterator for Iter<'a> {
#[derive(Debug)]
pub struct BusStream {
bus: Bus,
bus: glib::WeakRef<Bus>,
receiver: UnboundedReceiver<Message>,
}
@ -291,7 +292,6 @@ impl BusStream {
fn new(bus: &Bus) -> Self {
skip_assert_initialized!();
let bus = bus.clone();
let (sender, receiver) = mpsc::unbounded();
bus.set_sync_handler(move |_, message| {
@ -300,13 +300,18 @@ impl BusStream {
BusSyncReply::Drop
});
Self { bus, receiver }
Self {
bus: bus.downgrade(),
receiver,
}
}
}
impl Drop for BusStream {
fn drop(&mut self) {
self.bus.unset_sync_handler();
if let Some(bus) = self.bus.upgrade() {
bus.unset_sync_handler();
}
}
}