// Take a look at the license at the top of the repository in the LICENSE file. use std::{ hash::{Hash, Hasher}, ptr, sync::{Arc, Mutex}, }; use glib::{ffi::gpointer, subclass::prelude::*, translate::*}; use super::prelude::*; use crate::{TaskHandle, TaskPool}; pub trait TaskPoolImpl: GstObjectImpl + Send + Sync { // rustdoc-stripper-ignore-next /// Handle to be returned from the `push` function to allow the caller to wait for the task's /// completion. /// /// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle /// that does nothing on `join` or drop. type Handle: TaskHandle; // rustdoc-stripper-ignore-next /// Prepare the task pool to accept tasks. /// /// This defaults to doing nothing. fn prepare(&self) -> Result<(), glib::Error> { Ok(()) } // rustdoc-stripper-ignore-next /// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped. /// /// This is mainly used internally to ensure proper cleanup of internal data structures in test /// suites. fn cleanup(&self) {} // rustdoc-stripper-ignore-next /// Deliver a task to the pool. /// /// If returning `Ok`, you need to call the `func` eventually. /// /// If returning `Err`, the `func` must be dropped without calling it. fn push(&self, func: TaskPoolFunction) -> Result, glib::Error>; } unsafe impl IsSubclassable for TaskPool { fn class_init(klass: &mut glib::Class) { Self::parent_class_init::(klass); let klass = klass.as_mut(); klass.prepare = Some(task_pool_prepare::); klass.cleanup = Some(task_pool_cleanup::); klass.push = Some(task_pool_push::); klass.join = Some(task_pool_join::); #[cfg(any(feature = "v1_20", feature = "dox"))] { klass.dispose_handle = Some(task_pool_dispose_handle::); } } } unsafe extern "C" fn task_pool_prepare( ptr: *mut ffi::GstTaskPool, error: *mut *mut glib::ffi::GError, ) { let instance = &*(ptr as *mut T::Instance); let imp = instance.imp(); match imp.prepare() { Ok(()) => {} Err(err) => { if !error.is_null() { *error = err.into_glib_ptr(); } } } } unsafe extern "C" fn task_pool_cleanup(ptr: *mut ffi::GstTaskPool) { let instance = &*(ptr as *mut T::Instance); let imp = instance.imp(); imp.cleanup(); } unsafe extern "C" fn task_pool_push( ptr: *mut ffi::GstTaskPool, func: ffi::GstTaskPoolFunction, user_data: gpointer, error: *mut *mut glib::ffi::GError, ) -> gpointer { let instance = &*(ptr as *mut T::Instance); let imp = instance.imp(); let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data); match imp.push(func.clone()) { Ok(None) => ptr::null_mut(), Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer, Err(err) => { func.prevent_call(); if !error.is_null() { *error = err.into_glib_ptr(); } ptr::null_mut() } } } unsafe extern "C" fn task_pool_join(ptr: *mut ffi::GstTaskPool, id: gpointer) { if id.is_null() { let wrap: Borrowed = from_glib_borrow(ptr); crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to join null handle"); return; } let handle = Box::from_raw(id as *mut T::Handle); handle.join(); } #[cfg(any(feature = "v1_20", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))] unsafe extern "C" fn task_pool_dispose_handle( ptr: *mut ffi::GstTaskPool, id: gpointer, ) { if id.is_null() { let wrap: Borrowed = from_glib_borrow(ptr); crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to dispose null handle"); return; } let handle = Box::from_raw(id as *mut T::Handle); drop(handle); } // rustdoc-stripper-ignore-next /// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push). #[derive(Debug)] pub struct TaskPoolFunction(Arc>>); // `Arc>>` is required so that we can enforce that the function // has not been called and will never be called after `push` returns `Err`. #[derive(Debug)] struct TaskPoolFunctionInner { func: unsafe extern "C" fn(gpointer), user_data: gpointer, warn_on_drop: bool, } unsafe impl Send for TaskPoolFunctionInner {} impl TaskPoolFunction { fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self { let inner = TaskPoolFunctionInner { func, user_data, warn_on_drop: true, }; Self(Arc::new(Mutex::new(Some(inner)))) } #[inline] fn clone(&self) -> Self { Self(self.0.clone()) } // rustdoc-stripper-ignore-next /// Consume and execute the function. pub fn call(self) { let mut inner = self .0 .lock() .unwrap() .take() .expect("TaskPoolFunction has already been dropped"); inner.warn_on_drop = false; unsafe { (inner.func)(inner.user_data) } } fn prevent_call(self) { let mut inner = self .0 .lock() .unwrap() .take() .expect("TaskPoolFunction has already been called"); inner.warn_on_drop = false; drop(inner); } #[inline] fn as_ptr(&self) -> *const Mutex> { Arc::as_ptr(&self.0) } } impl Drop for TaskPoolFunctionInner { fn drop(&mut self) { if self.warn_on_drop { crate::warning!(crate::CAT_RUST, "Leaked task function"); } } } impl PartialEq for TaskPoolFunction { fn eq(&self, other: &Self) -> bool { self.as_ptr() == other.as_ptr() } } impl Eq for TaskPoolFunction {} impl PartialOrd for TaskPoolFunction { fn partial_cmp(&self, other: &Self) -> Option { self.as_ptr().partial_cmp(&other.as_ptr()) } } impl Ord for TaskPoolFunction { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.as_ptr().cmp(&other.as_ptr()) } } impl Hash for TaskPoolFunction { fn hash(&self, state: &mut H) { self.as_ptr().hash(state) } } #[cfg(test)] mod tests { use std::{ sync::{ atomic, mpsc::{channel, TryRecvError}, }, thread, }; use super::*; use crate::prelude::*; pub mod imp { use super::*; #[derive(Default)] pub struct TestPool { pub(super) prepared: atomic::AtomicBool, pub(super) cleaned_up: atomic::AtomicBool, } #[glib::object_subclass] impl ObjectSubclass for TestPool { const NAME: &'static str = "TestPool"; type Type = super::TestPool; type ParentType = TaskPool; } impl ObjectImpl for TestPool {} impl GstObjectImpl for TestPool {} impl TaskPoolImpl for TestPool { type Handle = TestHandle; fn prepare(&self) -> Result<(), glib::Error> { self.prepared.store(true, atomic::Ordering::SeqCst); Ok(()) } fn cleanup(&self) { self.cleaned_up.store(true, atomic::Ordering::SeqCst); } fn push(&self, func: TaskPoolFunction) -> Result, glib::Error> { let handle = thread::spawn(move || func.call()); Ok(Some(TestHandle(handle))) } } pub struct TestHandle(thread::JoinHandle<()>); impl TaskHandle for TestHandle { fn join(self) { self.0.join().unwrap(); } } } glib::wrapper! { pub struct TestPool(ObjectSubclass) @extends TaskPool, crate::Object; } unsafe impl Send for TestPool {} unsafe impl Sync for TestPool {} impl TestPool { pub fn new() -> Self { Self::default() } } impl Default for TestPool { fn default() -> Self { glib::Object::new(&[]) } } #[test] fn test_simple_subclass() { crate::init().unwrap(); let pool = TestPool::new(); pool.prepare().unwrap(); let (sender, receiver) = channel(); let handle = pool .push(move || { sender.send(()).unwrap(); }) .unwrap(); let handle = handle.unwrap(); assert_eq!(receiver.recv(), Ok(())); handle.join(); assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); pool.cleanup(); let imp = pool.imp(); assert!(imp.prepared.load(atomic::Ordering::SeqCst)); assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst)); } }