diff --git a/gstreamer-app/src/app_sink.rs b/gstreamer-app/src/app_sink.rs index e1187e3db..0e86e9994 100644 --- a/gstreamer-app/src/app_sink.rs +++ b/gstreamer-app/src/app_sink.rs @@ -190,12 +190,22 @@ impl AppSink { unsafe { let sink = self.to_glib_none().0; - if !gobject_sys::g_object_get_qdata(sink as *mut _, SET_ONCE_QUARK.to_glib()).is_null() - { - panic!("AppSink callbacks can only be set once"); - } - gobject_sys::g_object_set_qdata(sink as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _); + // This is not thread-safe before 1.16.3, see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 + if gst::version() < (1, 16, 3, 0) { + if !gobject_sys::g_object_get_qdata(sink as *mut _, SET_ONCE_QUARK.to_glib()) + .is_null() + { + panic!("AppSink callbacks can only be set once"); + } + + gobject_sys::g_object_set_qdata( + sink as *mut _, + SET_ONCE_QUARK.to_glib(), + 1 as *mut _, + ); + } gst_app_sys::gst_app_sink_set_callbacks( sink, @@ -315,6 +325,17 @@ impl AppSinkStream { } } +#[cfg(any(feature = "v1_10"))] +impl Drop for AppSinkStream { + fn drop(&mut self) { + // This is not thread-safe before 1.16.3, see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 + if gst::version() >= (1, 16, 3, 0) { + self.app_sink.set_callbacks(AppSinkCallbacks::new().build()); + } + } +} + #[cfg(any(feature = "v1_10"))] impl Stream for AppSinkStream { type Item = gst::Sample; diff --git a/gstreamer-app/src/app_src.rs b/gstreamer-app/src/app_src.rs index 71fa00ba5..830f50d39 100644 --- a/gstreamer-app/src/app_src.rs +++ b/gstreamer-app/src/app_src.rs @@ -205,11 +205,21 @@ impl AppSrc { unsafe { let src = self.to_glib_none().0; - if !gobject_sys::g_object_get_qdata(src as *mut _, SET_ONCE_QUARK.to_glib()).is_null() { - panic!("AppSrc callbacks can only be set once"); - } + // This is not thread-safe before 1.16.3, see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 + if gst::version() < (1, 16, 3, 0) { + if !gobject_sys::g_object_get_qdata(src as *mut _, SET_ONCE_QUARK.to_glib()) + .is_null() + { + panic!("AppSrc callbacks can only be set once"); + } - gobject_sys::g_object_set_qdata(src as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _); + gobject_sys::g_object_set_qdata( + src as *mut _, + SET_ONCE_QUARK.to_glib(), + 1 as *mut _, + ); + } gst_app_sys::gst_app_src_set_callbacks( src, @@ -282,6 +292,16 @@ impl AppSrcSink { } } +impl Drop for AppSrcSink { + fn drop(&mut self) { + // This is not thread-safe before 1.16.3, see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570 + if gst::version() >= (1, 16, 3, 0) { + self.app_src.set_callbacks(AppSrcCallbacks::new().build()); + } + } +} + impl Sink for AppSrcSink { type Error = gst::FlowError; diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 89e0e423b..1ad20e34d 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -162,11 +162,22 @@ impl Bus { unsafe { let bus = self.to_glib_none().0; - if !gobject_sys::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib()).is_null() { - panic!("Bus sync handler can only be set once"); - } - gobject_sys::g_object_set_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _); + // This is not thread-safe before 1.16.3, see + // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 + if ::version() < (1, 16, 3, 0) { + if !gobject_sys::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib()) + .is_null() + { + panic!("Bus sync handler can only be set once"); + } + + gobject_sys::g_object_set_qdata( + bus as *mut _, + SET_ONCE_QUARK.to_glib(), + 1 as *mut _, + ); + } gst_sys::gst_bus_set_sync_handler( bus, @@ -177,6 +188,20 @@ impl Bus { } } + pub fn unset_sync_handler(&self) { + // This is not thread-safe before 1.16.3, see + // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 + if ::version() < (1, 16, 3, 0) { + return; + } + + unsafe { + use std::ptr; + + gst_sys::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) + } + } + pub fn iter(&self) -> Iter { self.iter_timed(0.into()) } @@ -276,6 +301,12 @@ impl BusStream { } } +impl Drop for BusStream { + fn drop(&mut self) { + self.bus.unset_sync_handler(); + } +} + impl Stream for BusStream { type Item = Message;