gstreamer/clock: Improve ClockId bindings

There is now a separate type for Single and Periodic clock ids. This
allows to have API that is only for one type on that specific type
instead of doing runtime checks, and allows for more refined async
waiting API.
This commit is contained in:
Sebastian Dröge 2020-10-10 12:10:52 +03:00 committed by Sebastian Dröge
parent cb362e6fbc
commit 531014a35c
2 changed files with 268 additions and 86 deletions

View file

@ -15,6 +15,7 @@ use glib_sys::{gboolean, gpointer};
use gst_sys;
use libc::c_void;
use std::cmp;
use std::convert;
use std::ptr;
use Clock;
use ClockEntryType;
@ -25,7 +26,7 @@ use ClockSuccess;
use ClockTime;
use ClockTimeDiff;
use futures_core::Stream;
use futures_core::{Future, Stream};
use std::marker::Unpin;
use std::pin::Pin;
use std::sync::atomic;
@ -41,35 +42,6 @@ glib_wrapper! {
}
}
unsafe extern "C" fn trampoline_wait_async<F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static>(
clock: *mut gst_sys::GstClock,
time: gst_sys::GstClockTime,
id: gpointer,
func: gpointer,
) -> gboolean {
let f: &F = &*(func as *const F);
f(
&from_glib_borrow(clock),
from_glib(time),
&from_glib_borrow(id),
);
glib_sys::GTRUE
}
unsafe extern "C" fn destroy_closure_wait_async<
F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static,
>(
ptr: gpointer,
) {
Box::<F>::from_raw(ptr as *mut _);
}
fn into_raw_wait_async<F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static>(func: F) -> gpointer {
#[allow(clippy::type_complexity)]
let func: Box<F> = Box::new(func);
Box::into_raw(func) as gpointer
}
impl ClockId {
pub fn get_time(&self) -> ClockTime {
unsafe { from_glib(gst_sys::gst_clock_id_get_time(self.to_glib_none().0)) }
@ -90,42 +62,6 @@ impl ClockId {
}
}
pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
where
F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static,
{
let ret: ClockReturn = unsafe {
from_glib(gst_sys::gst_clock_id_wait_async(
self.to_glib_none().0,
Some(trampoline_wait_async::<F>),
into_raw_wait_async(func),
Some(destroy_closure_wait_async::<F>),
))
};
ret.into_result()
}
#[allow(clippy::type_complexity)]
pub fn wait_async_stream(
&self,
) -> Result<
Pin<Box<dyn Stream<Item = (ClockTime, ClockId)> + Unpin + Send + 'static>>,
ClockError,
> {
use futures_channel::mpsc;
let (sender, receiver) = mpsc::unbounded();
self.wait_async(move |_clock, jitter, id| {
if sender.unbounded_send((jitter, id.clone())).is_err() {
// Unschedule any future calls if the receiver end is disconnected
id.unschedule();
}
})?;
Ok(Box::pin(receiver))
}
pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
unsafe {
let res =
@ -156,17 +92,6 @@ impl ClockId {
}
}
pub fn get_interval(&self) -> Option<ClockTime> {
if self.get_type() != ClockEntryType::Periodic {
return None;
}
unsafe {
let ptr: *mut gst_sys::GstClockEntry = self.to_glib_none().0 as *mut _;
Some(from_glib((*ptr).interval))
}
}
pub fn get_status(&self) -> &AtomicClockReturn {
unsafe {
let ptr: *mut gst_sys::GstClockEntry = self.to_glib_none().0 as *mut _;
@ -175,6 +100,215 @@ impl ClockId {
}
}
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct SingleShotClockId(ClockId);
impl std::ops::Deref for SingleShotClockId {
type Target = ClockId;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<SingleShotClockId> for ClockId {
fn from(id: SingleShotClockId) -> ClockId {
skip_assert_initialized!();
id.0
}
}
impl convert::TryFrom<ClockId> for SingleShotClockId {
type Error = glib::BoolError;
fn try_from(id: ClockId) -> Result<SingleShotClockId, glib::BoolError> {
skip_assert_initialized!();
match id.get_type() {
ClockEntryType::Single => Ok(SingleShotClockId(id)),
_ => Err(glib_bool_error!("Not a single-shot clock id")),
}
}
}
impl SingleShotClockId {
pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
self.0.compare_by_time(&other.0)
}
pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
where
F: FnOnce(&Clock, ClockTime, &ClockId) + Send + 'static,
{
unsafe extern "C" fn trampoline<F: FnOnce(&Clock, ClockTime, &ClockId) + Send + 'static>(
clock: *mut gst_sys::GstClock,
time: gst_sys::GstClockTime,
id: gpointer,
func: gpointer,
) -> gboolean {
let f: &mut Option<F> = &mut *(func as *mut Option<F>);
let f = f.take().unwrap();
f(
&from_glib_borrow(clock),
from_glib(time),
&from_glib_borrow(id),
);
glib_sys::GTRUE
}
unsafe extern "C" fn destroy_notify<
F: FnOnce(&Clock, ClockTime, &ClockId) + Send + 'static,
>(
ptr: gpointer,
) {
Box::<Option<F>>::from_raw(ptr as *mut _);
}
let func: Box<Option<F>> = Box::new(Some(func));
let ret: ClockReturn = unsafe {
from_glib(gst_sys::gst_clock_id_wait_async(
self.to_glib_none().0,
Some(trampoline::<F>),
Box::into_raw(func) as gpointer,
Some(destroy_notify::<F>),
))
};
ret.into_result()
}
#[allow(clippy::type_complexity)]
pub fn wait_async_future(
&self,
) -> Result<
Pin<
Box<
dyn Future<Output = Result<(ClockTime, ClockId), ClockError>>
+ Unpin
+ Send
+ 'static,
>,
>,
ClockError,
> {
use futures_channel::oneshot;
use futures_util::TryFutureExt;
let (sender, receiver) = oneshot::channel();
self.wait_async(move |_clock, jitter, id| {
if sender.send((jitter, id.clone())).is_err() {
// Unschedule any future calls if the receiver end is disconnected
id.unschedule();
}
})?;
Ok(Box::pin(receiver.map_err(|_| ClockError::Unscheduled)))
}
}
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct PeriodicClockId(ClockId);
impl std::ops::Deref for PeriodicClockId {
type Target = ClockId;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<PeriodicClockId> for ClockId {
fn from(id: PeriodicClockId) -> ClockId {
skip_assert_initialized!();
id.0
}
}
impl convert::TryFrom<ClockId> for PeriodicClockId {
type Error = glib::BoolError;
fn try_from(id: ClockId) -> Result<PeriodicClockId, glib::BoolError> {
skip_assert_initialized!();
match id.get_type() {
ClockEntryType::Periodic => Ok(PeriodicClockId(id)),
_ => Err(glib_bool_error!("Not a periodic clock id")),
}
}
}
impl PeriodicClockId {
pub fn get_interval(&self) -> ClockTime {
unsafe {
let ptr: *mut gst_sys::GstClockEntry = self.to_glib_none().0 as *mut _;
from_glib((*ptr).interval)
}
}
pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering {
self.0.compare_by_time(&other.0)
}
pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError>
where
F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static,
{
unsafe extern "C" fn trampoline<F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static>(
clock: *mut gst_sys::GstClock,
time: gst_sys::GstClockTime,
id: gpointer,
func: gpointer,
) -> gboolean {
let f: &F = &*(func as *const F);
f(
&from_glib_borrow(clock),
from_glib(time),
&from_glib_borrow(id),
);
glib_sys::GTRUE
}
unsafe extern "C" fn destroy_notify<F: Fn(&Clock, ClockTime, &ClockId) + Send + 'static>(
ptr: gpointer,
) {
Box::<F>::from_raw(ptr as *mut _);
}
let func: Box<F> = Box::new(func);
let ret: ClockReturn = unsafe {
from_glib(gst_sys::gst_clock_id_wait_async(
self.to_glib_none().0,
Some(trampoline::<F>),
Box::into_raw(func) as gpointer,
Some(destroy_notify::<F>),
))
};
ret.into_result()
}
#[allow(clippy::type_complexity)]
pub fn wait_async_stream(
&self,
) -> Result<
Pin<Box<dyn Stream<Item = (ClockTime, ClockId)> + Unpin + Send + 'static>>,
ClockError,
> {
use futures_channel::mpsc;
let (sender, receiver) = mpsc::unbounded();
self.wait_async(move |_clock, jitter, id| {
if sender.unbounded_send((jitter, id.clone())).is_err() {
// Unschedule any future calls if the receiver end is disconnected
id.unschedule();
}
})?;
Ok(Box::pin(receiver))
}
}
#[repr(C)]
#[derive(Debug)]
pub struct AtomicClockReturn(AtomicI32);
@ -251,18 +385,22 @@ pub trait ClockExtManual: 'static {
&self,
start_time: ClockTime,
interval: ClockTime,
) -> Result<ClockId, glib::BoolError>;
) -> Result<PeriodicClockId, glib::BoolError>;
fn periodic_id_reinit(
&self,
id: &ClockId,
id: &PeriodicClockId,
start_time: ClockTime,
interval: ClockTime,
) -> Result<(), glib::BoolError>;
fn new_single_shot_id(&self, time: ClockTime) -> Result<ClockId, glib::BoolError>;
fn new_single_shot_id(&self, time: ClockTime) -> Result<SingleShotClockId, glib::BoolError>;
fn single_shot_id_reinit(&self, id: &ClockId, time: ClockTime) -> Result<(), glib::BoolError>;
fn single_shot_id_reinit(
&self,
id: &SingleShotClockId,
time: ClockTime,
) -> Result<(), glib::BoolError>;
fn set_clock_flags(&self, flags: ClockFlags);
@ -276,20 +414,21 @@ impl<O: IsA<Clock>> ClockExtManual for O {
&self,
start_time: ClockTime,
interval: ClockTime,
) -> Result<ClockId, glib::BoolError> {
) -> Result<PeriodicClockId, glib::BoolError> {
unsafe {
Option::<_>::from_glib_full(gst_sys::gst_clock_new_periodic_id(
self.as_ref().to_glib_none().0,
start_time.to_glib(),
interval.to_glib(),
))
.map(PeriodicClockId)
.ok_or_else(|| glib_bool_error!("Failed to create new periodic clock id"))
}
}
fn periodic_id_reinit(
&self,
id: &ClockId,
id: &PeriodicClockId,
start_time: ClockTime,
interval: ClockTime,
) -> Result<(), glib::BoolError> {
@ -309,17 +448,22 @@ impl<O: IsA<Clock>> ClockExtManual for O {
}
}
fn new_single_shot_id(&self, time: ClockTime) -> Result<ClockId, glib::BoolError> {
fn new_single_shot_id(&self, time: ClockTime) -> Result<SingleShotClockId, glib::BoolError> {
unsafe {
Option::<_>::from_glib_full(gst_sys::gst_clock_new_single_shot_id(
self.as_ref().to_glib_none().0,
time.to_glib(),
))
.map(SingleShotClockId)
.ok_or_else(|| glib_bool_error!("Failed to create new single shot clock id"))
}
}
fn single_shot_id_reinit(&self, id: &ClockId, time: ClockTime) -> Result<(), glib::BoolError> {
fn single_shot_id_reinit(
&self,
id: &SingleShotClockId,
time: ClockTime,
) -> Result<(), glib::BoolError> {
unsafe {
let res: bool = from_glib(gst_sys::gst_clock_single_shot_id_reinit(
self.as_ref().to_glib_none().0,
@ -394,4 +538,42 @@ mod tests {
assert_eq!(receiver.recv(), Ok(()));
}
#[test]
fn test_wait_periodic() {
::init().unwrap();
let clock = SystemClock::obtain();
let now = clock.get_time();
let id = clock
.new_periodic_id(now + 20 * ::MSECOND, 20 * ::MSECOND)
.unwrap();
let (res, _) = id.wait();
assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early));
let (res, _) = id.wait();
assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early));
}
#[test]
fn test_wait_async_periodic() {
::init().unwrap();
let (sender, receiver) = channel();
let clock = SystemClock::obtain();
let now = clock.get_time();
let id = clock
.new_periodic_id(now + 20 * ::MSECOND, 20 * ::MSECOND)
.unwrap();
let res = id.wait_async(move |_, _, _| {
let _ = sender.send(());
});
assert!(res == Ok(ClockSuccess::Ok));
assert_eq!(receiver.recv(), Ok(()));
assert_eq!(receiver.recv(), Ok(()));
}
}

View file

@ -273,7 +273,7 @@ pub use toc::{Toc, TocEntry, TocEntryRef, TocRef};
mod toc_serde;
mod clock;
pub use clock::{AtomicClockReturn, ClockExtManual, ClockId};
pub use clock::{AtomicClockReturn, ClockExtManual, ClockId, PeriodicClockId, SingleShotClockId};
mod buffer_pool;
pub use buffer_pool::*;