From 9d3888d294973065bdf1d4864bf70477571814a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 23 Apr 2021 14:49:59 +0300 Subject: [PATCH] gstreamer: Fix PromiseFuture implementation We can't return a plain reference to something stored inside the future as that would go out of scope after `await`. Instead return a struct that wraps the `gst::Promise`, derefs to a structure and keeps the promise alive as long as needed. --- gstreamer/src/promise.rs | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/gstreamer/src/promise.rs b/gstreamer/src/promise.rs index b0c880ad7..4eaf2a093 100644 --- a/gstreamer/src/promise.rs +++ b/gstreamer/src/promise.rs @@ -5,10 +5,9 @@ use crate::Structure; use crate::StructureRef; use glib::translate::*; -use std::marker::PhantomData; -use std::pin::Pin; use std::ptr; use std::task::{Context, Poll}; +use std::{ops::Deref, pin::Pin}; glib::wrapper! { #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -82,7 +81,7 @@ impl Promise { } } - pub fn new_future<'a>() -> (Self, PromiseFuture<'a>) { + pub fn new_future() -> (Self, PromiseFuture) { use futures_channel::oneshot; // We only use the channel as a convenient waker @@ -91,10 +90,7 @@ impl Promise { let _ = sender.send(()); }); - ( - promise.clone(), - PromiseFuture(promise, receiver, PhantomData), - ) + (promise.clone(), PromiseFuture(promise, receiver)) } pub fn expire(&self) { @@ -144,28 +140,26 @@ unsafe impl Send for Promise {} unsafe impl Sync for Promise {} #[derive(Debug)] -pub struct PromiseFuture<'a>( - Promise, - futures_channel::oneshot::Receiver<()>, - PhantomData<&'a StructureRef>, -); +pub struct PromiseFuture(Promise, futures_channel::oneshot::Receiver<()>); -impl<'a> std::future::Future for PromiseFuture<'a> { - type Output = Result, PromiseError>; +#[derive(Debug)] +pub struct PromiseReply(Promise); + +impl std::future::Future for PromiseFuture { + type Output = Result, PromiseError>; fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll { match Pin::new(&mut self.1).poll(context) { Poll::Ready(Err(_)) => panic!("Sender dropped before callback was called"), Poll::Ready(Ok(())) => { let res = match self.0.wait() { - PromiseResult::Replied => unsafe { - let s = ffi::gst_promise_get_reply(self.0.to_glib_none().0); - if s.is_null() { + PromiseResult::Replied => { + if self.0.get_reply().is_none() { Ok(None) } else { - Ok(Some(StructureRef::from_glib_borrow(s))) + Ok(Some(PromiseReply(self.0.clone()))) } - }, + } PromiseResult::Interrupted => Err(PromiseError::Interrupted), PromiseResult::Expired => Err(PromiseError::Expired), PromiseResult::Pending => { @@ -180,6 +174,14 @@ impl<'a> std::future::Future for PromiseFuture<'a> { } } +impl Deref for PromiseReply { + type Target = StructureRef; + + fn deref(&self) -> &StructureRef { + self.0.get_reply().expect("Promise without reply") + } +} + #[cfg(test)] mod tests { use super::*;