mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer-rs.git
synced 2024-12-22 08:07:07 +00:00
gstreamer: Add TaskPool bindings and subclassing
Fixes: https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/14
This commit is contained in:
parent
44b2eba600
commit
b7afdd4dd0
8 changed files with 724 additions and 0 deletions
|
@ -2300,6 +2300,42 @@ manual_traits = ["TagSetterExtManual"]
|
|||
# Takes a raw pointer
|
||||
ignore = true
|
||||
|
||||
[[object]]
|
||||
name = "Gst.Task"
|
||||
status = "generate"
|
||||
# Need work
|
||||
[[object.function]]
|
||||
name = "new"
|
||||
ignore = true
|
||||
[[object.function]]
|
||||
name = "set_enter_callback"
|
||||
ignore = true
|
||||
[[object.function]]
|
||||
name = "set_leave_callback"
|
||||
ignore = true
|
||||
[[object.function]]
|
||||
name = "set_lock"
|
||||
ignore = true
|
||||
[[object.function]]
|
||||
name = "set_state"
|
||||
ignore = true
|
||||
|
||||
[[object]]
|
||||
name = "Gst.TaskPool"
|
||||
status = "generate"
|
||||
manual_traits = ["TaskPoolExtManual"]
|
||||
[[object.function]]
|
||||
name = "push"
|
||||
manual = true
|
||||
|
||||
# Moved to TaskHandle
|
||||
[[object.function]]
|
||||
name = "join"
|
||||
ignore = true
|
||||
[[object.function]]
|
||||
name = "dispose_handle"
|
||||
ignore = true
|
||||
|
||||
[[object]]
|
||||
name = "Gst.Toc"
|
||||
status = "manual"
|
||||
|
|
|
@ -95,6 +95,12 @@ pub use self::system_clock::SystemClock;
|
|||
mod tag_setter;
|
||||
pub use self::tag_setter::TagSetter;
|
||||
|
||||
mod task;
|
||||
pub use self::task::Task;
|
||||
|
||||
mod task_pool;
|
||||
pub use self::task_pool::TaskPool;
|
||||
|
||||
mod toc_setter;
|
||||
pub use self::toc_setter::TocSetter;
|
||||
|
||||
|
@ -225,6 +231,8 @@ pub mod traits {
|
|||
pub use super::proxy_pad::ProxyPadExt;
|
||||
pub use super::system_clock::SystemClockExt;
|
||||
pub use super::tag_setter::TagSetterExt;
|
||||
pub use super::task::TaskExt;
|
||||
pub use super::task_pool::TaskPoolExt;
|
||||
pub use super::toc_setter::TocSetterExt;
|
||||
pub use super::tracer::TracerExt;
|
||||
pub use super::uri_handler::URIHandlerExt;
|
||||
|
|
105
gstreamer/src/auto/task.rs
Normal file
105
gstreamer/src/auto/task.rs
Normal file
|
@ -0,0 +1,105 @@
|
|||
// This file was generated by gir (https://github.com/gtk-rs/gir)
|
||||
// from gir-files (https://github.com/gtk-rs/gir-files)
|
||||
// from gst-gir-files (https://gitlab.freedesktop.org/gstreamer/gir-files-rs.git)
|
||||
// DO NOT EDIT
|
||||
|
||||
use crate::Object;
|
||||
use crate::TaskPool;
|
||||
use crate::TaskState;
|
||||
use glib::object::IsA;
|
||||
use glib::translate::*;
|
||||
|
||||
glib::wrapper! {
|
||||
#[doc(alias = "GstTask")]
|
||||
pub struct Task(Object<ffi::GstTask, ffi::GstTaskClass>) @extends Object;
|
||||
|
||||
match fn {
|
||||
type_ => || ffi::gst_task_get_type(),
|
||||
}
|
||||
}
|
||||
|
||||
impl Task {
|
||||
pub const NONE: Option<&'static Task> = None;
|
||||
|
||||
#[doc(alias = "gst_task_cleanup_all")]
|
||||
pub fn cleanup_all() {
|
||||
assert_initialized_main_thread!();
|
||||
unsafe {
|
||||
ffi::gst_task_cleanup_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Send for Task {}
|
||||
unsafe impl Sync for Task {}
|
||||
|
||||
pub trait TaskExt: 'static {
|
||||
#[doc(alias = "gst_task_get_pool")]
|
||||
#[doc(alias = "get_pool")]
|
||||
fn pool(&self) -> TaskPool;
|
||||
|
||||
#[doc(alias = "gst_task_get_state")]
|
||||
#[doc(alias = "get_state")]
|
||||
fn state(&self) -> TaskState;
|
||||
|
||||
#[doc(alias = "gst_task_join")]
|
||||
fn join(&self) -> bool;
|
||||
|
||||
#[doc(alias = "gst_task_pause")]
|
||||
fn pause(&self) -> bool;
|
||||
|
||||
#[cfg(any(feature = "v1_18", feature = "dox"))]
|
||||
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
|
||||
#[doc(alias = "gst_task_resume")]
|
||||
fn resume(&self) -> bool;
|
||||
|
||||
#[doc(alias = "gst_task_set_pool")]
|
||||
fn set_pool(&self, pool: &impl IsA<TaskPool>);
|
||||
|
||||
#[doc(alias = "gst_task_start")]
|
||||
fn start(&self) -> bool;
|
||||
|
||||
#[doc(alias = "gst_task_stop")]
|
||||
fn stop(&self) -> bool;
|
||||
}
|
||||
|
||||
impl<O: IsA<Task>> TaskExt for O {
|
||||
fn pool(&self) -> TaskPool {
|
||||
unsafe { from_glib_full(ffi::gst_task_get_pool(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
|
||||
fn state(&self) -> TaskState {
|
||||
unsafe { from_glib(ffi::gst_task_get_state(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
|
||||
fn join(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_join(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
|
||||
fn pause(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_pause(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "v1_18", feature = "dox"))]
|
||||
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
|
||||
fn resume(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_resume(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
|
||||
fn set_pool(&self, pool: &impl IsA<TaskPool>) {
|
||||
unsafe {
|
||||
ffi::gst_task_set_pool(
|
||||
self.as_ref().to_glib_none().0,
|
||||
pool.as_ref().to_glib_none().0,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn start(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_start(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
|
||||
fn stop(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_stop(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
}
|
65
gstreamer/src/auto/task_pool.rs
Normal file
65
gstreamer/src/auto/task_pool.rs
Normal file
|
@ -0,0 +1,65 @@
|
|||
// This file was generated by gir (https://github.com/gtk-rs/gir)
|
||||
// from gir-files (https://github.com/gtk-rs/gir-files)
|
||||
// from gst-gir-files (https://gitlab.freedesktop.org/gstreamer/gir-files-rs.git)
|
||||
// DO NOT EDIT
|
||||
|
||||
use crate::Object;
|
||||
use glib::object::IsA;
|
||||
use glib::translate::*;
|
||||
use std::ptr;
|
||||
|
||||
glib::wrapper! {
|
||||
#[doc(alias = "GstTaskPool")]
|
||||
pub struct TaskPool(Object<ffi::GstTaskPool, ffi::GstTaskPoolClass>) @extends Object;
|
||||
|
||||
match fn {
|
||||
type_ => || ffi::gst_task_pool_get_type(),
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskPool {
|
||||
pub const NONE: Option<&'static TaskPool> = None;
|
||||
|
||||
#[doc(alias = "gst_task_pool_new")]
|
||||
pub fn new() -> TaskPool {
|
||||
assert_initialized_main_thread!();
|
||||
unsafe { from_glib_full(ffi::gst_task_pool_new()) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TaskPool {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Send for TaskPool {}
|
||||
unsafe impl Sync for TaskPool {}
|
||||
|
||||
pub trait TaskPoolExt: 'static {
|
||||
#[doc(alias = "gst_task_pool_cleanup")]
|
||||
fn cleanup(&self);
|
||||
|
||||
#[doc(alias = "gst_task_pool_prepare")]
|
||||
fn prepare(&self) -> Result<(), glib::Error>;
|
||||
}
|
||||
|
||||
impl<O: IsA<TaskPool>> TaskPoolExt for O {
|
||||
fn cleanup(&self) {
|
||||
unsafe {
|
||||
ffi::gst_task_pool_cleanup(self.as_ref().to_glib_none().0);
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare(&self) -> Result<(), glib::Error> {
|
||||
unsafe {
|
||||
let mut error = ptr::null_mut();
|
||||
let _ = ffi::gst_task_pool_prepare(self.as_ref().to_glib_none().0, &mut error);
|
||||
if error.is_null() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(from_glib_full(error))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -182,11 +182,13 @@ mod control_source;
|
|||
mod parse_context;
|
||||
mod proxy_pad;
|
||||
mod tag_setter;
|
||||
mod task_pool;
|
||||
pub use crate::element::{ElementMessageType, NotifyWatchId};
|
||||
pub use crate::element::{
|
||||
ELEMENT_METADATA_AUTHOR, ELEMENT_METADATA_DESCRIPTION, ELEMENT_METADATA_DOC_URI,
|
||||
ELEMENT_METADATA_ICON_NAME, ELEMENT_METADATA_KLASS, ELEMENT_METADATA_LONGNAME,
|
||||
};
|
||||
pub use crate::task_pool::{TaskHandle, TaskPoolTaskHandle};
|
||||
|
||||
pub use self::iterator::{Iterator, IteratorError, IteratorImpl, StdIterator};
|
||||
pub use crate::clock_time::ClockTime;
|
||||
|
@ -321,6 +323,7 @@ pub mod prelude {
|
|||
pub use crate::plugin_feature::PluginFeatureExtManual;
|
||||
pub use crate::proxy_pad::ProxyPadExtManual;
|
||||
pub use crate::tag_setter::TagSetterExtManual;
|
||||
pub use crate::task_pool::{TaskHandle, TaskPoolExtManual};
|
||||
pub use crate::typefind::TypeFindImpl;
|
||||
pub use crate::value::GstValueExt;
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ mod object;
|
|||
mod pad;
|
||||
mod pipeline;
|
||||
mod proxy_pad;
|
||||
mod task_pool;
|
||||
mod tracer;
|
||||
|
||||
mod device;
|
||||
|
@ -61,6 +62,7 @@ pub mod prelude {
|
|||
pub use super::proxy_pad::ProxyPadImpl;
|
||||
pub use super::system_clock::SystemClockImpl;
|
||||
pub use super::tag_setter::TagSetterImpl;
|
||||
pub use super::task_pool::TaskPoolImpl;
|
||||
pub use super::tracer::{TracerHook, TracerImpl, TracerImplExt};
|
||||
pub use super::uri_handler::{URIHandlerImpl, URIHandlerImplExt};
|
||||
}
|
||||
|
|
347
gstreamer/src/subclass/task_pool.rs
Normal file
347
gstreamer/src/subclass/task_pool.rs
Normal file
|
@ -0,0 +1,347 @@
|
|||
// Take a look at the license at the top of the repository in the LICENSE file.
|
||||
|
||||
use super::prelude::*;
|
||||
use crate::{TaskHandle, TaskPool};
|
||||
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::ptr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use glib::ffi::gpointer;
|
||||
use glib::prelude::*;
|
||||
use glib::subclass::prelude::*;
|
||||
use glib::translate::*;
|
||||
|
||||
pub trait TaskPoolImpl: GstObjectImpl + Send + Sync {
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// Handle to be returned from the `push` function to allow the caller to wait for the task's
|
||||
/// completion.
|
||||
///
|
||||
/// If unneeded, you can specify `()` or [`Infallible`](std::convert::Infallible) for a handle
|
||||
/// that does nothing on `join` or drop.
|
||||
type Handle: TaskHandle;
|
||||
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// Prepare the task pool to accept tasks.
|
||||
///
|
||||
/// This defaults to doing nothing.
|
||||
fn prepare(&self, _task_pool: &Self::Type) -> Result<(), glib::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// Clean up, rejecting further tasks and waiting for all accepted tasks to be stopped.
|
||||
///
|
||||
/// This is mainly used internally to ensure proper cleanup of internal data structures in test
|
||||
/// suites.
|
||||
fn cleanup(&self, _task_pool: &Self::Type) {}
|
||||
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// Deliver a task to the pool.
|
||||
///
|
||||
/// If returning `Ok`, you need to call the `func` eventually.
|
||||
///
|
||||
/// If returning `Err`, the `func` must be dropped without calling it.
|
||||
fn push(
|
||||
&self,
|
||||
task_pool: &Self::Type,
|
||||
func: TaskPoolFunction,
|
||||
) -> Result<Option<Self::Handle>, glib::Error>;
|
||||
}
|
||||
|
||||
unsafe impl<T: TaskPoolImpl> IsSubclassable<T> for TaskPool {
|
||||
fn class_init(klass: &mut glib::Class<Self>) {
|
||||
Self::parent_class_init::<T>(klass);
|
||||
let klass = klass.as_mut();
|
||||
klass.prepare = Some(task_pool_prepare::<T>);
|
||||
klass.cleanup = Some(task_pool_cleanup::<T>);
|
||||
klass.push = Some(task_pool_push::<T>);
|
||||
klass.join = Some(task_pool_join::<T>);
|
||||
|
||||
#[cfg(any(feature = "v1_20", feature = "dox"))]
|
||||
{
|
||||
klass.dispose_handle = Some(task_pool_dispose_handle::<T>);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn task_pool_prepare<T: TaskPoolImpl>(
|
||||
ptr: *mut ffi::GstTaskPool,
|
||||
error: *mut *mut glib::ffi::GError,
|
||||
) {
|
||||
let instance = &*(ptr as *mut T::Instance);
|
||||
let imp = instance.imp();
|
||||
let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
|
||||
|
||||
match imp.prepare(wrap.unsafe_cast_ref()) {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
if !error.is_null() {
|
||||
*error = err.into_raw();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn task_pool_cleanup<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool) {
|
||||
let instance = &*(ptr as *mut T::Instance);
|
||||
let imp = instance.imp();
|
||||
let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
|
||||
|
||||
imp.cleanup(wrap.unsafe_cast_ref());
|
||||
}
|
||||
|
||||
unsafe extern "C" fn task_pool_push<T: TaskPoolImpl>(
|
||||
ptr: *mut ffi::GstTaskPool,
|
||||
func: ffi::GstTaskPoolFunction,
|
||||
user_data: gpointer,
|
||||
error: *mut *mut glib::ffi::GError,
|
||||
) -> gpointer {
|
||||
let instance = &*(ptr as *mut T::Instance);
|
||||
let imp = instance.imp();
|
||||
let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
|
||||
|
||||
let func = TaskPoolFunction::new(func.expect("Tried to push null func"), user_data);
|
||||
|
||||
match imp.push(wrap.unsafe_cast_ref(), func.clone()) {
|
||||
Ok(None) => ptr::null_mut(),
|
||||
Ok(Some(handle)) => Box::into_raw(Box::new(handle)) as gpointer,
|
||||
Err(err) => {
|
||||
func.prevent_call();
|
||||
if !error.is_null() {
|
||||
*error = err.into_raw();
|
||||
}
|
||||
ptr::null_mut()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn task_pool_join<T: TaskPoolImpl>(ptr: *mut ffi::GstTaskPool, id: gpointer) {
|
||||
let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
|
||||
|
||||
if id.is_null() {
|
||||
crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to join null handle");
|
||||
return;
|
||||
}
|
||||
|
||||
let handle = Box::from_raw(id as *mut T::Handle);
|
||||
handle.join();
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "v1_20", feature = "dox"))]
|
||||
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
|
||||
unsafe extern "C" fn task_pool_dispose_handle<T: TaskPoolImpl>(
|
||||
ptr: *mut ffi::GstTaskPool,
|
||||
id: gpointer,
|
||||
) {
|
||||
let wrap: Borrowed<TaskPool> = from_glib_borrow(ptr);
|
||||
|
||||
if id.is_null() {
|
||||
crate::warning!(crate::CAT_RUST, obj: wrap.as_ref(), "Tried to dispose null handle");
|
||||
return;
|
||||
}
|
||||
|
||||
let handle = Box::from_raw(id as *mut T::Handle);
|
||||
drop(handle);
|
||||
}
|
||||
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// Function the task pool should execute, provided to [`push`](TaskPoolImpl::push).
|
||||
#[derive(Debug)]
|
||||
pub struct TaskPoolFunction(Arc<Mutex<Option<TaskPoolFunctionInner>>>);
|
||||
|
||||
// `Arc<Mutex<Option<…>>>` is required so that we can enforce that the function
|
||||
// has not been called and will never be called after `push` returns `Err`.
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TaskPoolFunctionInner {
|
||||
func: unsafe extern "C" fn(gpointer),
|
||||
user_data: gpointer,
|
||||
warn_on_drop: bool,
|
||||
}
|
||||
|
||||
unsafe impl Send for TaskPoolFunctionInner {}
|
||||
|
||||
impl TaskPoolFunction {
|
||||
fn new(func: unsafe extern "C" fn(gpointer), user_data: gpointer) -> Self {
|
||||
let inner = TaskPoolFunctionInner {
|
||||
func,
|
||||
user_data,
|
||||
warn_on_drop: true,
|
||||
};
|
||||
Self(Arc::new(Mutex::new(Some(inner))))
|
||||
}
|
||||
|
||||
fn clone(&self) -> Self {
|
||||
Self(self.0.clone())
|
||||
}
|
||||
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// Consume and execute the function.
|
||||
pub fn call(self) {
|
||||
let mut inner = self
|
||||
.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.expect("TaskPoolFunction has already been dropped");
|
||||
inner.warn_on_drop = false;
|
||||
unsafe { (inner.func)(inner.user_data) }
|
||||
}
|
||||
|
||||
fn prevent_call(self) {
|
||||
let mut inner = self
|
||||
.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.expect("TaskPoolFunction has already been called");
|
||||
inner.warn_on_drop = false;
|
||||
drop(inner);
|
||||
}
|
||||
|
||||
fn as_ptr(&self) -> *const Mutex<Option<TaskPoolFunctionInner>> {
|
||||
Arc::as_ptr(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TaskPoolFunctionInner {
|
||||
fn drop(&mut self) {
|
||||
if self.warn_on_drop {
|
||||
crate::warning!(crate::CAT_RUST, "Leaked task function");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for TaskPoolFunction {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.as_ptr() == other.as_ptr()
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for TaskPoolFunction {}
|
||||
|
||||
impl PartialOrd for TaskPoolFunction {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
self.as_ptr().partial_cmp(&other.as_ptr())
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for TaskPoolFunction {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
self.as_ptr().cmp(&other.as_ptr())
|
||||
}
|
||||
}
|
||||
|
||||
impl Hash for TaskPoolFunction {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.as_ptr().hash(state)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::prelude::*;
|
||||
use std::sync::atomic;
|
||||
use std::sync::mpsc::{channel, TryRecvError};
|
||||
use std::thread;
|
||||
|
||||
pub mod imp {
|
||||
use super::*;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TestPool {
|
||||
pub(super) prepared: atomic::AtomicBool,
|
||||
pub(super) cleaned_up: atomic::AtomicBool,
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for TestPool {
|
||||
const NAME: &'static str = "TestPool";
|
||||
type Type = super::TestPool;
|
||||
type ParentType = TaskPool;
|
||||
}
|
||||
|
||||
impl ObjectImpl for TestPool {}
|
||||
|
||||
impl GstObjectImpl for TestPool {}
|
||||
|
||||
impl TaskPoolImpl for TestPool {
|
||||
type Handle = TestHandle;
|
||||
|
||||
fn prepare(&self, _task_pool: &Self::Type) -> Result<(), glib::Error> {
|
||||
self.prepared.store(true, atomic::Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cleanup(&self, _task_pool: &Self::Type) {
|
||||
self.cleaned_up.store(true, atomic::Ordering::SeqCst);
|
||||
}
|
||||
|
||||
fn push(
|
||||
&self,
|
||||
_task_pool: &Self::Type,
|
||||
func: TaskPoolFunction,
|
||||
) -> Result<Option<Self::Handle>, glib::Error> {
|
||||
let handle = thread::spawn(move || func.call());
|
||||
Ok(Some(TestHandle(handle)))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestHandle(thread::JoinHandle<()>);
|
||||
|
||||
impl TaskHandle for TestHandle {
|
||||
fn join(self) {
|
||||
self.0.join().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct TestPool(ObjectSubclass<imp::TestPool>) @extends TaskPool, crate::Object;
|
||||
}
|
||||
|
||||
unsafe impl Send for TestPool {}
|
||||
unsafe impl Sync for TestPool {}
|
||||
|
||||
impl TestPool {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TestPool {
|
||||
fn default() -> Self {
|
||||
glib::Object::new(&[]).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_subclass() {
|
||||
crate::init().unwrap();
|
||||
|
||||
let pool = TestPool::new();
|
||||
pool.prepare().unwrap();
|
||||
|
||||
let (sender, receiver) = channel();
|
||||
|
||||
let handle = pool
|
||||
.push(move || {
|
||||
sender.send(()).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
let handle = handle.unwrap();
|
||||
|
||||
assert_eq!(receiver.recv(), Ok(()));
|
||||
|
||||
handle.join();
|
||||
assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
|
||||
|
||||
pool.cleanup();
|
||||
|
||||
let imp = imp::TestPool::from_instance(&pool);
|
||||
assert!(imp.prepared.load(atomic::Ordering::SeqCst));
|
||||
assert!(imp.cleaned_up.load(atomic::Ordering::SeqCst));
|
||||
}
|
||||
}
|
158
gstreamer/src/task_pool.rs
Normal file
158
gstreamer/src/task_pool.rs
Normal file
|
@ -0,0 +1,158 @@
|
|||
// Take a look at the license at the top of the repository in the LICENSE file.
|
||||
|
||||
use crate::TaskPool;
|
||||
|
||||
use std::ptr;
|
||||
|
||||
use glib::ffi::gpointer;
|
||||
use glib::prelude::*;
|
||||
use glib::translate::*;
|
||||
|
||||
unsafe extern "C" fn task_pool_trampoline<P: FnOnce() + Send + 'static>(data: gpointer) {
|
||||
let func = Box::from_raw(data as *mut P);
|
||||
func()
|
||||
}
|
||||
|
||||
pub trait TaskPoolExtManual: 'static {
|
||||
#[doc(alias = "gst_task_pool_push")]
|
||||
fn push<P: FnOnce() + Send + 'static>(
|
||||
&self,
|
||||
func: P,
|
||||
) -> Result<Option<TaskPoolTaskHandle>, glib::Error>;
|
||||
}
|
||||
|
||||
impl<O: IsA<TaskPool>> TaskPoolExtManual for O {
|
||||
fn push<P: FnOnce() + Send + 'static>(
|
||||
&self,
|
||||
func: P,
|
||||
) -> Result<Option<TaskPoolTaskHandle>, glib::Error> {
|
||||
unsafe {
|
||||
let mut error = ptr::null_mut();
|
||||
let func: Box<P> = Box::new(func);
|
||||
let func = Box::into_raw(func);
|
||||
|
||||
let handle = ffi::gst_task_pool_push(
|
||||
self.as_ref().to_glib_none().0,
|
||||
Some(task_pool_trampoline::<P>),
|
||||
func as gpointer,
|
||||
&mut error,
|
||||
);
|
||||
|
||||
if !error.is_null() {
|
||||
assert!(handle.is_null());
|
||||
|
||||
// Assume that task_pool_trampoline was
|
||||
// not called and will not be called
|
||||
drop(Box::from_raw(func));
|
||||
|
||||
return Err(from_glib_full(error));
|
||||
}
|
||||
|
||||
let handle = ptr::NonNull::new(handle).map(|handle| TaskPoolTaskHandle {
|
||||
handle,
|
||||
task_pool: Some(self.as_ref().clone()),
|
||||
});
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskPool {
|
||||
unsafe fn join(&self, id: ptr::NonNull<libc::c_void>) {
|
||||
ffi::gst_task_pool_join(self.to_glib_none().0, id.as_ptr())
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "v1_20", feature = "dox"))]
|
||||
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_20")))]
|
||||
unsafe fn dispose_handle(&self, id: ptr::NonNull<libc::c_void>) {
|
||||
ffi::gst_task_pool_dispose_handle(self.to_glib_none().0, id.as_ptr())
|
||||
}
|
||||
}
|
||||
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// A handle for a task which was pushed to a task pool.
|
||||
pub trait TaskHandle {
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// Wait for the task to complete.
|
||||
fn join(self);
|
||||
}
|
||||
|
||||
impl TaskHandle for () {
|
||||
fn join(self) {}
|
||||
}
|
||||
|
||||
impl TaskHandle for std::convert::Infallible {
|
||||
fn join(self) {}
|
||||
}
|
||||
|
||||
// rustdoc-stripper-ignore-next
|
||||
/// An opaque handle for a task associated with a particular task pool.
|
||||
///
|
||||
/// Keeps a reference to the pool alive.
|
||||
///
|
||||
/// If the `v1_20` feature is enabled, requests the task pool to dispose of the handle when it is
|
||||
/// dropped. Otherwise, needs to be `join`ed to avoid a leak.
|
||||
#[cfg_attr(not(any(feature = "v1_20", feature = "dox")), must_use)]
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct TaskPoolTaskHandle {
|
||||
handle: ptr::NonNull<libc::c_void>,
|
||||
task_pool: Option<TaskPool>,
|
||||
}
|
||||
|
||||
impl TaskHandle for TaskPoolTaskHandle {
|
||||
#[doc(alias = "gst_task_pool_join")]
|
||||
fn join(mut self) {
|
||||
let task_pool = self.task_pool.take().unwrap();
|
||||
unsafe { task_pool.join(self.handle) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TaskPoolTaskHandle {
|
||||
#[doc(alias = "gst_task_pool_dispose_handle")]
|
||||
fn drop(&mut self) {
|
||||
if let Some(task_pool) = self.task_pool.take() {
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(any(feature = "v1_20", feature = "dox"))] {
|
||||
unsafe { task_pool.dispose_handle(self.handle) }
|
||||
} else {
|
||||
crate::warning!(crate::CAT_RUST, obj: &task_pool, "Leaked task handle");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::prelude::*;
|
||||
use std::sync::mpsc::{channel, RecvError};
|
||||
|
||||
#[test]
|
||||
fn test_simple() {
|
||||
crate::init().unwrap();
|
||||
let pool = TaskPool::new();
|
||||
pool.prepare().unwrap();
|
||||
|
||||
let (sender, receiver) = channel();
|
||||
|
||||
let handle = pool
|
||||
.push(move || {
|
||||
sender.send(()).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(receiver.recv(), Ok(()));
|
||||
|
||||
if let Some(handle) = handle {
|
||||
handle.join();
|
||||
}
|
||||
|
||||
// Can't test try_recv here as the default task pool produces no
|
||||
// handles and thus no way to wait for channel destruction
|
||||
assert_eq!(receiver.recv(), Err(RecvError));
|
||||
|
||||
pool.cleanup();
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue