diff --git a/gstreamer-app/Cargo.toml b/gstreamer-app/Cargo.toml index 5c6fbd710..45d65a43c 100644 --- a/gstreamer-app/Cargo.toml +++ b/gstreamer-app/Cargo.toml @@ -24,6 +24,7 @@ gstreamer-app-sys = { version = "0.8", features = ["v1_8"] } glib = { version = "0.9" } gstreamer = { version = "0.15", path = "../gstreamer" } gstreamer-base = { version = "0.15", path = "../gstreamer-base" } +lazy_static = "1.0" [build-dependencies] rustdoc-stripper = { version = "0.1", optional = true } diff --git a/gstreamer-app/src/app_sink.rs b/gstreamer-app/src/app_sink.rs index 1c0a0f372..ead302a4f 100644 --- a/gstreamer-app/src/app_sink.rs +++ b/gstreamer-app/src/app_sink.rs @@ -30,6 +30,11 @@ use { }, }; +lazy_static! { + static ref SET_ONCE_QUARK: glib::Quark = + glib::Quark::from_string("gstreamer-rs-app-sink-callbacks"); +} + #[allow(clippy::type_complexity)] pub struct AppSinkCallbacks { eos: Option>>, @@ -185,8 +190,16 @@ unsafe extern "C" fn destroy_callbacks(ptr: gpointer) { impl AppSink { pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) { unsafe { + let sink = self.to_glib_none().0; + if !gobject_sys::g_object_get_qdata(sink as *mut _, SET_ONCE_QUARK.to_glib()).is_null() + { + panic!("AppSink callbacks can only be set once"); + } + + gobject_sys::g_object_set_qdata(sink as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _); + gst_app_sys::gst_app_sink_set_callbacks( - self.to_glib_none().0, + sink, mut_override(&callbacks.callbacks), Box::into_raw(Box::new(callbacks)) as *mut _, Some(destroy_callbacks), @@ -303,13 +316,6 @@ impl AppSinkStream { } } -#[cfg(any(feature = "v1_10"))] -impl Drop for AppSinkStream { - fn drop(&mut self) { - self.app_sink.set_callbacks(AppSinkCallbacks::new().build()); - } -} - #[cfg(any(feature = "v1_10"))] impl Stream for AppSinkStream { type Item = gst::Sample; diff --git a/gstreamer-app/src/app_src.rs b/gstreamer-app/src/app_src.rs index 8b54ba189..398abae6a 100644 --- a/gstreamer-app/src/app_src.rs +++ b/gstreamer-app/src/app_src.rs @@ -19,6 +19,11 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use AppSrc; +lazy_static! { + static ref SET_ONCE_QUARK: glib::Quark = + glib::Quark::from_string("gstreamer-rs-app-src-callbacks"); +} + #[allow(clippy::type_complexity)] pub struct AppSrcCallbacks { need_data: Option>>, @@ -200,8 +205,15 @@ impl AppSrc { pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) { unsafe { + let src = self.to_glib_none().0; + if !gobject_sys::g_object_get_qdata(src as *mut _, SET_ONCE_QUARK.to_glib()).is_null() { + panic!("AppSrc callbacks can only be set once"); + } + + gobject_sys::g_object_set_qdata(src as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _); + gst_app_sys::gst_app_src_set_callbacks( - self.to_glib_none().0, + src, mut_override(&callbacks.callbacks), Box::into_raw(Box::new(callbacks)) as *mut _, Some(destroy_callbacks), @@ -271,12 +283,6 @@ impl AppSrcSink { } } -impl Drop for AppSrcSink { - fn drop(&mut self) { - self.app_src.set_callbacks(AppSrcCallbacks::new().build()); - } -} - impl Sink for AppSrcSink { type Error = gst::FlowError; diff --git a/gstreamer-app/src/lib.rs b/gstreamer-app/src/lib.rs index c7fd82503..072744099 100644 --- a/gstreamer-app/src/lib.rs +++ b/gstreamer-app/src/lib.rs @@ -17,6 +17,9 @@ extern crate gstreamer_app_sys as gst_app_sys; extern crate gstreamer_base as gst_base; extern crate gstreamer_sys as gst_sys; +#[macro_use] +extern crate lazy_static; + #[macro_use] extern crate glib; diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 1b168db38..ca105926c 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -18,7 +18,6 @@ use gst_sys; use std::cell::RefCell; use std::mem::transmute; use std::pin::Pin; -use std::ptr; use std::task::{Context, Poll}; use Bus; @@ -26,6 +25,10 @@ use BusSyncReply; use Message; use MessageType; +lazy_static! { + static ref SET_ONCE_QUARK: glib::Quark = glib::Quark::from_string("gstreamer-rs-sync-handler"); +} + unsafe extern "C" fn trampoline_watch Continue + 'static>( bus: *mut gst_sys::GstBus, msg: *mut gst_sys::GstMessage, @@ -158,8 +161,15 @@ impl Bus { F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, { unsafe { + let bus = self.to_glib_none().0; + if !gobject_sys::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib()).is_null() { + panic!("Bus sync handler can only be set once"); + } + + gobject_sys::g_object_set_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib(), 1 as *mut _); + gst_sys::gst_bus_set_sync_handler( - self.to_glib_none().0, + bus, Some(trampoline_sync::), into_raw_sync(func), Some(destroy_closure_sync::), @@ -167,12 +177,6 @@ impl Bus { } } - pub fn unset_sync_handler(&self) { - unsafe { - gst_sys::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) - } - } - pub fn iter(&self) -> Iter { self.iter_timed(0.into()) } @@ -272,12 +276,6 @@ impl BusStream { } } -impl Drop for BusStream { - fn drop(&mut self) { - self.bus.unset_sync_handler(); - } -} - impl Stream for BusStream { type Item = Message;