gstreamer: Fix PromiseFuture implementation

We can't return a plain reference to something stored inside the future
as that would go out of scope after `await`. Instead return a struct
that wraps the `gst::Promise`, derefs to a structure and keeps the
promise alive as long as needed.
This commit is contained in:
Sebastian Dröge 2021-04-23 14:49:59 +03:00 committed by Sebastian Dröge
parent 15295f299f
commit 9d3888d294

View file

@ -5,10 +5,9 @@ use crate::Structure;
use crate::StructureRef; use crate::StructureRef;
use glib::translate::*; use glib::translate::*;
use std::marker::PhantomData;
use std::pin::Pin;
use std::ptr; use std::ptr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{ops::Deref, pin::Pin};
glib::wrapper! { glib::wrapper! {
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
@ -82,7 +81,7 @@ impl Promise {
} }
} }
pub fn new_future<'a>() -> (Self, PromiseFuture<'a>) { pub fn new_future() -> (Self, PromiseFuture) {
use futures_channel::oneshot; use futures_channel::oneshot;
// We only use the channel as a convenient waker // We only use the channel as a convenient waker
@ -91,10 +90,7 @@ impl Promise {
let _ = sender.send(()); let _ = sender.send(());
}); });
( (promise.clone(), PromiseFuture(promise, receiver))
promise.clone(),
PromiseFuture(promise, receiver, PhantomData),
)
} }
pub fn expire(&self) { pub fn expire(&self) {
@ -144,28 +140,26 @@ unsafe impl Send for Promise {}
unsafe impl Sync for Promise {} unsafe impl Sync for Promise {}
#[derive(Debug)] #[derive(Debug)]
pub struct PromiseFuture<'a>( pub struct PromiseFuture(Promise, futures_channel::oneshot::Receiver<()>);
Promise,
futures_channel::oneshot::Receiver<()>,
PhantomData<&'a StructureRef>,
);
impl<'a> std::future::Future for PromiseFuture<'a> { #[derive(Debug)]
type Output = Result<Option<&'a StructureRef>, PromiseError>; pub struct PromiseReply(Promise);
impl std::future::Future for PromiseFuture {
type Output = Result<Option<PromiseReply>, PromiseError>;
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
match Pin::new(&mut self.1).poll(context) { match Pin::new(&mut self.1).poll(context) {
Poll::Ready(Err(_)) => panic!("Sender dropped before callback was called"), Poll::Ready(Err(_)) => panic!("Sender dropped before callback was called"),
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
let res = match self.0.wait() { let res = match self.0.wait() {
PromiseResult::Replied => unsafe { PromiseResult::Replied => {
let s = ffi::gst_promise_get_reply(self.0.to_glib_none().0); if self.0.get_reply().is_none() {
if s.is_null() {
Ok(None) Ok(None)
} else { } else {
Ok(Some(StructureRef::from_glib_borrow(s))) Ok(Some(PromiseReply(self.0.clone())))
} }
}, }
PromiseResult::Interrupted => Err(PromiseError::Interrupted), PromiseResult::Interrupted => Err(PromiseError::Interrupted),
PromiseResult::Expired => Err(PromiseError::Expired), PromiseResult::Expired => Err(PromiseError::Expired),
PromiseResult::Pending => { PromiseResult::Pending => {
@ -180,6 +174,14 @@ impl<'a> std::future::Future for PromiseFuture<'a> {
} }
} }
impl Deref for PromiseReply {
type Target = StructureRef;
fn deref(&self) -> &StructureRef {
self.0.get_reply().expect("Promise without reply")
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;