Only allow setting Bus sync handler and AppSrc/Sink callbacks once

Re-setting them is not thread-safe and can cause segfaults or worse.

See https://gitlab.freedesktop.org/gstreamer/gstreamer/issues/506
and https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/729
This commit is contained in:
Sebastian Dröge 2020-02-09 21:30:53 +02:00
parent 870978e286
commit 2f88dc6576
5 changed files with 40 additions and 29 deletions

View file

@ -24,6 +24,7 @@ gstreamer-app-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-
glib = { git = "https://github.com/gtk-rs/glib" } glib = { git = "https://github.com/gtk-rs/glib" }
gstreamer = { path = "../gstreamer" } gstreamer = { path = "../gstreamer" }
gstreamer-base = { path = "../gstreamer-base" } gstreamer-base = { path = "../gstreamer-base" }
once_cell = "1.0"
[build-dependencies] [build-dependencies]
rustdoc-stripper = { version = "0.1", optional = true } rustdoc-stripper = { version = "0.1", optional = true }

View file

@ -184,9 +184,21 @@ unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
impl AppSink { impl AppSink {
pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) { pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
use once_cell::sync::Lazy;
static SET_ONCE_QUARK: Lazy<glib::Quark> =
Lazy::new(|| glib::Quark::from_string("gstreamer-rs-app-sink-callbacks"));
unsafe { unsafe {
let sink = self.to_glib_none().0;
if !gobject_sys::g_object_get_qdata(sink as *mut _, SET_ONCE_QUARK.to_glib()).is_null()
{
panic!("AppSink callbacks can only be set once");
}
gobject_sys::g_object_set_qdata(sink as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _);
gst_app_sys::gst_app_sink_set_callbacks( gst_app_sys::gst_app_sink_set_callbacks(
self.to_glib_none().0, sink,
mut_override(&callbacks.callbacks), mut_override(&callbacks.callbacks),
Box::into_raw(Box::new(callbacks)) as *mut _, Box::into_raw(Box::new(callbacks)) as *mut _,
Some(destroy_callbacks), Some(destroy_callbacks),
@ -303,13 +315,6 @@ impl AppSinkStream {
} }
} }
#[cfg(any(feature = "v1_10"))]
impl Drop for AppSinkStream {
fn drop(&mut self) {
self.app_sink.set_callbacks(AppSinkCallbacks::new().build());
}
}
#[cfg(any(feature = "v1_10"))] #[cfg(any(feature = "v1_10"))]
impl Stream for AppSinkStream { impl Stream for AppSinkStream {
type Item = gst::Sample; type Item = gst::Sample;

View file

@ -199,9 +199,20 @@ impl AppSrc {
} }
pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) { pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) {
use once_cell::sync::Lazy;
static SET_ONCE_QUARK: Lazy<glib::Quark> =
Lazy::new(|| glib::Quark::from_string("gstreamer-rs-app-src-callbacks"));
unsafe { unsafe {
let src = self.to_glib_none().0;
if !gobject_sys::g_object_get_qdata(src as *mut _, SET_ONCE_QUARK.to_glib()).is_null() {
panic!("AppSrc callbacks can only be set once");
}
gobject_sys::g_object_set_qdata(src as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _);
gst_app_sys::gst_app_src_set_callbacks( gst_app_sys::gst_app_src_set_callbacks(
self.to_glib_none().0, src,
mut_override(&callbacks.callbacks), mut_override(&callbacks.callbacks),
Box::into_raw(Box::new(callbacks)) as *mut _, Box::into_raw(Box::new(callbacks)) as *mut _,
Some(destroy_callbacks), Some(destroy_callbacks),
@ -271,12 +282,6 @@ impl AppSrcSink {
} }
} }
impl Drop for AppSrcSink {
fn drop(&mut self) {
self.app_src.set_callbacks(AppSrcCallbacks::new().build());
}
}
impl Sink<gst::Sample> for AppSrcSink { impl Sink<gst::Sample> for AppSrcSink {
type Error = gst::FlowError; type Error = gst::FlowError;

View file

@ -17,6 +17,8 @@ extern crate gstreamer_app_sys as gst_app_sys;
extern crate gstreamer_base as gst_base; extern crate gstreamer_base as gst_base;
extern crate gstreamer_sys as gst_sys; extern crate gstreamer_sys as gst_sys;
extern crate once_cell;
#[macro_use] #[macro_use]
extern crate glib; extern crate glib;

View file

@ -18,7 +18,6 @@ use gst_sys;
use std::cell::RefCell; use std::cell::RefCell;
use std::mem::transmute; use std::mem::transmute;
use std::pin::Pin; use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use Bus; use Bus;
@ -157,9 +156,20 @@ impl Bus {
where where
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
{ {
use once_cell::sync::Lazy;
static SET_ONCE_QUARK: Lazy<glib::Quark> =
Lazy::new(|| glib::Quark::from_string("gstreamer-rs-sync-handler"));
unsafe { unsafe {
let bus = self.to_glib_none().0;
if !gobject_sys::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib()).is_null() {
panic!("Bus sync handler can only be set once");
}
gobject_sys::g_object_set_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _);
gst_sys::gst_bus_set_sync_handler( gst_sys::gst_bus_set_sync_handler(
self.to_glib_none().0, bus,
Some(trampoline_sync::<F>), Some(trampoline_sync::<F>),
into_raw_sync(func), into_raw_sync(func),
Some(destroy_closure_sync::<F>), Some(destroy_closure_sync::<F>),
@ -167,12 +177,6 @@ impl Bus {
} }
} }
pub fn unset_sync_handler(&self) {
unsafe {
gst_sys::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
}
}
pub fn iter(&self) -> Iter { pub fn iter(&self) -> Iter {
self.iter_timed(0.into()) self.iter_timed(0.into())
} }
@ -272,12 +276,6 @@ impl BusStream {
} }
} }
impl Drop for BusStream {
fn drop(&mut self) {
self.bus.unset_sync_handler();
}
}
impl Stream for BusStream { impl Stream for BusStream {
type Item = Message; type Item = Message;