From 8d03a0d0327385d2fe8ccb2c6397d1ab4d60fe86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Sat, 2 Apr 2022 11:44:55 +0300 Subject: [PATCH] gstreamer: Complete the Task bindings --- gstreamer/Gir.toml | 37 +++++- gstreamer/src/auto/task.rs | 67 ++++++++--- gstreamer/src/lib.rs | 2 + gstreamer/src/task.rs | 234 +++++++++++++++++++++++++++++++++++++ 4 files changed, 320 insertions(+), 20 deletions(-) create mode 100644 gstreamer/src/task.rs diff --git a/gstreamer/Gir.toml b/gstreamer/Gir.toml index faab82f5a..48b7e8dac 100644 --- a/gstreamer/Gir.toml +++ b/gstreamer/Gir.toml @@ -2306,19 +2306,46 @@ status = "generate" # Need work [[object.function]] name = "new" - ignore = true + manual = true [[object.function]] name = "set_enter_callback" - ignore = true + manual = true [[object.function]] name = "set_leave_callback" - ignore = true + manual = true [[object.function]] name = "set_lock" - ignore = true + manual = true + + [[object.function]] + name = "start" + [object.function.return] + bool_return_is_error = "Failed to start task" + + [[object.function]] + name = "stop" + [object.function.return] + bool_return_is_error = "Failed to stop task" + + [[object.function]] + name = "pause" + [object.function.return] + bool_return_is_error = "Failed to pause task" + + [[object.function]] + name = "resume" + [object.function.return] + bool_return_is_error = "Failed to resume task" + + [[object.function]] + name = "join" + [object.function.return] + bool_return_is_error = "Failed to join task" + [[object.function]] name = "set_state" - ignore = true + [object.function.return] + bool_return_is_error = "Failed to set task state" [[object]] name = "Gst.TaskPool" diff --git a/gstreamer/src/auto/task.rs b/gstreamer/src/auto/task.rs index 55da4ef7b..33bbf50b8 100644 --- a/gstreamer/src/auto/task.rs +++ b/gstreamer/src/auto/task.rs @@ -43,24 +43,27 @@ pub trait TaskExt: 'static { fn state(&self) -> TaskState; #[doc(alias = "gst_task_join")] - fn join(&self) -> bool; + fn join(&self) -> Result<(), glib::error::BoolError>; #[doc(alias = "gst_task_pause")] - fn pause(&self) -> bool; + fn pause(&self) -> Result<(), glib::error::BoolError>; #[cfg(any(feature = "v1_18", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))] #[doc(alias = "gst_task_resume")] - fn resume(&self) -> bool; + fn resume(&self) -> Result<(), glib::error::BoolError>; #[doc(alias = "gst_task_set_pool")] fn set_pool(&self, pool: &impl IsA); + #[doc(alias = "gst_task_set_state")] + fn set_state(&self, state: TaskState) -> Result<(), glib::error::BoolError>; + #[doc(alias = "gst_task_start")] - fn start(&self) -> bool; + fn start(&self) -> Result<(), glib::error::BoolError>; #[doc(alias = "gst_task_stop")] - fn stop(&self) -> bool; + fn stop(&self) -> Result<(), glib::error::BoolError>; } impl> TaskExt for O { @@ -72,18 +75,33 @@ impl> TaskExt for O { unsafe { from_glib(ffi::gst_task_get_state(self.as_ref().to_glib_none().0)) } } - fn join(&self) -> bool { - unsafe { from_glib(ffi::gst_task_join(self.as_ref().to_glib_none().0)) } + fn join(&self) -> Result<(), glib::error::BoolError> { + unsafe { + glib::result_from_gboolean!( + ffi::gst_task_join(self.as_ref().to_glib_none().0), + "Failed to join task" + ) + } } - fn pause(&self) -> bool { - unsafe { from_glib(ffi::gst_task_pause(self.as_ref().to_glib_none().0)) } + fn pause(&self) -> Result<(), glib::error::BoolError> { + unsafe { + glib::result_from_gboolean!( + ffi::gst_task_pause(self.as_ref().to_glib_none().0), + "Failed to pause task" + ) + } } #[cfg(any(feature = "v1_18", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))] - fn resume(&self) -> bool { - unsafe { from_glib(ffi::gst_task_resume(self.as_ref().to_glib_none().0)) } + fn resume(&self) -> Result<(), glib::error::BoolError> { + unsafe { + glib::result_from_gboolean!( + ffi::gst_task_resume(self.as_ref().to_glib_none().0), + "Failed to resume task" + ) + } } fn set_pool(&self, pool: &impl IsA) { @@ -95,11 +113,30 @@ impl> TaskExt for O { } } - fn start(&self) -> bool { - unsafe { from_glib(ffi::gst_task_start(self.as_ref().to_glib_none().0)) } + fn set_state(&self, state: TaskState) -> Result<(), glib::error::BoolError> { + unsafe { + glib::result_from_gboolean!( + ffi::gst_task_set_state(self.as_ref().to_glib_none().0, state.into_glib()), + "Failed to set task state" + ) + } } - fn stop(&self) -> bool { - unsafe { from_glib(ffi::gst_task_stop(self.as_ref().to_glib_none().0)) } + fn start(&self) -> Result<(), glib::error::BoolError> { + unsafe { + glib::result_from_gboolean!( + ffi::gst_task_start(self.as_ref().to_glib_none().0), + "Failed to start task" + ) + } + } + + fn stop(&self) -> Result<(), glib::error::BoolError> { + unsafe { + glib::result_from_gboolean!( + ffi::gst_task_stop(self.as_ref().to_glib_none().0), + "Failed to stop task" + ) + } } } diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index 8d14c4802..b491534a5 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -186,6 +186,8 @@ mod parse_context; mod proxy_pad; mod registry; mod tag_setter; +pub mod task; +pub use task::{TaskLock, TaskLockGuard}; mod task_pool; pub use crate::element::{ElementMessageType, NotifyWatchId}; pub use crate::element::{ diff --git a/gstreamer/src/task.rs b/gstreamer/src/task.rs new file mode 100644 index 000000000..8ca56480e --- /dev/null +++ b/gstreamer/src/task.rs @@ -0,0 +1,234 @@ +// Take a look at the license at the top of the repository in the LICENSE file. + +use crate::Task; + +use std::mem; +use std::ptr; +use std::sync::Arc; + +use glib::prelude::*; +use glib::translate::*; + +pub struct TaskBuilder { + func: Box<(F, *mut ffi::GstTask)>, + lock: Option, + enter_callback: Option>, + leave_callback: Option>, +} + +impl TaskBuilder { + #[doc(alias = "gst_task_set_enter_callback")] + pub fn enter_callback(self, enter_callback: E) -> Self { + Self { + enter_callback: Some(Box::new(enter_callback)), + ..self + } + } + + #[doc(alias = "gst_task_set_leave_callback")] + pub fn leave_callback(self, leave_callback: E) -> Self { + Self { + leave_callback: Some(Box::new(leave_callback)), + ..self + } + } + + #[doc(alias = "gst_task_set_lock")] + pub fn lock(self, lock: &TaskLock) -> Self { + Self { + lock: Some(lock.clone()), + ..self + } + } + + #[doc(alias = "gst_task_new")] + pub fn build(self) -> Task { + unsafe extern "C" fn func_trampoline( + user_data: glib::ffi::gpointer, + ) { + let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _); + (callback.0)(&from_glib_borrow(callback.1)); + } + + unsafe extern "C" fn destroy_func( + data: glib::ffi::gpointer, + ) { + let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _); + } + + unsafe extern "C" fn callback_trampoline( + task: *mut ffi::GstTask, + _thread: *mut glib::ffi::GThread, + data: glib::ffi::gpointer, + ) { + let callback: &mut Box = &mut *(data as *mut _); + callback(&from_glib_borrow(task)); + } + + unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) { + let _callback: Box> = + Box::from_raw(data as *mut _); + } + + unsafe { + let func_ptr = Box::into_raw(self.func); + + let task: Task = from_glib_full(ffi::gst_task_new( + Some(func_trampoline:: as _), + func_ptr as *mut _, + Some(destroy_func:: as _), + )); + + (*func_ptr).1 = task.to_glib_none().0; + + let lock = self.lock.unwrap_or_else(TaskLock::new); + ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0)); + task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0)); + + if let Some(enter_callback) = self.enter_callback { + ffi::gst_task_set_enter_callback( + task.to_glib_none().0, + Some(callback_trampoline), + Box::into_raw(Box::new(enter_callback)) as *mut _, + Some(destroy_callback), + ); + } + + if let Some(leave_callback) = self.leave_callback { + ffi::gst_task_set_leave_callback( + task.to_glib_none().0, + Some(callback_trampoline), + Box::into_raw(Box::new(leave_callback)) as *mut _, + Some(destroy_callback), + ); + } + + task + } + } +} + +impl Task { + #[doc(alias = "gst_task_new")] + pub fn builder(func: F) -> TaskBuilder { + assert_initialized_main_thread!(); + TaskBuilder { + func: Box::new((func, ptr::null_mut())), + lock: None, + enter_callback: None, + leave_callback: None, + } + } +} + +#[derive(Debug, Clone)] +pub struct TaskLock(Arc); + +impl Default for TaskLock { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug)] +struct RecMutex(glib::ffi::GRecMutex); + +unsafe impl Send for RecMutex {} +unsafe impl Sync for RecMutex {} + +#[must_use = "if unused the TaskLock will immediately unlock"] +pub struct TaskLockGuard<'a>(&'a RecMutex); + +impl TaskLock { + pub fn new() -> Self { + unsafe { + let lock = TaskLock(Arc::new(RecMutex(mem::zeroed()))); + glib::ffi::g_rec_mutex_init(mut_override(&lock.0 .0)); + lock + } + } + + // checker-ignore-item + pub fn lock(&self) -> TaskLockGuard { + unsafe { + let guard = TaskLockGuard(&self.0); + glib::ffi::g_rec_mutex_lock(mut_override(&self.0 .0)); + guard + } + } +} + +impl Drop for RecMutex { + fn drop(&mut self) { + unsafe { + glib::ffi::g_rec_mutex_clear(&mut self.0); + } + } +} + +impl<'a> Drop for TaskLockGuard<'a> { + fn drop(&mut self) { + unsafe { + glib::ffi::g_rec_mutex_unlock(mut_override(&self.0 .0)); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::prelude::*; + use std::sync::mpsc::channel; + + #[test] + fn test_simple() { + crate::init().unwrap(); + + #[derive(Debug, PartialEq, Eq)] + enum Called { + Enter, + Func, + Leave, + } + + let (send, recv) = channel(); + let lock = TaskLock::new(); + + let task = Task::builder({ + let send = send.clone(); + let mut count = 0; + move |task| { + count += 1; + if count >= 3 { + task.pause().unwrap(); + } + send.send(Called::Func).unwrap(); + } + }) + .enter_callback({ + let send = send.clone(); + move |_task| { + send.send(Called::Enter).unwrap(); + } + }) + .leave_callback({ + move |_task| { + send.send(Called::Leave).unwrap(); + } + }) + .lock(&lock) + .build(); + + task.start().unwrap(); + + assert_eq!(recv.recv(), Ok(Called::Enter)); + assert_eq!(recv.recv(), Ok(Called::Func)); + assert_eq!(recv.recv(), Ok(Called::Func)); + assert_eq!(recv.recv(), Ok(Called::Func)); + + assert_eq!(task.state(), crate::TaskState::Paused); + task.stop().unwrap(); + assert_eq!(recv.recv(), Ok(Called::Leave)); + task.join().unwrap(); + } +}