Relax main context checks in Bus::add_local() and gst_video::convert_sample_async_local()

This commit is contained in:
Sebastian Dröge 2021-10-25 12:47:33 +03:00 committed by Sebastian Dröge
parent 42feed5441
commit e657e676d5
4 changed files with 47 additions and 10 deletions

View file

@ -23,6 +23,7 @@ gst = { package = "gstreamer", path = "../gstreamer" }
gst-base = { package = "gstreamer-base", path = "../gstreamer-base" } gst-base = { package = "gstreamer-base", path = "../gstreamer-base" }
once_cell = "1.0" once_cell = "1.0"
futures-channel = "0.3" futures-channel = "0.3"
fragile = "1"
[dev-dependencies] [dev-dependencies]
itertools = "0.10" itertools = "0.10"

View file

@ -49,12 +49,18 @@ pub fn convert_sample_async_local<F>(
timeout: Option<gst::ClockTime>, timeout: Option<gst::ClockTime>,
func: F, func: F,
) where ) where
F: FnOnce(Result<gst::Sample, glib::Error>) + Send + 'static, F: FnOnce(Result<gst::Sample, glib::Error>) + 'static,
{ {
skip_assert_initialized!(); skip_assert_initialized!();
unsafe { unsafe {
assert!(glib::MainContext::ref_thread_default().is_owner()); let ctx = glib::MainContext::ref_thread_default();
convert_sample_async_unsafe(sample, caps, timeout, func) let _acquire = ctx
.acquire()
.expect("thread default main context already acquired by another thread");
let func = fragile::Fragile::new(func);
convert_sample_async_unsafe(sample, caps, timeout, move |res| (func.into_inner())(res))
} }
} }

View file

@ -32,6 +32,7 @@ serde_bytes = { version = "0.11", optional = true }
paste = "1.0" paste = "1.0"
pretty-hex = "0.2" pretty-hex = "0.2"
thiserror = "1" thiserror = "1"
fragile = "1"
[dev-dependencies] [dev-dependencies]
ron = "0.7" ron = "0.7"

View file

@ -13,12 +13,14 @@ use std::mem::transmute;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use fragile::Fragile;
use crate::Bus; use crate::Bus;
use crate::BusSyncReply; use crate::BusSyncReply;
use crate::Message; use crate::Message;
use crate::MessageType; use crate::MessageType;
unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + Send + 'static>(
bus: *mut ffi::GstBus, bus: *mut ffi::GstBus,
msg: *mut ffi::GstMessage, msg: *mut ffi::GstMessage,
func: gpointer, func: gpointer,
@ -27,18 +29,42 @@ unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'st
(&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib() (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
} }
unsafe extern "C" fn destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( unsafe extern "C" fn destroy_closure_watch<
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
>(
ptr: gpointer, ptr: gpointer,
) { ) {
Box::<RefCell<F>>::from_raw(ptr as *mut _); Box::<RefCell<F>>::from_raw(ptr as *mut _);
} }
fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer { fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + Send + 'static>(func: F) -> gpointer {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
let func: Box<RefCell<F>> = Box::new(RefCell::new(func)); let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
Box::into_raw(func) as gpointer Box::into_raw(func) as gpointer
} }
unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> Continue + 'static>(
bus: *mut ffi::GstBus,
msg: *mut ffi::GstMessage,
func: gpointer,
) -> gboolean {
let func: &Fragile<RefCell<F>> = &*(func as *const Fragile<RefCell<F>>);
(&mut *func.get().borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg))
.into_glib()
}
unsafe extern "C" fn destroy_closure_watch_local<F: FnMut(&Bus, &Message) -> Continue + 'static>(
ptr: gpointer,
) {
Box::<Fragile<RefCell<F>>>::from_raw(ptr as *mut _);
}
fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
#[allow(clippy::type_complexity)]
let func: Box<Fragile<RefCell<F>>> = Box::new(Fragile::new(RefCell::new(func)));
Box::into_raw(func) as gpointer
}
unsafe extern "C" fn trampoline_sync< unsafe extern "C" fn trampoline_sync<
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
>( >(
@ -133,14 +159,17 @@ impl Bus {
F: FnMut(&Bus, &Message) -> Continue + 'static, F: FnMut(&Bus, &Message) -> Continue + 'static,
{ {
unsafe { unsafe {
assert!(glib::MainContext::ref_thread_default().is_owner()); let ctx = glib::MainContext::ref_thread_default();
let _acquire = ctx
.acquire()
.expect("thread default main context already acquired by another thread");
let res = ffi::gst_bus_add_watch_full( let res = ffi::gst_bus_add_watch_full(
self.to_glib_none().0, self.to_glib_none().0,
glib::ffi::G_PRIORITY_DEFAULT, glib::ffi::G_PRIORITY_DEFAULT,
Some(trampoline_watch::<F>), Some(trampoline_watch_local::<F>),
into_raw_watch(func), into_raw_watch_local(func),
Some(destroy_closure_watch::<F>), Some(destroy_closure_watch_local::<F>),
); );
if res == 0 { if res == 0 {