gstreamer-rs/gstreamer/src/bus.rs

284 lines
7.6 KiB
Rust
Raw Normal View History

2017-07-12 07:28:42 +00:00
// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use ffi;
use glib;
use glib::source::{Continue, Priority, SourceId};
2018-04-01 08:30:03 +00:00
use glib::translate::*;
2017-07-12 07:28:42 +00:00
use glib_ffi;
use glib_ffi::{gboolean, gpointer};
2018-04-01 08:30:03 +00:00
use std::cell::RefCell;
use std::mem::transmute;
use std::ptr;
2018-03-15 08:39:12 +00:00
2017-07-12 07:28:42 +00:00
use Bus;
use BusSyncReply;
use Message;
unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
2017-07-12 07:28:42 +00:00
bus: *mut ffi::GstBus,
msg: *mut ffi::GstMessage,
func: gpointer,
) -> gboolean {
let func: &RefCell<F> = &*(func as *const RefCell<F>);
2017-09-09 13:01:32 +00:00
(&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib()
2017-07-12 07:28:42 +00:00
}
unsafe extern "C" fn destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
ptr: gpointer,
) {
Box::<RefCell<F>>::from_raw(ptr as *mut _);
2017-07-12 07:28:42 +00:00
}
fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
2017-07-12 07:28:42 +00:00
Box::into_raw(func) as gpointer
}
unsafe extern "C" fn trampoline_sync<
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
>(
2017-07-12 07:28:42 +00:00
bus: *mut ffi::GstBus,
msg: *mut ffi::GstMessage,
func: gpointer,
) -> ffi::GstBusSyncReply {
let f: &F = &*(func as *const F);
let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib();
if res == ffi::GST_BUS_DROP {
ffi::gst_mini_object_unref(msg as *mut _);
}
res
2017-07-12 07:28:42 +00:00
}
unsafe extern "C" fn destroy_closure_sync<
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
>(
ptr: gpointer,
) {
Box::<F>::from_raw(ptr as *mut _);
2017-07-12 07:28:42 +00:00
}
fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
func: F,
) -> gpointer {
let func: Box<F> = Box::new(func);
2017-07-12 07:28:42 +00:00
Box::into_raw(func) as gpointer
}
impl Bus {
pub fn add_signal_watch_full(&self, priority: Priority) {
unsafe {
ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.to_glib());
}
}
2017-07-12 07:28:42 +00:00
pub fn create_watch<'a, N: Into<Option<&'a str>>, F>(
&self,
name: N,
priority: Priority,
func: F,
) -> glib::Source
2017-07-12 07:28:42 +00:00
where
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
{
2017-08-30 11:39:09 +00:00
skip_assert_initialized!();
2017-07-12 07:28:42 +00:00
unsafe {
let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
glib_ffi::g_source_set_callback(
source,
Some(transmute(trampoline_watch::<F> as usize)),
2017-07-12 07:28:42 +00:00
into_raw_watch(func),
Some(destroy_closure_watch::<F>),
2017-07-12 07:28:42 +00:00
);
glib_ffi::g_source_set_priority(source, priority.to_glib());
let name = name.into();
if let Some(name) = name {
glib_ffi::g_source_set_name(source, name.to_glib_none().0);
}
from_glib_full(source)
}
}
pub fn add_watch<F>(&self, func: F) -> Option<SourceId>
2017-07-12 07:28:42 +00:00
where
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
{
unsafe {
let res = ffi::gst_bus_add_watch_full(
2017-07-12 07:28:42 +00:00
self.to_glib_none().0,
glib_ffi::G_PRIORITY_DEFAULT,
Some(trampoline_watch::<F>),
2017-07-12 07:28:42 +00:00
into_raw_watch(func),
Some(destroy_closure_watch::<F>),
);
if res == 0 {
None
} else {
Some(from_glib(res))
}
2017-07-12 07:28:42 +00:00
}
}
pub fn add_watch_local<F>(&self, func: F) -> Option<SourceId>
where
F: FnMut(&Bus, &Message) -> Continue + 'static,
{
unsafe {
assert!(glib::MainContext::ref_thread_default().is_owner());
let res = ffi::gst_bus_add_watch_full(
self.to_glib_none().0,
glib_ffi::G_PRIORITY_DEFAULT,
Some(trampoline_watch::<F>),
into_raw_watch(func),
Some(destroy_closure_watch::<F>),
);
if res == 0 {
None
} else {
Some(from_glib(res))
}
}
}
2017-07-12 07:28:42 +00:00
pub fn set_sync_handler<F>(&self, func: F)
where
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
2017-07-12 07:28:42 +00:00
{
unsafe {
ffi::gst_bus_set_sync_handler(
self.to_glib_none().0,
Some(trampoline_sync::<F>),
2017-07-12 07:28:42 +00:00
into_raw_sync(func),
Some(destroy_closure_sync::<F>),
2017-07-12 07:28:42 +00:00
)
}
}
pub fn unset_sync_handler(&self) {
unsafe { ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) }
}
pub fn iter(&self) -> Iter {
self.iter_timed(0.into())
}
pub fn iter_timed(&self, timeout: ::ClockTime) -> Iter {
Iter { bus: self, timeout }
}
}
#[derive(Debug)]
pub struct Iter<'a> {
bus: &'a Bus,
timeout: ::ClockTime,
}
impl<'a> Iterator for Iter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Message> {
self.bus.timed_pop(self.timeout)
}
2017-07-12 07:28:42 +00:00
}
#[cfg(any(feature = "futures", feature = "dox"))]
mod futures {
2018-04-01 08:30:03 +00:00
use super::*;
2018-04-23 14:55:31 +00:00
use futures_core::stream::Stream;
use futures_core::task::{Context, Waker};
use futures_core::{Async, Poll};
2018-04-01 08:30:03 +00:00
use std::sync::{Arc, Mutex};
#[derive(Debug)]
2018-04-23 14:55:31 +00:00
pub struct BusStream(Bus, Arc<Mutex<Option<Waker>>>);
impl BusStream {
pub fn new(bus: &Bus) -> Self {
skip_assert_initialized!();
2018-04-23 14:55:31 +00:00
let waker = Arc::new(Mutex::new(None));
let waker_clone = Arc::clone(&waker);
bus.set_sync_handler(move |_, _| {
2018-04-23 14:55:31 +00:00
let mut waker = waker_clone.lock().unwrap();
if let Some(waker) = waker.take() {
// FIXME: Force type...
2018-04-23 14:55:31 +00:00
let waker: Waker = waker;
waker.wake();
}
BusSyncReply::Pass
});
2018-04-23 14:55:31 +00:00
BusStream(bus.clone(), waker)
}
}
impl Drop for BusStream {
fn drop(&mut self) {
self.0.unset_sync_handler();
}
}
impl Stream for BusStream {
type Item = Message;
type Error = ();
2018-04-23 14:55:31 +00:00
fn poll_next(&mut self, ctx: &mut Context) -> Poll<Option<Self::Item>, Self::Error> {
let BusStream(ref bus, ref waker) = *self;
2018-04-23 14:55:31 +00:00
let msg = bus.pop();
if let Some(msg) = msg {
Ok(Async::Ready(Some(msg)))
} else {
2018-04-23 14:55:31 +00:00
let mut waker = waker.lock().unwrap();
*waker = Some(ctx.waker().clone());
Ok(Async::Pending)
}
}
}
}
#[cfg(any(feature = "futures", feature = "dox"))]
pub use bus::futures::BusStream;
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
#[test]
fn test_sync_handler() {
::init().unwrap();
let bus = Bus::new();
let msgs = Arc::new(Mutex::new(Vec::new()));
let msgs_clone = msgs.clone();
bus.set_sync_handler(move |_, msg| {
msgs_clone.lock().unwrap().push(msg.clone());
BusSyncReply::Pass
});
bus.post(&::Message::new_eos().build()).unwrap();
let msgs = msgs.lock().unwrap();
assert_eq!(msgs.len(), 1);
match msgs[0].view() {
::MessageView::Eos(_) => (),
_ => unreachable!(),
}
}
}