// Take a look at the license at the top of the repository in the LICENSE file. use crate::Clock; use crate::ClockEntryType; use crate::ClockError; use crate::ClockFlags; use crate::ClockReturn; use crate::ClockSuccess; use crate::ClockTime; use crate::ClockTimeDiff; use glib::ffi::{gboolean, gpointer}; use glib::prelude::*; use glib::translate::*; use libc::c_void; use std::cmp; use std::convert; use std::ptr; use futures_core::{Future, Stream}; use std::marker::Unpin; use std::pin::Pin; use std::sync::atomic; use std::sync::atomic::AtomicI32; glib::wrapper! { #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub struct ClockId(Shared); match fn { ref => |ptr| ffi::gst_clock_id_ref(ptr), unref => |ptr| ffi::gst_clock_id_unref(ptr), } } impl ClockId { pub fn time(&self) -> ClockTime { unsafe { from_glib(ffi::gst_clock_id_get_time(self.to_glib_none().0)) } } pub fn unschedule(&self) { unsafe { ffi::gst_clock_id_unschedule(self.to_glib_none().0) } } pub fn wait(&self) -> (Result, ClockTimeDiff) { unsafe { let mut jitter = 0; let res: ClockReturn = from_glib(ffi::gst_clock_id_wait(self.to_glib_none().0, &mut jitter)); (res.into_result(), jitter) } } pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { unsafe { let res = ffi::gst_clock_id_compare_func(self.to_glib_none().0, other.to_glib_none().0); res.cmp(&0) } } #[cfg(any(feature = "v1_16", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_16")))] pub fn clock(&self) -> Option { unsafe { from_glib_full(ffi::gst_clock_id_get_clock(self.to_glib_none().0)) } } #[cfg(any(feature = "v1_16", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_16")))] pub fn uses_clock>(&self, clock: &P) -> bool { unsafe { from_glib(ffi::gst_clock_id_uses_clock( self.to_glib_none().0, clock.as_ref().as_ptr(), )) } } pub fn type_(&self) -> ClockEntryType { unsafe { let ptr: *mut ffi::GstClockEntry = self.to_glib_none().0 as *mut _; from_glib((*ptr).type_) } } pub fn status(&self) -> &AtomicClockReturn { unsafe { let ptr: *mut ffi::GstClockEntry = self.to_glib_none().0 as *mut _; &*((&(*ptr).status) as *const i32 as *const AtomicClockReturn) } } } #[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub struct SingleShotClockId(ClockId); impl std::ops::Deref for SingleShotClockId { type Target = ClockId; fn deref(&self) -> &Self::Target { &self.0 } } impl From for ClockId { fn from(id: SingleShotClockId) -> ClockId { skip_assert_initialized!(); id.0 } } impl convert::TryFrom for SingleShotClockId { type Error = glib::BoolError; fn try_from(id: ClockId) -> Result { skip_assert_initialized!(); match id.type_() { ClockEntryType::Single => Ok(SingleShotClockId(id)), _ => Err(glib::bool_error!("Not a single-shot clock id")), } } } impl SingleShotClockId { pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { self.0.compare_by_time(&other.0) } pub fn wait_async(&self, func: F) -> Result where F: FnOnce(&Clock, ClockTime, &ClockId) + Send + 'static, { unsafe extern "C" fn trampoline( clock: *mut ffi::GstClock, time: ffi::GstClockTime, id: gpointer, func: gpointer, ) -> gboolean { let f: &mut Option = &mut *(func as *mut Option); let f = f.take().unwrap(); f( &from_glib_borrow(clock), from_glib(time), &from_glib_borrow(id), ); glib::ffi::GTRUE } unsafe extern "C" fn destroy_notify< F: FnOnce(&Clock, ClockTime, &ClockId) + Send + 'static, >( ptr: gpointer, ) { Box::>::from_raw(ptr as *mut _); } let func: Box> = Box::new(Some(func)); let ret: ClockReturn = unsafe { from_glib(ffi::gst_clock_id_wait_async( self.to_glib_none().0, Some(trampoline::), Box::into_raw(func) as gpointer, Some(destroy_notify::), )) }; ret.into_result() } #[allow(clippy::type_complexity)] pub fn wait_async_future( &self, ) -> Result< Pin> + Send + 'static>>, ClockError, > { use futures_channel::oneshot; let (sender, receiver) = oneshot::channel(); self.wait_async(move |_clock, jitter, id| { if sender.send((jitter, id.clone())).is_err() { // Unschedule any future calls if the receiver end is disconnected id.unschedule(); } })?; Ok(Box::pin(async move { receiver.await.map_err(|_| ClockError::Unscheduled) })) } } #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] pub struct PeriodicClockId(ClockId); impl std::ops::Deref for PeriodicClockId { type Target = ClockId; fn deref(&self) -> &Self::Target { &self.0 } } impl From for ClockId { fn from(id: PeriodicClockId) -> ClockId { skip_assert_initialized!(); id.0 } } impl convert::TryFrom for PeriodicClockId { type Error = glib::BoolError; fn try_from(id: ClockId) -> Result { skip_assert_initialized!(); match id.type_() { ClockEntryType::Periodic => Ok(PeriodicClockId(id)), _ => Err(glib::bool_error!("Not a periodic clock id")), } } } impl PeriodicClockId { pub fn interval(&self) -> ClockTime { unsafe { let ptr: *mut ffi::GstClockEntry = self.to_glib_none().0 as *mut _; from_glib((*ptr).interval) } } pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { self.0.compare_by_time(&other.0) } pub fn wait_async(&self, func: F) -> Result where F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static, { unsafe extern "C" fn trampoline( clock: *mut ffi::GstClock, time: ffi::GstClockTime, id: gpointer, func: gpointer, ) -> gboolean { let f: &F = &*(func as *const F); f( &from_glib_borrow(clock), from_glib(time), &from_glib_borrow(id), ); glib::ffi::GTRUE } unsafe extern "C" fn destroy_notify( ptr: gpointer, ) { Box::::from_raw(ptr as *mut _); } let func: Box = Box::new(func); let ret: ClockReturn = unsafe { from_glib(ffi::gst_clock_id_wait_async( self.to_glib_none().0, Some(trampoline::), Box::into_raw(func) as gpointer, Some(destroy_notify::), )) }; ret.into_result() } #[allow(clippy::type_complexity)] pub fn wait_async_stream( &self, ) -> Result< Pin + Unpin + Send + 'static>>, ClockError, > { use futures_channel::mpsc; let (sender, receiver) = mpsc::unbounded(); self.wait_async(move |_clock, jitter, id| { if sender.unbounded_send((jitter, id.clone())).is_err() { // Unschedule any future calls if the receiver end is disconnected id.unschedule(); } })?; Ok(Box::pin(receiver)) } } #[repr(transparent)] #[derive(Debug)] pub struct AtomicClockReturn(AtomicI32); impl AtomicClockReturn { pub fn load(&self) -> ClockReturn { unsafe { from_glib(self.0.load(atomic::Ordering::SeqCst)) } } pub fn store(&self, val: ClockReturn) { self.0.store(val.into_glib(), atomic::Ordering::SeqCst) } pub fn swap(&self, val: ClockReturn) -> ClockReturn { unsafe { from_glib(self.0.swap(val.into_glib(), atomic::Ordering::SeqCst)) } } pub fn compare_exchange( &self, current: ClockReturn, new: ClockReturn, ) -> Result { unsafe { self.0 .compare_exchange( current.into_glib(), new.into_glib(), atomic::Ordering::SeqCst, atomic::Ordering::SeqCst, ) .map(|v| from_glib(v)) .map_err(|v| from_glib(v)) } } } unsafe impl Send for ClockId {} unsafe impl Sync for ClockId {} impl Clock { pub fn adjust_with_calibration( internal_target: ClockTime, cinternal: ClockTime, cexternal: ClockTime, cnum: ClockTime, cdenom: ClockTime, ) -> ClockTime { skip_assert_initialized!(); unsafe { from_glib(ffi::gst_clock_adjust_with_calibration( ptr::null_mut(), internal_target.into_glib(), cinternal.into_glib(), cexternal.into_glib(), cnum.into_glib(), cdenom.into_glib(), )) } } pub fn unadjust_with_calibration( external_target: ClockTime, cinternal: ClockTime, cexternal: ClockTime, cnum: ClockTime, cdenom: ClockTime, ) -> ClockTime { skip_assert_initialized!(); unsafe { from_glib(ffi::gst_clock_unadjust_with_calibration( ptr::null_mut(), external_target.into_glib(), cinternal.into_glib(), cexternal.into_glib(), cnum.into_glib(), cdenom.into_glib(), )) } } } pub trait ClockExtManual: 'static { fn new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId; fn periodic_id_reinit( &self, id: &PeriodicClockId, start_time: ClockTime, interval: ClockTime, ) -> Result<(), glib::BoolError>; fn new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId; fn single_shot_id_reinit( &self, id: &SingleShotClockId, time: ClockTime, ) -> Result<(), glib::BoolError>; fn set_clock_flags(&self, flags: ClockFlags); fn unset_clock_flags(&self, flags: ClockFlags); fn clock_flags(&self) -> ClockFlags; } impl> ClockExtManual for O { fn new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId { assert!(start_time.is_some()); assert!(interval.is_some()); assert_ne!(interval, crate::ClockTime::from(0)); unsafe { PeriodicClockId(from_glib_full(ffi::gst_clock_new_periodic_id( self.as_ref().to_glib_none().0, start_time.into_glib(), interval.into_glib(), ))) } } fn periodic_id_reinit( &self, id: &PeriodicClockId, start_time: ClockTime, interval: ClockTime, ) -> Result<(), glib::BoolError> { skip_assert_initialized!(); unsafe { let res: bool = from_glib(ffi::gst_clock_periodic_id_reinit( self.as_ref().to_glib_none().0, id.to_glib_none().0, start_time.into_glib(), interval.into_glib(), )); if res { Ok(()) } else { Err(glib::bool_error!("Failed to reinit periodic clock id")) } } } fn new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId { assert!(time.is_some()); unsafe { SingleShotClockId(from_glib_full(ffi::gst_clock_new_single_shot_id( self.as_ref().to_glib_none().0, time.into_glib(), ))) } } fn single_shot_id_reinit( &self, id: &SingleShotClockId, time: ClockTime, ) -> Result<(), glib::BoolError> { unsafe { let res: bool = from_glib(ffi::gst_clock_single_shot_id_reinit( self.as_ref().to_glib_none().0, id.to_glib_none().0, time.into_glib(), )); if res { Ok(()) } else { Err(glib::bool_error!("Failed to reinit single shot clock id")) } } } fn set_clock_flags(&self, flags: ClockFlags) { unsafe { let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _; let _guard = crate::utils::MutexGuard::lock(&(*ptr).lock); (*ptr).flags |= flags.into_glib(); } } fn unset_clock_flags(&self, flags: ClockFlags) { unsafe { let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _; let _guard = crate::utils::MutexGuard::lock(&(*ptr).lock); (*ptr).flags &= !flags.into_glib(); } } fn clock_flags(&self) -> ClockFlags { unsafe { let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _; let _guard = crate::utils::MutexGuard::lock(&(*ptr).lock); from_glib((*ptr).flags) } } } #[cfg(test)] mod tests { use super::super::prelude::*; use super::super::*; use super::*; use std::sync::mpsc::channel; #[test] fn test_wait() { crate::init().unwrap(); let clock = SystemClock::obtain(); let now = clock.time(); let id = clock.new_single_shot_id(now + 20 * crate::MSECOND); let (res, _) = id.wait(); assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); } #[test] fn test_wait_async() { crate::init().unwrap(); let (sender, receiver) = channel(); let clock = SystemClock::obtain(); let now = clock.time(); let id = clock.new_single_shot_id(now + 20 * crate::MSECOND); let res = id.wait_async(move |_, _, _| { sender.send(()).unwrap(); }); assert!(res == Ok(ClockSuccess::Ok)); assert_eq!(receiver.recv(), Ok(())); } #[test] fn test_wait_periodic() { crate::init().unwrap(); let clock = SystemClock::obtain(); let now = clock.time(); let id = clock.new_periodic_id(now + 20 * crate::MSECOND, 20 * crate::MSECOND); let (res, _) = id.wait(); assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); let (res, _) = id.wait(); assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); } #[test] fn test_wait_async_periodic() { crate::init().unwrap(); let (sender, receiver) = channel(); let clock = SystemClock::obtain(); let now = clock.time(); let id = clock.new_periodic_id(now + 20 * crate::MSECOND, 20 * crate::MSECOND); let res = id.wait_async(move |_, _, _| { let _ = sender.send(()); }); assert!(res == Ok(ClockSuccess::Ok)); assert_eq!(receiver.recv(), Ok(())); assert_eq!(receiver.recv(), Ok(())); } }