gstreamer: Complete the Task bindings

This commit is contained in:
Sebastian Dröge 2022-04-02 11:44:55 +03:00
parent 715f7cd6c2
commit 8d03a0d032
4 changed files with 320 additions and 20 deletions

View file

@ -2306,19 +2306,46 @@ status = "generate"
# Need work # Need work
[[object.function]] [[object.function]]
name = "new" name = "new"
ignore = true manual = true
[[object.function]] [[object.function]]
name = "set_enter_callback" name = "set_enter_callback"
ignore = true manual = true
[[object.function]] [[object.function]]
name = "set_leave_callback" name = "set_leave_callback"
ignore = true manual = true
[[object.function]] [[object.function]]
name = "set_lock" name = "set_lock"
ignore = true manual = true
[[object.function]]
name = "start"
[object.function.return]
bool_return_is_error = "Failed to start task"
[[object.function]]
name = "stop"
[object.function.return]
bool_return_is_error = "Failed to stop task"
[[object.function]]
name = "pause"
[object.function.return]
bool_return_is_error = "Failed to pause task"
[[object.function]]
name = "resume"
[object.function.return]
bool_return_is_error = "Failed to resume task"
[[object.function]]
name = "join"
[object.function.return]
bool_return_is_error = "Failed to join task"
[[object.function]] [[object.function]]
name = "set_state" name = "set_state"
ignore = true [object.function.return]
bool_return_is_error = "Failed to set task state"
[[object]] [[object]]
name = "Gst.TaskPool" name = "Gst.TaskPool"

View file

@ -43,24 +43,27 @@ pub trait TaskExt: 'static {
fn state(&self) -> TaskState; fn state(&self) -> TaskState;
#[doc(alias = "gst_task_join")] #[doc(alias = "gst_task_join")]
fn join(&self) -> bool; fn join(&self) -> Result<(), glib::error::BoolError>;
#[doc(alias = "gst_task_pause")] #[doc(alias = "gst_task_pause")]
fn pause(&self) -> bool; fn pause(&self) -> Result<(), glib::error::BoolError>;
#[cfg(any(feature = "v1_18", feature = "dox"))] #[cfg(any(feature = "v1_18", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
#[doc(alias = "gst_task_resume")] #[doc(alias = "gst_task_resume")]
fn resume(&self) -> bool; fn resume(&self) -> Result<(), glib::error::BoolError>;
#[doc(alias = "gst_task_set_pool")] #[doc(alias = "gst_task_set_pool")]
fn set_pool(&self, pool: &impl IsA<TaskPool>); fn set_pool(&self, pool: &impl IsA<TaskPool>);
#[doc(alias = "gst_task_set_state")]
fn set_state(&self, state: TaskState) -> Result<(), glib::error::BoolError>;
#[doc(alias = "gst_task_start")] #[doc(alias = "gst_task_start")]
fn start(&self) -> bool; fn start(&self) -> Result<(), glib::error::BoolError>;
#[doc(alias = "gst_task_stop")] #[doc(alias = "gst_task_stop")]
fn stop(&self) -> bool; fn stop(&self) -> Result<(), glib::error::BoolError>;
} }
impl<O: IsA<Task>> TaskExt for O { impl<O: IsA<Task>> TaskExt for O {
@ -72,18 +75,33 @@ impl<O: IsA<Task>> TaskExt for O {
unsafe { from_glib(ffi::gst_task_get_state(self.as_ref().to_glib_none().0)) } unsafe { from_glib(ffi::gst_task_get_state(self.as_ref().to_glib_none().0)) }
} }
fn join(&self) -> bool { fn join(&self) -> Result<(), glib::error::BoolError> {
unsafe { from_glib(ffi::gst_task_join(self.as_ref().to_glib_none().0)) } unsafe {
glib::result_from_gboolean!(
ffi::gst_task_join(self.as_ref().to_glib_none().0),
"Failed to join task"
)
}
} }
fn pause(&self) -> bool { fn pause(&self) -> Result<(), glib::error::BoolError> {
unsafe { from_glib(ffi::gst_task_pause(self.as_ref().to_glib_none().0)) } unsafe {
glib::result_from_gboolean!(
ffi::gst_task_pause(self.as_ref().to_glib_none().0),
"Failed to pause task"
)
}
} }
#[cfg(any(feature = "v1_18", feature = "dox"))] #[cfg(any(feature = "v1_18", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
fn resume(&self) -> bool { fn resume(&self) -> Result<(), glib::error::BoolError> {
unsafe { from_glib(ffi::gst_task_resume(self.as_ref().to_glib_none().0)) } unsafe {
glib::result_from_gboolean!(
ffi::gst_task_resume(self.as_ref().to_glib_none().0),
"Failed to resume task"
)
}
} }
fn set_pool(&self, pool: &impl IsA<TaskPool>) { fn set_pool(&self, pool: &impl IsA<TaskPool>) {
@ -95,11 +113,30 @@ impl<O: IsA<Task>> TaskExt for O {
} }
} }
fn start(&self) -> bool { fn set_state(&self, state: TaskState) -> Result<(), glib::error::BoolError> {
unsafe { from_glib(ffi::gst_task_start(self.as_ref().to_glib_none().0)) } unsafe {
glib::result_from_gboolean!(
ffi::gst_task_set_state(self.as_ref().to_glib_none().0, state.into_glib()),
"Failed to set task state"
)
}
} }
fn stop(&self) -> bool { fn start(&self) -> Result<(), glib::error::BoolError> {
unsafe { from_glib(ffi::gst_task_stop(self.as_ref().to_glib_none().0)) } unsafe {
glib::result_from_gboolean!(
ffi::gst_task_start(self.as_ref().to_glib_none().0),
"Failed to start task"
)
}
}
fn stop(&self) -> Result<(), glib::error::BoolError> {
unsafe {
glib::result_from_gboolean!(
ffi::gst_task_stop(self.as_ref().to_glib_none().0),
"Failed to stop task"
)
}
} }
} }

