forked from mirrors/gstreamer-rs
gstreamer/gstreamer-app: Don't store strong references in futures Stream/Sink adapters
This applies to the ones of the appsink, appsrc and bus. If we would store a strong reference then they would keep alive the underlying object forever even if their pipeline disappeared in the meantime. Like this e.g. the bus stream would start returning None once the bus was destroyed, similar to how other channels are working in Rust.
This commit is contained in:
parent
215dbe53c6
commit
122ccafd1a
3 changed files with 47 additions and 18 deletions
|
@ -26,6 +26,7 @@ use AppSink;
|
|||
#[cfg(any(feature = "v1_10"))]
|
||||
use {
|
||||
futures_core::Stream,
|
||||
glib::prelude::*,
|
||||
std::{
|
||||
pin::Pin,
|
||||
sync::{Arc, Mutex},
|
||||
|
@ -349,7 +350,7 @@ unsafe extern "C" fn new_preroll_trampoline<
|
|||
#[cfg(any(feature = "v1_10"))]
|
||||
#[derive(Debug)]
|
||||
pub struct AppSinkStream {
|
||||
app_sink: AppSink,
|
||||
app_sink: glib::WeakRef<AppSink>,
|
||||
waker_reference: Arc<Mutex<Option<Waker>>>,
|
||||
}
|
||||
|
||||
|
@ -358,7 +359,6 @@ impl AppSinkStream {
|
|||
fn new(app_sink: &AppSink) -> Self {
|
||||
skip_assert_initialized!();
|
||||
|
||||
let app_sink = app_sink.clone();
|
||||
let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
|
||||
|
||||
app_sink.set_callbacks(
|
||||
|
@ -387,7 +387,7 @@ impl AppSinkStream {
|
|||
);
|
||||
|
||||
Self {
|
||||
app_sink,
|
||||
app_sink: app_sink.downgrade(),
|
||||
waker_reference,
|
||||
}
|
||||
}
|
||||
|
@ -399,7 +399,9 @@ impl Drop for AppSinkStream {
|
|||
// This is not thread-safe before 1.16.3, see
|
||||
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
|
||||
if gst::version() >= (1, 16, 3, 0) {
|
||||
self.app_sink.set_callbacks(AppSinkCallbacks::new().build());
|
||||
if let Some(app_sink) = self.app_sink.upgrade() {
|
||||
app_sink.set_callbacks(AppSinkCallbacks::new().build());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -411,11 +413,16 @@ impl Stream for AppSinkStream {
|
|||
fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let mut waker = self.waker_reference.lock().unwrap();
|
||||
|
||||
self.app_sink
|
||||
let app_sink = match self.app_sink.upgrade() {
|
||||
Some(app_sink) => app_sink,
|
||||
None => return Poll::Ready(None),
|
||||
};
|
||||
|
||||
app_sink
|
||||
.try_pull_sample(gst::ClockTime::from_mseconds(0))
|
||||
.map(|sample| Poll::Ready(Some(sample)))
|
||||
.unwrap_or_else(|| {
|
||||
if self.app_sink.is_eos() {
|
||||
if app_sink.is_eos() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
// except according to those terms.
|
||||
|
||||
use futures_sink::Sink;
|
||||
use glib::prelude::*;
|
||||
use glib::translate::*;
|
||||
use glib_sys::{gboolean, gpointer};
|
||||
use gst;
|
||||
|
@ -324,7 +325,7 @@ impl AppSrc {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct AppSrcSink {
|
||||
app_src: AppSrc,
|
||||
app_src: glib::WeakRef<AppSrc>,
|
||||
waker_reference: Arc<Mutex<Option<Waker>>>,
|
||||
}
|
||||
|
||||
|
@ -332,7 +333,6 @@ impl AppSrcSink {
|
|||
fn new(app_src: &AppSrc) -> Self {
|
||||
skip_assert_initialized!();
|
||||
|
||||
let app_src = app_src.clone();
|
||||
let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
|
||||
|
||||
app_src.set_callbacks(
|
||||
|
@ -350,7 +350,7 @@ impl AppSrcSink {
|
|||
);
|
||||
|
||||
Self {
|
||||
app_src,
|
||||
app_src: app_src.downgrade(),
|
||||
waker_reference,
|
||||
}
|
||||
}
|
||||
|
@ -361,7 +361,9 @@ impl Drop for AppSrcSink {
|
|||
// This is not thread-safe before 1.16.3, see
|
||||
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
|
||||
if gst::version() >= (1, 16, 3, 0) {
|
||||
self.app_src.set_callbacks(AppSrcCallbacks::new().build());
|
||||
if let Some(app_src) = self.app_src.upgrade() {
|
||||
app_src.set_callbacks(AppSrcCallbacks::new().build());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -372,8 +374,13 @@ impl Sink<gst::Sample> for AppSrcSink {
|
|||
fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
let mut waker = self.waker_reference.lock().unwrap();
|
||||
|
||||
let current_level_bytes = self.app_src.get_current_level_bytes();
|
||||
let max_bytes = self.app_src.get_max_bytes();
|
||||
let app_src = match self.app_src.upgrade() {
|
||||
Some(app_src) => app_src,
|
||||
None => return Poll::Ready(Err(gst::FlowError::Eos)),
|
||||
};
|
||||
|
||||
let current_level_bytes = app_src.get_current_level_bytes();
|
||||
let max_bytes = app_src.get_max_bytes();
|
||||
|
||||
if current_level_bytes >= max_bytes && max_bytes != 0 {
|
||||
waker.replace(context.waker().to_owned());
|
||||
|
@ -385,7 +392,12 @@ impl Sink<gst::Sample> for AppSrcSink {
|
|||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
|
||||
self.app_src.push_sample(&sample)?;
|
||||
let app_src = match self.app_src.upgrade() {
|
||||
Some(app_src) => app_src,
|
||||
None => return Err(gst::FlowError::Eos),
|
||||
};
|
||||
|
||||
app_src.push_sample(&sample)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -395,7 +407,12 @@ impl Sink<gst::Sample> for AppSrcSink {
|
|||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
self.app_src.end_of_stream()?;
|
||||
let app_src = match self.app_src.upgrade() {
|
||||
Some(app_src) => app_src,
|
||||
None => return Poll::Ready(Ok(())),
|
||||
};
|
||||
|
||||
app_src.end_of_stream()?;
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ use futures_channel::mpsc::{self, UnboundedReceiver};
|
|||
use futures_core::Stream;
|
||||
use futures_util::{future, StreamExt};
|
||||
use glib;
|
||||
use glib::prelude::*;
|
||||
use glib::source::{Continue, Priority, SourceId};
|
||||
use glib::translate::*;
|
||||
use glib_sys;
|
||||
|
@ -280,7 +281,7 @@ impl<'a> Iterator for Iter<'a> {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct BusStream {
|
||||
bus: Bus,
|
||||
bus: glib::WeakRef<Bus>,
|
||||
receiver: UnboundedReceiver<Message>,
|
||||
}
|
||||
|
||||
|
@ -288,7 +289,6 @@ impl BusStream {
|
|||
pub fn new(bus: &Bus) -> Self {
|
||||
skip_assert_initialized!();
|
||||
|
||||
let bus = bus.clone();
|
||||
let (sender, receiver) = mpsc::unbounded();
|
||||
|
||||
bus.set_sync_handler(move |_, message| {
|
||||
|
@ -297,13 +297,18 @@ impl BusStream {
|
|||
BusSyncReply::Drop
|
||||
});
|
||||
|
||||
Self { bus, receiver }
|
||||
Self {
|
||||
bus: bus.downgrade(),
|
||||
receiver,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for BusStream {
|
||||
fn drop(&mut self) {
|
||||
self.bus.unset_sync_handler();
|
||||
if let Some(bus) = self.bus.upgrade() {
|
||||
bus.unset_sync_handler();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue