gstreamer: BusStream cleanup

A few small readability changes
This commit is contained in:
Valmir Pretto 2020-01-29 09:40:17 -03:00
parent 53dd841006
commit 7663589d94
6 changed files with 186 additions and 22 deletions

View file

@ -13,6 +13,8 @@ keywords = ["gstreamer", "multimedia", "audio", "video", "gnome"]
build = "build.rs" build = "build.rs"
[dependencies] [dependencies]
futures-core = "0.3"
futures-sink = "0.3"
bitflags = "1.0" bitflags = "1.0"
libc = "0.2" libc = "0.2"
glib-sys = { git = "https://github.com/gtk-rs/sys" } glib-sys = { git = "https://github.com/gtk-rs/sys" }

View file

@ -20,6 +20,16 @@ use std::mem::transmute;
use std::ptr; use std::ptr;
use AppSink; use AppSink;
#[cfg(any(feature = "v1_10"))]
use {
futures_core::Stream,
std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
},
};
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub struct AppSinkCallbacks { pub struct AppSinkCallbacks {
eos: Option<RefCell<Box<dyn FnMut(&AppSink) + Send + 'static>>>, eos: Option<RefCell<Box<dyn FnMut(&AppSink) + Send + 'static>>>,
@ -217,6 +227,11 @@ impl AppSink {
) )
} }
} }
#[cfg(any(feature = "v1_10"))]
pub fn stream(&self) -> AppSinkStream {
AppSinkStream::new(self)
}
} }
unsafe extern "C" fn new_sample_trampoline< unsafe extern "C" fn new_sample_trampoline<
@ -240,3 +255,85 @@ unsafe extern "C" fn new_preroll_trampoline<
let ret: gst::FlowReturn = f(&from_glib_borrow(this)).into(); let ret: gst::FlowReturn = f(&from_glib_borrow(this)).into();
ret.to_glib() ret.to_glib()
} }
#[cfg(any(feature = "v1_10"))]
#[derive(Debug)]
pub struct AppSinkStream {
app_sink: AppSink,
waker_reference: Arc<Mutex<Option<Waker>>>,
}
#[cfg(any(feature = "v1_10"))]
impl AppSinkStream {
fn new(app_sink: &AppSink) -> Self {
skip_assert_initialized!();
let app_sink = app_sink.clone();
let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
app_sink.set_callbacks(
AppSinkCallbacks::new()
.new_sample({
let waker_reference = Arc::clone(&waker_reference);
move |_| {
if let Some(waker) = waker_reference.lock().unwrap().take() {
waker.wake();
}
Ok(gst::FlowSuccess::Ok)
}
})
.eos({
let waker_reference = Arc::clone(&waker_reference);
move |_| {
if let Some(waker) = waker_reference.lock().unwrap().take() {
waker.wake();
}
}
})
.build(),
);
Self {
app_sink,
waker_reference,
}
}
}
#[cfg(any(feature = "v1_10"))]
impl Stream for AppSinkStream {
type Item = gst::Sample;
fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
self.app_sink
.try_pull_sample(gst::ClockTime::from_mseconds(0))
.map(|sample| Poll::Ready(Some(sample)))
.unwrap_or_else(|| {
if self.app_sink.is_eos() {
return Poll::Ready(None);
}
self.waker_reference
.lock()
.unwrap()
.replace(context.waker().to_owned());
Poll::Pending
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_app_sink_stream() {
gst::init().unwrap();
unimplemented!()
}
}

View file

@ -6,13 +6,16 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
use futures_sink::Sink;
use glib::translate::*; use glib::translate::*;
use glib_sys::{gboolean, gpointer}; use glib_sys::{gboolean, gpointer};
use gst; use gst;
use gst_app_sys; use gst_app_sys;
use std::cell::RefCell; use std::cell::RefCell;
use std::mem; use std::mem;
use std::pin::Pin;
use std::ptr; use std::ptr;
use std::task::{Context, Poll};
use AppSrc; use AppSrc;
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@ -227,4 +230,59 @@ impl AppSrc {
(from_glib(min.assume_init()), from_glib(max.assume_init())) (from_glib(min.assume_init()), from_glib(max.assume_init()))
} }
} }
pub fn sink(&self) -> AppSrcSink {
AppSrcSink::new(self)
}
}
#[derive(Debug)]
pub struct AppSrcSink {
app_src: AppSrc,
}
impl AppSrcSink {
fn new(app_src: &AppSrc) -> Self {
skip_assert_initialized!();
let app_src = app_src.clone();
Self { app_src }
}
}
impl Sink<gst::Sample> for AppSrcSink {
type Error = gst::FlowError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
self.app_src.push_sample(&sample)?;
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
self.app_src.end_of_stream()?;
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_app_src_sink() {
gst::init().unwrap();
unimplemented!()
}
} }

View file

@ -8,6 +8,8 @@
extern crate libc; extern crate libc;
extern crate futures_core;
extern crate futures_sink;
extern crate glib_sys; extern crate glib_sys;
extern crate gobject_sys; extern crate gobject_sys;
extern crate gstreamer as gst; extern crate gstreamer as gst;
@ -28,10 +30,11 @@ macro_rules! skip_assert_initialized {
mod auto; mod auto;
pub use auto::*; pub use auto::*;
mod app_sink; pub mod app_sink;
mod app_src; pub use app_sink::AppSinkCallbacks;
pub use app_sink::*;
pub use app_src::*; pub mod app_src;
pub use app_src::AppSrcCallbacks;
// Re-export all the traits in a prelude module, so that applications // Re-export all the traits in a prelude module, so that applications
// can always "use gst::prelude::*" without getting conflicts // can always "use gst::prelude::*" without getting conflicts

View file

@ -6,9 +6,9 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
use futures_channel::mpsc; use futures_channel::mpsc::{self, UnboundedReceiver};
use futures_core::stream::Stream; use futures_core::Stream;
use futures_core::task::{Context, Poll}; use futures_util::{future, StreamExt};
use glib; use glib;
use glib::source::{Continue, Priority, SourceId}; use glib::source::{Continue, Priority, SourceId};
use glib::translate::*; use glib::translate::*;
@ -19,6 +19,7 @@ use std::cell::RefCell;
use std::mem::transmute; use std::mem::transmute;
use std::pin::Pin; use std::pin::Pin;
use std::ptr; use std::ptr;
use std::task::{Context, Poll};
use Bus; use Bus;
use BusSyncReply; use BusSyncReply;
@ -218,20 +219,18 @@ impl Bus {
} }
} }
pub fn stream(&self) -> impl Stream<Item = Message> + Unpin + Send + 'static { pub fn stream(&self) -> BusStream {
BusStream::new(self) BusStream::new(self)
} }
pub fn stream_filtered<'a>( pub fn stream_filtered<'a>(
&self, &self,
msg_types: &'a [MessageType], message_types: &'a [MessageType],
) -> impl Stream<Item = Message> + Unpin + Send + 'a { ) -> impl Stream<Item = Message> + Unpin + Send + 'a {
use futures_util::future; self.stream().filter(move |message| {
use futures_util::StreamExt; let message_type = message.get_type();
BusStream::new(self).filter(move |msg| { future::ready(message_types.contains(&message_type))
let type_ = msg.get_type();
future::ready(msg_types.contains(&type_))
}) })
} }
} }
@ -251,34 +250,39 @@ impl<'a> Iterator for Iter<'a> {
} }
#[derive(Debug)] #[derive(Debug)]
struct BusStream(Bus, mpsc::UnboundedReceiver<Message>); pub struct BusStream {
bus: Bus,
receiver: UnboundedReceiver<Message>,
}
impl BusStream { impl BusStream {
fn new(bus: &Bus) -> Self { fn new(bus: &Bus) -> Self {
skip_assert_initialized!(); skip_assert_initialized!();
let bus = bus.clone();
let (sender, receiver) = mpsc::unbounded(); let (sender, receiver) = mpsc::unbounded();
bus.set_sync_handler(move |_, msg| { bus.set_sync_handler(move |_, message| {
let _ = sender.unbounded_send(msg.to_owned()); let _ = sender.unbounded_send(message.to_owned());
BusSyncReply::Drop BusSyncReply::Drop
}); });
BusStream(bus.clone(), receiver) Self { bus, receiver }
} }
} }
impl Drop for BusStream { impl Drop for BusStream {
fn drop(&mut self) { fn drop(&mut self) {
self.0.unset_sync_handler(); self.bus.unset_sync_handler();
} }
} }
impl Stream for BusStream { impl Stream for BusStream {
type Item = Message; type Item = Message;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.1).poll_next(ctx) self.receiver.poll_next_unpin(context)
} }
} }

View file

@ -156,7 +156,7 @@ mod promise;
#[cfg(any(feature = "v1_14", feature = "dox"))] #[cfg(any(feature = "v1_14", feature = "dox"))]
pub use promise::*; pub use promise::*;
mod bus; pub mod bus;
mod element; mod element;
mod bin; mod bin;