View file

@ -186,6 +186,8 @@ mod parse_context;
mod proxy_pad; mod proxy_pad;
mod registry; mod registry;
mod tag_setter; mod tag_setter;
pub mod task;
pub use task::{TaskLock, TaskLockGuard};
mod task_pool; mod task_pool;
pub use crate::element::{ElementMessageType, NotifyWatchId}; pub use crate::element::{ElementMessageType, NotifyWatchId};
pub use crate::element::{ pub use crate::element::{

234
gstreamer/src/task.rs Normal file
View file

@ -0,0 +1,234 @@
// Take a look at the license at the top of the repository in the LICENSE file.
use crate::Task;
use std::mem;
use std::ptr;
use std::sync::Arc;
use glib::prelude::*;
use glib::translate::*;
pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> {
func: Box<(F, *mut ffi::GstTask)>,
lock: Option<TaskLock>,
enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
}
impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> {
#[doc(alias = "gst_task_set_enter_callback")]
pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self {
Self {
enter_callback: Some(Box::new(enter_callback)),
..self
}
}
#[doc(alias = "gst_task_set_leave_callback")]
pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self {
Self {
leave_callback: Some(Box::new(leave_callback)),
..self
}
}
#[doc(alias = "gst_task_set_lock")]
pub fn lock(self, lock: &TaskLock) -> Self {
Self {
lock: Some(lock.clone()),
..self
}
}
#[doc(alias = "gst_task_new")]
pub fn build(self) -> Task {
unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>(
user_data: glib::ffi::gpointer,
) {
let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _);
(callback.0)(&from_glib_borrow(callback.1));
}
unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>(
data: glib::ffi::gpointer,
) {
let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _);
}
unsafe extern "C" fn callback_trampoline(
task: *mut ffi::GstTask,
_thread: *mut glib::ffi::GThread,
data: glib::ffi::gpointer,
) {
let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _);
callback(&from_glib_borrow(task));
}
unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) {
let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> =
Box::from_raw(data as *mut _);
}
unsafe {
let func_ptr = Box::into_raw(self.func);
let task: Task = from_glib_full(ffi::gst_task_new(
Some(func_trampoline::<F> as _),
func_ptr as *mut _,
Some(destroy_func::<F> as _),
));
(*func_ptr).1 = task.to_glib_none().0;
let lock = self.lock.unwrap_or_else(TaskLock::new);
ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0));
task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0));
if let Some(enter_callback) = self.enter_callback {
ffi::gst_task_set_enter_callback(
task.to_glib_none().0,
Some(callback_trampoline),
Box::into_raw(Box::new(enter_callback)) as *mut _,
Some(destroy_callback),
);
}
if let Some(leave_callback) = self.leave_callback {
ffi::gst_task_set_leave_callback(
task.to_glib_none().0,
Some(callback_trampoline),
Box::into_raw(Box::new(leave_callback)) as *mut _,
Some(destroy_callback),
);
}
task
}
}
}
impl Task {
#[doc(alias = "gst_task_new")]
pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> {
assert_initialized_main_thread!();
TaskBuilder {
func: Box::new((func, ptr::null_mut())),
lock: None,
enter_callback: None,
leave_callback: None,
}
}
}
#[derive(Debug, Clone)]
pub struct TaskLock(Arc<RecMutex>);
impl Default for TaskLock {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
struct RecMutex(glib::ffi::GRecMutex);
unsafe impl Send for RecMutex {}
unsafe impl Sync for RecMutex {}
#[must_use = "if unused the TaskLock will immediately unlock"]
pub struct TaskLockGuard<'a>(&'a RecMutex);
impl TaskLock {
pub fn new() -> Self {
unsafe {
let lock = TaskLock(Arc::new(RecMutex(mem::zeroed())));
glib::ffi::g_rec_mutex_init(mut_override(&lock.0 .0));
lock
}
}
// checker-ignore-item
pub fn lock(&self) -> TaskLockGuard {
unsafe {
let guard = TaskLockGuard(&self.0);
glib::ffi::g_rec_mutex_lock(mut_override(&self.0 .0));
guard
}
}
}
impl Drop for RecMutex {
fn drop(&mut self) {
unsafe {
glib::ffi::g_rec_mutex_clear(&mut self.0);
}
}
}
impl<'a> Drop for TaskLockGuard<'a> {
fn drop(&mut self) {
unsafe {
glib::ffi::g_rec_mutex_unlock(mut_override(&self.0 .0));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::*;
use std::sync::mpsc::channel;
#[test]
fn test_simple() {
crate::init().unwrap();
#[derive(Debug, PartialEq, Eq)]
enum Called {
Enter,
Func,
Leave,
}
let (send, recv) = channel();
let lock = TaskLock::new();
let task = Task::builder({
let send = send.clone();
let mut count = 0;
move |task| {
count += 1;
if count >= 3 {
task.pause().unwrap();
}
send.send(Called::Func).unwrap();
}
})
.enter_callback({
let send = send.clone();
move |_task| {
send.send(Called::Enter).unwrap();
}
})
.leave_callback({
move |_task| {
send.send(Called::Leave).unwrap();
}
})
.lock(&lock)
.build();
task.start().unwrap();
assert_eq!(recv.recv(), Ok(Called::Enter));
assert_eq!(recv.recv(), Ok(Called::Func));
assert_eq!(recv.recv(), Ok(Called::Func));
assert_eq!(recv.recv(), Ok(Called::Func));
assert_eq!(task.state(), crate::TaskState::Paused);
task.stop().unwrap();
assert_eq!(recv.recv(), Ok(Called::Leave));
task.join().unwrap();
}
}