diff --git a/gstreamer/src/promise.rs b/gstreamer/src/promise.rs index 75e67cb31..96cb3dce8 100644 --- a/gstreamer/src/promise.rs +++ b/gstreamer/src/promise.rs @@ -13,6 +13,7 @@ use PromiseResult; use Structure; use StructureRef; +use std::marker::PhantomData; use std::pin::Pin; use std::ptr; use std::task::{Context, Poll}; @@ -89,16 +90,19 @@ impl Promise { } } - pub fn new_future() -> (Self, PromiseFuture) { + pub fn new_future<'a>() -> (Self, PromiseFuture<'a>) { use futures_channel::oneshot; + // We only use the channel as a convenient waker let (sender, receiver) = oneshot::channel(); - - let promise = Self::new_with_change_func(move |res| { - let _ = sender.send(res.map(|s| s.map(ToOwned::to_owned))); + let promise = Self::new_with_change_func(move |_res| { + let _ = sender.send(()); }); - (promise, PromiseFuture(receiver)) + ( + promise.clone(), + PromiseFuture(promise, receiver, PhantomData), + ) } pub fn expire(&self) { @@ -148,17 +152,37 @@ unsafe impl Send for Promise {} unsafe impl Sync for Promise {} #[derive(Debug)] -pub struct PromiseFuture( - futures_channel::oneshot::Receiver, PromiseError>>, +pub struct PromiseFuture<'a>( + Promise, + futures_channel::oneshot::Receiver<()>, + PhantomData<&'a StructureRef>, ); -impl std::future::Future for PromiseFuture { - type Output = Result, PromiseError>; +impl<'a> std::future::Future for PromiseFuture<'a> { + type Output = Result, PromiseError>; fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll { - match Pin::new(&mut self.0).poll(context) { + match Pin::new(&mut self.1).poll(context) { Poll::Ready(Err(_)) => panic!("Sender dropped before callback was called"), - Poll::Ready(Ok(res)) => Poll::Ready(res), + Poll::Ready(Ok(())) => { + let res = match self.0.wait() { + PromiseResult::Replied => unsafe { + let s = gst_sys::gst_promise_get_reply(self.0.to_glib_none().0); + if s.is_null() { + Ok(None) + } else { + Ok(Some(StructureRef::from_glib_borrow(s))) + } + }, + PromiseResult::Interrupted => Err(PromiseError::Interrupted), + PromiseResult::Expired => Err(PromiseError::Expired), + PromiseResult::Pending => { + panic!("Promise resolved but returned Pending"); + } + err => Err(PromiseError::Other(err)), + }; + Poll::Ready(res) + } Poll::Pending => Poll::Pending, } }