forked from mirrors/gstreamer-rs
gstreamer: Complete the Task bindings
This commit is contained in:
parent
4f8f99e2bf
commit
5a334cdcd3
4 changed files with 320 additions and 20 deletions
|
@ -2306,19 +2306,46 @@ status = "generate"
|
|||
# Need work
|
||||
[[object.function]]
|
||||
name = "new"
|
||||
ignore = true
|
||||
manual = true
|
||||
[[object.function]]
|
||||
name = "set_enter_callback"
|
||||
ignore = true
|
||||
manual = true
|
||||
[[object.function]]
|
||||
name = "set_leave_callback"
|
||||
ignore = true
|
||||
manual = true
|
||||
[[object.function]]
|
||||
name = "set_lock"
|
||||
ignore = true
|
||||
manual = true
|
||||
|
||||
[[object.function]]
|
||||
name = "start"
|
||||
[object.function.return]
|
||||
bool_return_is_error = "Failed to start task"
|
||||
|
||||
[[object.function]]
|
||||
name = "stop"
|
||||
[object.function.return]
|
||||
bool_return_is_error = "Failed to stop task"
|
||||
|
||||
[[object.function]]
|
||||
name = "pause"
|
||||
[object.function.return]
|
||||
bool_return_is_error = "Failed to pause task"
|
||||
|
||||
[[object.function]]
|
||||
name = "resume"
|
||||
[object.function.return]
|
||||
bool_return_is_error = "Failed to resume task"
|
||||
|
||||
[[object.function]]
|
||||
name = "join"
|
||||
[object.function.return]
|
||||
bool_return_is_error = "Failed to join task"
|
||||
|
||||
[[object.function]]
|
||||
name = "set_state"
|
||||
ignore = true
|
||||
[object.function.return]
|
||||
bool_return_is_error = "Failed to set task state"
|
||||
|
||||
[[object]]
|
||||
name = "Gst.TaskPool"
|
||||
|
|
|
@ -43,24 +43,27 @@ pub trait TaskExt: 'static {
|
|||
fn state(&self) -> TaskState;
|
||||
|
||||
#[doc(alias = "gst_task_join")]
|
||||
fn join(&self) -> bool;
|
||||
fn join(&self) -> Result<(), glib::error::BoolError>;
|
||||
|
||||
#[doc(alias = "gst_task_pause")]
|
||||
fn pause(&self) -> bool;
|
||||
fn pause(&self) -> Result<(), glib::error::BoolError>;
|
||||
|
||||
#[cfg(any(feature = "v1_18", feature = "dox"))]
|
||||
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
|
||||
#[doc(alias = "gst_task_resume")]
|
||||
fn resume(&self) -> bool;
|
||||
fn resume(&self) -> Result<(), glib::error::BoolError>;
|
||||
|
||||
#[doc(alias = "gst_task_set_pool")]
|
||||
fn set_pool(&self, pool: &impl IsA<TaskPool>);
|
||||
|
||||
#[doc(alias = "gst_task_set_state")]
|
||||
fn set_state(&self, state: TaskState) -> Result<(), glib::error::BoolError>;
|
||||
|
||||
#[doc(alias = "gst_task_start")]
|
||||
fn start(&self) -> bool;
|
||||
fn start(&self) -> Result<(), glib::error::BoolError>;
|
||||
|
||||
#[doc(alias = "gst_task_stop")]
|
||||
fn stop(&self) -> bool;
|
||||
fn stop(&self) -> Result<(), glib::error::BoolError>;
|
||||
}
|
||||
|
||||
impl<O: IsA<Task>> TaskExt for O {
|
||||
|
@ -72,18 +75,33 @@ impl<O: IsA<Task>> TaskExt for O {
|
|||
unsafe { from_glib(ffi::gst_task_get_state(self.as_ref().to_glib_none().0)) }
|
||||
}
|
||||
|
||||
fn join(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_join(self.as_ref().to_glib_none().0)) }
|
||||
fn join(&self) -> Result<(), glib::error::BoolError> {
|
||||
unsafe {
|
||||
glib::result_from_gboolean!(
|
||||
ffi::gst_task_join(self.as_ref().to_glib_none().0),
|
||||
"Failed to join task"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn pause(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_pause(self.as_ref().to_glib_none().0)) }
|
||||
fn pause(&self) -> Result<(), glib::error::BoolError> {
|
||||
unsafe {
|
||||
glib::result_from_gboolean!(
|
||||
ffi::gst_task_pause(self.as_ref().to_glib_none().0),
|
||||
"Failed to pause task"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "v1_18", feature = "dox"))]
|
||||
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_18")))]
|
||||
fn resume(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_resume(self.as_ref().to_glib_none().0)) }
|
||||
fn resume(&self) -> Result<(), glib::error::BoolError> {
|
||||
unsafe {
|
||||
glib::result_from_gboolean!(
|
||||
ffi::gst_task_resume(self.as_ref().to_glib_none().0),
|
||||
"Failed to resume task"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn set_pool(&self, pool: &impl IsA<TaskPool>) {
|
||||
|
@ -95,11 +113,30 @@ impl<O: IsA<Task>> TaskExt for O {
|
|||
}
|
||||
}
|
||||
|
||||
fn start(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_start(self.as_ref().to_glib_none().0)) }
|
||||
fn set_state(&self, state: TaskState) -> Result<(), glib::error::BoolError> {
|
||||
unsafe {
|
||||
glib::result_from_gboolean!(
|
||||
ffi::gst_task_set_state(self.as_ref().to_glib_none().0, state.into_glib()),
|
||||
"Failed to set task state"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn stop(&self) -> bool {
|
||||
unsafe { from_glib(ffi::gst_task_stop(self.as_ref().to_glib_none().0)) }
|
||||
fn start(&self) -> Result<(), glib::error::BoolError> {
|
||||
unsafe {
|
||||
glib::result_from_gboolean!(
|
||||
ffi::gst_task_start(self.as_ref().to_glib_none().0),
|
||||
"Failed to start task"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn stop(&self) -> Result<(), glib::error::BoolError> {
|
||||
unsafe {
|
||||
glib::result_from_gboolean!(
|
||||
ffi::gst_task_stop(self.as_ref().to_glib_none().0),
|
||||
"Failed to stop task"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -182,6 +182,8 @@ mod control_source;
|
|||
mod parse_context;
|
||||
mod proxy_pad;
|
||||
mod tag_setter;
|
||||
pub mod task;
|
||||
pub use task::{TaskLock, TaskLockGuard};
|
||||
mod task_pool;
|
||||
pub use crate::element::{ElementMessageType, NotifyWatchId};
|
||||
pub use crate::element::{
|
||||
|
|
234
gstreamer/src/task.rs
Normal file
234
gstreamer/src/task.rs
Normal file
|
@ -0,0 +1,234 @@
|
|||
// Take a look at the license at the top of the repository in the LICENSE file.
|
||||
|
||||
use crate::Task;
|
||||
|
||||
use std::mem;
|
||||
use std::ptr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use glib::prelude::*;
|
||||
use glib::translate::*;
|
||||
|
||||
pub struct TaskBuilder<F: FnMut(&Task) + Send + 'static> {
|
||||
func: Box<(F, *mut ffi::GstTask)>,
|
||||
lock: Option<TaskLock>,
|
||||
enter_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
|
||||
leave_callback: Option<Box<dyn FnMut(&Task) + Send + 'static>>,
|
||||
}
|
||||
|
||||
impl<F: FnMut(&Task) + Send + 'static> TaskBuilder<F> {
|
||||
#[doc(alias = "gst_task_set_enter_callback")]
|
||||
pub fn enter_callback<E: FnMut(&Task) + Send + 'static>(self, enter_callback: E) -> Self {
|
||||
Self {
|
||||
enter_callback: Some(Box::new(enter_callback)),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(alias = "gst_task_set_leave_callback")]
|
||||
pub fn leave_callback<E: FnMut(&Task) + Send + 'static>(self, leave_callback: E) -> Self {
|
||||
Self {
|
||||
leave_callback: Some(Box::new(leave_callback)),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(alias = "gst_task_set_lock")]
|
||||
pub fn lock(self, lock: &TaskLock) -> Self {
|
||||
Self {
|
||||
lock: Some(lock.clone()),
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(alias = "gst_task_new")]
|
||||
pub fn build(self) -> Task {
|
||||
unsafe extern "C" fn func_trampoline<F: FnMut(&Task) + Send + 'static>(
|
||||
user_data: glib::ffi::gpointer,
|
||||
) {
|
||||
let callback: &mut (F, *mut ffi::GstTask) = &mut *(user_data as *mut _);
|
||||
(callback.0)(&from_glib_borrow(callback.1));
|
||||
}
|
||||
|
||||
unsafe extern "C" fn destroy_func<F: FnMut(&Task) + Send + 'static>(
|
||||
data: glib::ffi::gpointer,
|
||||
) {
|
||||
let _callback: Box<(F, *mut ffi::GstTask)> = Box::from_raw(data as *mut _);
|
||||
}
|
||||
|
||||
unsafe extern "C" fn callback_trampoline(
|
||||
task: *mut ffi::GstTask,
|
||||
_thread: *mut glib::ffi::GThread,
|
||||
data: glib::ffi::gpointer,
|
||||
) {
|
||||
let callback: &mut Box<dyn FnMut(&Task) + Send + 'static> = &mut *(data as *mut _);
|
||||
callback(&from_glib_borrow(task));
|
||||
}
|
||||
|
||||
unsafe extern "C" fn destroy_callback(data: glib::ffi::gpointer) {
|
||||
let _callback: Box<Box<dyn FnMut(&Task) + Send + 'static>> =
|
||||
Box::from_raw(data as *mut _);
|
||||
}
|
||||
|
||||
unsafe {
|
||||
let func_ptr = Box::into_raw(self.func);
|
||||
|
||||
let task: Task = from_glib_full(ffi::gst_task_new(
|
||||
Some(func_trampoline::<F> as _),
|
||||
func_ptr as *mut _,
|
||||
Some(destroy_func::<F> as _),
|
||||
));
|
||||
|
||||
(*func_ptr).1 = task.to_glib_none().0;
|
||||
|
||||
let lock = self.lock.unwrap_or_else(TaskLock::new);
|
||||
ffi::gst_task_set_lock(task.to_glib_none().0, mut_override(&lock.0 .0));
|
||||
task.set_data("gstreamer-rs-task-lock", Arc::clone(&lock.0));
|
||||
|
||||
if let Some(enter_callback) = self.enter_callback {
|
||||
ffi::gst_task_set_enter_callback(
|
||||
task.to_glib_none().0,
|
||||
Some(callback_trampoline),
|
||||
Box::into_raw(Box::new(enter_callback)) as *mut _,
|
||||
Some(destroy_callback),
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(leave_callback) = self.leave_callback {
|
||||
ffi::gst_task_set_leave_callback(
|
||||
task.to_glib_none().0,
|
||||
Some(callback_trampoline),
|
||||
Box::into_raw(Box::new(leave_callback)) as *mut _,
|
||||
Some(destroy_callback),
|
||||
);
|
||||
}
|
||||
|
||||
task
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Task {
|
||||
#[doc(alias = "gst_task_new")]
|
||||
pub fn builder<F: FnMut(&Task) + Send + 'static>(func: F) -> TaskBuilder<F> {
|
||||
assert_initialized_main_thread!();
|
||||
TaskBuilder {
|
||||
func: Box::new((func, ptr::null_mut())),
|
||||
lock: None,
|
||||
enter_callback: None,
|
||||
leave_callback: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TaskLock(Arc<RecMutex>);
|
||||
|
||||
impl Default for TaskLock {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RecMutex(glib::ffi::GRecMutex);
|
||||
|
||||
unsafe impl Send for RecMutex {}
|
||||
unsafe impl Sync for RecMutex {}
|
||||
|
||||
#[must_use = "if unused the TaskLock will immediately unlock"]
|
||||
pub struct TaskLockGuard<'a>(&'a RecMutex);
|
||||
|
||||
impl TaskLock {
|
||||
pub fn new() -> Self {
|
||||
unsafe {
|
||||
let lock = TaskLock(Arc::new(RecMutex(mem::zeroed())));
|
||||
glib::ffi::g_rec_mutex_init(mut_override(&lock.0 .0));
|
||||
lock
|
||||
}
|
||||
}
|
||||
|
||||
// checker-ignore-item
|
||||
pub fn lock(&self) -> TaskLockGuard {
|
||||
unsafe {
|
||||
let guard = TaskLockGuard(&self.0);
|
||||
glib::ffi::g_rec_mutex_lock(mut_override(&self.0 .0));
|
||||
guard
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RecMutex {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
glib::ffi::g_rec_mutex_clear(&mut self.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for TaskLockGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
glib::ffi::g_rec_mutex_unlock(mut_override(&self.0 .0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::prelude::*;
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
#[test]
|
||||
fn test_simple() {
|
||||
crate::init().unwrap();
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum Called {
|
||||
Enter,
|
||||
Func,
|
||||
Leave,
|
||||
}
|
||||
|
||||
let (send, recv) = channel();
|
||||
let lock = TaskLock::new();
|
||||
|
||||
let task = Task::builder({
|
||||
let send = send.clone();
|
||||
let mut count = 0;
|
||||
move |task| {
|
||||
count += 1;
|
||||
if count >= 3 {
|
||||
task.pause().unwrap();
|
||||
}
|
||||
send.send(Called::Func).unwrap();
|
||||
}
|
||||
})
|
||||
.enter_callback({
|
||||
let send = send.clone();
|
||||
move |_task| {
|
||||
send.send(Called::Enter).unwrap();
|
||||
}
|
||||
})
|
||||
.leave_callback({
|
||||
move |_task| {
|
||||
send.send(Called::Leave).unwrap();
|
||||
}
|
||||
})
|
||||
.lock(&lock)
|
||||
.build();
|
||||
|
||||
task.start().unwrap();
|
||||
|
||||
assert_eq!(recv.recv(), Ok(Called::Enter));
|
||||
assert_eq!(recv.recv(), Ok(Called::Func));
|
||||
assert_eq!(recv.recv(), Ok(Called::Func));
|
||||
assert_eq!(recv.recv(), Ok(Called::Func));
|
||||
|
||||
assert_eq!(task.state(), crate::TaskState::Paused);
|
||||
task.stop().unwrap();
|
||||
assert_eq!(recv.recv(), Ok(Called::Leave));
|
||||
task.join().unwrap();
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue