From e657e676d54f0e35ef8955b28f08b199934b866b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 25 Oct 2021 12:47:33 +0300 Subject: [PATCH] Relax main context checks in `Bus::add_local()` and `gst_video::convert_sample_async_local()` --- gstreamer-video/Cargo.toml | 1 + gstreamer-video/src/functions.rs | 12 ++++++--- gstreamer/Cargo.toml | 1 + gstreamer/src/bus.rs | 43 ++++++++++++++++++++++++++------ 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/gstreamer-video/Cargo.toml b/gstreamer-video/Cargo.toml index 06965422c..62f8f998a 100644 --- a/gstreamer-video/Cargo.toml +++ b/gstreamer-video/Cargo.toml @@ -23,6 +23,7 @@ gst = { package = "gstreamer", path = "../gstreamer" } gst-base = { package = "gstreamer-base", path = "../gstreamer-base" } once_cell = "1.0" futures-channel = "0.3" +fragile = "1" [dev-dependencies] itertools = "0.10" diff --git a/gstreamer-video/src/functions.rs b/gstreamer-video/src/functions.rs index 7138e36e3..2699d5370 100644 --- a/gstreamer-video/src/functions.rs +++ b/gstreamer-video/src/functions.rs @@ -49,12 +49,18 @@ pub fn convert_sample_async_local( timeout: Option, func: F, ) where - F: FnOnce(Result) + Send + 'static, + F: FnOnce(Result) + 'static, { skip_assert_initialized!(); unsafe { - assert!(glib::MainContext::ref_thread_default().is_owner()); - convert_sample_async_unsafe(sample, caps, timeout, func) + let ctx = glib::MainContext::ref_thread_default(); + let _acquire = ctx + .acquire() + .expect("thread default main context already acquired by another thread"); + + let func = fragile::Fragile::new(func); + + convert_sample_async_unsafe(sample, caps, timeout, move |res| (func.into_inner())(res)) } } diff --git a/gstreamer/Cargo.toml b/gstreamer/Cargo.toml index f4e25687b..2cf86d46d 100644 --- a/gstreamer/Cargo.toml +++ b/gstreamer/Cargo.toml @@ -32,6 +32,7 @@ serde_bytes = { version = "0.11", optional = true } paste = "1.0" pretty-hex = "0.2" thiserror = "1" +fragile = "1" [dev-dependencies] ron = "0.7" diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index fe6416aee..b8731d4f8 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -13,12 +13,14 @@ use std::mem::transmute; use std::pin::Pin; use std::task::{Context, Poll}; +use fragile::Fragile; + use crate::Bus; use crate::BusSyncReply; use crate::Message; use crate::MessageType; -unsafe extern "C" fn trampoline_watch Continue + 'static>( +unsafe extern "C" fn trampoline_watch Continue + Send + 'static>( bus: *mut ffi::GstBus, msg: *mut ffi::GstMessage, func: gpointer, @@ -27,18 +29,42 @@ unsafe extern "C" fn trampoline_watch Continue + 'st (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib() } -unsafe extern "C" fn destroy_closure_watch Continue + 'static>( +unsafe extern "C" fn destroy_closure_watch< + F: FnMut(&Bus, &Message) -> Continue + Send + 'static, +>( ptr: gpointer, ) { Box::>::from_raw(ptr as *mut _); } -fn into_raw_watch Continue + 'static>(func: F) -> gpointer { +fn into_raw_watch Continue + Send + 'static>(func: F) -> gpointer { #[allow(clippy::type_complexity)] let func: Box> = Box::new(RefCell::new(func)); Box::into_raw(func) as gpointer } +unsafe extern "C" fn trampoline_watch_local Continue + 'static>( + bus: *mut ffi::GstBus, + msg: *mut ffi::GstMessage, + func: gpointer, +) -> gboolean { + let func: &Fragile> = &*(func as *const Fragile>); + (&mut *func.get().borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)) + .into_glib() +} + +unsafe extern "C" fn destroy_closure_watch_local Continue + 'static>( + ptr: gpointer, +) { + Box::>>::from_raw(ptr as *mut _); +} + +fn into_raw_watch_local Continue + 'static>(func: F) -> gpointer { + #[allow(clippy::type_complexity)] + let func: Box>> = Box::new(Fragile::new(RefCell::new(func))); + Box::into_raw(func) as gpointer +} + unsafe extern "C" fn trampoline_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( @@ -133,14 +159,17 @@ impl Bus { F: FnMut(&Bus, &Message) -> Continue + 'static, { unsafe { - assert!(glib::MainContext::ref_thread_default().is_owner()); + let ctx = glib::MainContext::ref_thread_default(); + let _acquire = ctx + .acquire() + .expect("thread default main context already acquired by another thread"); let res = ffi::gst_bus_add_watch_full( self.to_glib_none().0, glib::ffi::G_PRIORITY_DEFAULT, - Some(trampoline_watch::), - into_raw_watch(func), - Some(destroy_closure_watch::), + Some(trampoline_watch_local::), + into_raw_watch_local(func), + Some(destroy_closure_watch_local::), ); if res == 0 {