From 531014a35ccec7d57826b478eff6e52c9b80b9a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Sat, 10 Oct 2020 12:10:52 +0300 Subject: [PATCH] gstreamer/clock: Improve ClockId bindings There is now a separate type for Single and Periodic clock ids. This allows to have API that is only for one type on that specific type instead of doing runtime checks, and allows for more refined async waiting API. --- gstreamer/src/clock.rs | 352 +++++++++++++++++++++++++++++++---------- gstreamer/src/lib.rs | 2 +- 2 files changed, 268 insertions(+), 86 deletions(-) diff --git a/gstreamer/src/clock.rs b/gstreamer/src/clock.rs index 9caa12d3a..f1b69e1b1 100644 --- a/gstreamer/src/clock.rs +++ b/gstreamer/src/clock.rs @@ -15,6 +15,7 @@ use glib_sys::{gboolean, gpointer}; use gst_sys; use libc::c_void; use std::cmp; +use std::convert; use std::ptr; use Clock; use ClockEntryType; @@ -25,7 +26,7 @@ use ClockSuccess; use ClockTime; use ClockTimeDiff; -use futures_core::Stream; +use futures_core::{Future, Stream}; use std::marker::Unpin; use std::pin::Pin; use std::sync::atomic; @@ -41,35 +42,6 @@ glib_wrapper! { } } -unsafe extern "C" fn trampoline_wait_async( - clock: *mut gst_sys::GstClock, - time: gst_sys::GstClockTime, - id: gpointer, - func: gpointer, -) -> gboolean { - let f: &F = &*(func as *const F); - f( - &from_glib_borrow(clock), - from_glib(time), - &from_glib_borrow(id), - ); - glib_sys::GTRUE -} - -unsafe extern "C" fn destroy_closure_wait_async< - F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static, ->( - ptr: gpointer, -) { - Box::::from_raw(ptr as *mut _); -} - -fn into_raw_wait_async(func: F) -> gpointer { - #[allow(clippy::type_complexity)] - let func: Box = Box::new(func); - Box::into_raw(func) as gpointer -} - impl ClockId { pub fn get_time(&self) -> ClockTime { unsafe { from_glib(gst_sys::gst_clock_id_get_time(self.to_glib_none().0)) } @@ -90,42 +62,6 @@ impl ClockId { } } - pub fn wait_async(&self, func: F) -> Result - where - F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static, - { - let ret: ClockReturn = unsafe { - from_glib(gst_sys::gst_clock_id_wait_async( - self.to_glib_none().0, - Some(trampoline_wait_async::), - into_raw_wait_async(func), - Some(destroy_closure_wait_async::), - )) - }; - ret.into_result() - } - - #[allow(clippy::type_complexity)] - pub fn wait_async_stream( - &self, - ) -> Result< - Pin + Unpin + Send + 'static>>, - ClockError, - > { - use futures_channel::mpsc; - - let (sender, receiver) = mpsc::unbounded(); - - self.wait_async(move |_clock, jitter, id| { - if sender.unbounded_send((jitter, id.clone())).is_err() { - // Unschedule any future calls if the receiver end is disconnected - id.unschedule(); - } - })?; - - Ok(Box::pin(receiver)) - } - pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { unsafe { let res = @@ -156,17 +92,6 @@ impl ClockId { } } - pub fn get_interval(&self) -> Option { - if self.get_type() != ClockEntryType::Periodic { - return None; - } - - unsafe { - let ptr: *mut gst_sys::GstClockEntry = self.to_glib_none().0 as *mut _; - Some(from_glib((*ptr).interval)) - } - } - pub fn get_status(&self) -> &AtomicClockReturn { unsafe { let ptr: *mut gst_sys::GstClockEntry = self.to_glib_none().0 as *mut _; @@ -175,6 +100,215 @@ impl ClockId { } } +#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] +pub struct SingleShotClockId(ClockId); + +impl std::ops::Deref for SingleShotClockId { + type Target = ClockId; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for ClockId { + fn from(id: SingleShotClockId) -> ClockId { + skip_assert_initialized!(); + id.0 + } +} + +impl convert::TryFrom for SingleShotClockId { + type Error = glib::BoolError; + + fn try_from(id: ClockId) -> Result { + skip_assert_initialized!(); + match id.get_type() { + ClockEntryType::Single => Ok(SingleShotClockId(id)), + _ => Err(glib_bool_error!("Not a single-shot clock id")), + } + } +} + +impl SingleShotClockId { + pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { + self.0.compare_by_time(&other.0) + } + + pub fn wait_async(&self, func: F) -> Result + where + F: FnOnce(&Clock, ClockTime, &ClockId) + Send + 'static, + { + unsafe extern "C" fn trampoline( + clock: *mut gst_sys::GstClock, + time: gst_sys::GstClockTime, + id: gpointer, + func: gpointer, + ) -> gboolean { + let f: &mut Option = &mut *(func as *mut Option); + let f = f.take().unwrap(); + + f( + &from_glib_borrow(clock), + from_glib(time), + &from_glib_borrow(id), + ); + + glib_sys::GTRUE + } + + unsafe extern "C" fn destroy_notify< + F: FnOnce(&Clock, ClockTime, &ClockId) + Send + 'static, + >( + ptr: gpointer, + ) { + Box::>::from_raw(ptr as *mut _); + } + + let func: Box> = Box::new(Some(func)); + + let ret: ClockReturn = unsafe { + from_glib(gst_sys::gst_clock_id_wait_async( + self.to_glib_none().0, + Some(trampoline::), + Box::into_raw(func) as gpointer, + Some(destroy_notify::), + )) + }; + ret.into_result() + } + + #[allow(clippy::type_complexity)] + pub fn wait_async_future( + &self, + ) -> Result< + Pin< + Box< + dyn Future> + + Unpin + + Send + + 'static, + >, + >, + ClockError, + > { + use futures_channel::oneshot; + use futures_util::TryFutureExt; + + let (sender, receiver) = oneshot::channel(); + + self.wait_async(move |_clock, jitter, id| { + if sender.send((jitter, id.clone())).is_err() { + // Unschedule any future calls if the receiver end is disconnected + id.unschedule(); + } + })?; + + Ok(Box::pin(receiver.map_err(|_| ClockError::Unscheduled))) + } +} + +#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] +pub struct PeriodicClockId(ClockId); + +impl std::ops::Deref for PeriodicClockId { + type Target = ClockId; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for ClockId { + fn from(id: PeriodicClockId) -> ClockId { + skip_assert_initialized!(); + id.0 + } +} + +impl convert::TryFrom for PeriodicClockId { + type Error = glib::BoolError; + + fn try_from(id: ClockId) -> Result { + skip_assert_initialized!(); + match id.get_type() { + ClockEntryType::Periodic => Ok(PeriodicClockId(id)), + _ => Err(glib_bool_error!("Not a periodic clock id")), + } + } +} + +impl PeriodicClockId { + pub fn get_interval(&self) -> ClockTime { + unsafe { + let ptr: *mut gst_sys::GstClockEntry = self.to_glib_none().0 as *mut _; + from_glib((*ptr).interval) + } + } + + pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { + self.0.compare_by_time(&other.0) + } + + pub fn wait_async(&self, func: F) -> Result + where + F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static, + { + unsafe extern "C" fn trampoline( + clock: *mut gst_sys::GstClock, + time: gst_sys::GstClockTime, + id: gpointer, + func: gpointer, + ) -> gboolean { + let f: &F = &*(func as *const F); + f( + &from_glib_borrow(clock), + from_glib(time), + &from_glib_borrow(id), + ); + glib_sys::GTRUE + } + + unsafe extern "C" fn destroy_notify( + ptr: gpointer, + ) { + Box::::from_raw(ptr as *mut _); + } + + let func: Box = Box::new(func); + let ret: ClockReturn = unsafe { + from_glib(gst_sys::gst_clock_id_wait_async( + self.to_glib_none().0, + Some(trampoline::), + Box::into_raw(func) as gpointer, + Some(destroy_notify::), + )) + }; + ret.into_result() + } + + #[allow(clippy::type_complexity)] + pub fn wait_async_stream( + &self, + ) -> Result< + Pin + Unpin + Send + 'static>>, + ClockError, + > { + use futures_channel::mpsc; + + let (sender, receiver) = mpsc::unbounded(); + + self.wait_async(move |_clock, jitter, id| { + if sender.unbounded_send((jitter, id.clone())).is_err() { + // Unschedule any future calls if the receiver end is disconnected + id.unschedule(); + } + })?; + + Ok(Box::pin(receiver)) + } +} + #[repr(C)] #[derive(Debug)] pub struct AtomicClockReturn(AtomicI32); @@ -251,18 +385,22 @@ pub trait ClockExtManual: 'static { &self, start_time: ClockTime, interval: ClockTime, - ) -> Result; + ) -> Result; fn periodic_id_reinit( &self, - id: &ClockId, + id: &PeriodicClockId, start_time: ClockTime, interval: ClockTime, ) -> Result<(), glib::BoolError>; - fn new_single_shot_id(&self, time: ClockTime) -> Result; + fn new_single_shot_id(&self, time: ClockTime) -> Result; - fn single_shot_id_reinit(&self, id: &ClockId, time: ClockTime) -> Result<(), glib::BoolError>; + fn single_shot_id_reinit( + &self, + id: &SingleShotClockId, + time: ClockTime, + ) -> Result<(), glib::BoolError>; fn set_clock_flags(&self, flags: ClockFlags); @@ -276,20 +414,21 @@ impl> ClockExtManual for O { &self, start_time: ClockTime, interval: ClockTime, - ) -> Result { + ) -> Result { unsafe { Option::<_>::from_glib_full(gst_sys::gst_clock_new_periodic_id( self.as_ref().to_glib_none().0, start_time.to_glib(), interval.to_glib(), )) + .map(PeriodicClockId) .ok_or_else(|| glib_bool_error!("Failed to create new periodic clock id")) } } fn periodic_id_reinit( &self, - id: &ClockId, + id: &PeriodicClockId, start_time: ClockTime, interval: ClockTime, ) -> Result<(), glib::BoolError> { @@ -309,17 +448,22 @@ impl> ClockExtManual for O { } } - fn new_single_shot_id(&self, time: ClockTime) -> Result { + fn new_single_shot_id(&self, time: ClockTime) -> Result { unsafe { Option::<_>::from_glib_full(gst_sys::gst_clock_new_single_shot_id( self.as_ref().to_glib_none().0, time.to_glib(), )) + .map(SingleShotClockId) .ok_or_else(|| glib_bool_error!("Failed to create new single shot clock id")) } } - fn single_shot_id_reinit(&self, id: &ClockId, time: ClockTime) -> Result<(), glib::BoolError> { + fn single_shot_id_reinit( + &self, + id: &SingleShotClockId, + time: ClockTime, + ) -> Result<(), glib::BoolError> { unsafe { let res: bool = from_glib(gst_sys::gst_clock_single_shot_id_reinit( self.as_ref().to_glib_none().0, @@ -394,4 +538,42 @@ mod tests { assert_eq!(receiver.recv(), Ok(())); } + + #[test] + fn test_wait_periodic() { + ::init().unwrap(); + + let clock = SystemClock::obtain(); + let now = clock.get_time(); + let id = clock + .new_periodic_id(now + 20 * ::MSECOND, 20 * ::MSECOND) + .unwrap(); + + let (res, _) = id.wait(); + assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); + + let (res, _) = id.wait(); + assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); + } + + #[test] + fn test_wait_async_periodic() { + ::init().unwrap(); + + let (sender, receiver) = channel(); + + let clock = SystemClock::obtain(); + let now = clock.get_time(); + let id = clock + .new_periodic_id(now + 20 * ::MSECOND, 20 * ::MSECOND) + .unwrap(); + let res = id.wait_async(move |_, _, _| { + let _ = sender.send(()); + }); + + assert!(res == Ok(ClockSuccess::Ok)); + + assert_eq!(receiver.recv(), Ok(())); + assert_eq!(receiver.recv(), Ok(())); + } } diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index d83f1eeca..235af8e64 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -273,7 +273,7 @@ pub use toc::{Toc, TocEntry, TocEntryRef, TocRef}; mod toc_serde; mod clock; -pub use clock::{AtomicClockReturn, ClockExtManual, ClockId}; +pub use clock::{AtomicClockReturn, ClockExtManual, ClockId, PeriodicClockId, SingleShotClockId}; mod buffer_pool; pub use buffer_pool::*;