// Copyright (C) 2017 Sebastian Dröge // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use futures_core::stream::Stream; use futures_core::task::{Context, Poll, Waker}; use glib; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; use glib_sys; use glib_sys::{gboolean, gpointer}; use gst_sys; use std::cell::RefCell; use std::mem::transmute; use std::pin::Pin; use std::ptr; use std::sync::{Arc, Mutex}; use Bus; use BusSyncReply; use Message; use MessageType; unsafe extern "C" fn trampoline_watch Continue + 'static>( bus: *mut gst_sys::GstBus, msg: *mut gst_sys::GstMessage, func: gpointer, ) -> gboolean { let func: &RefCell = &*(func as *const RefCell); (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib() } unsafe extern "C" fn destroy_closure_watch Continue + 'static>( ptr: gpointer, ) { Box::>::from_raw(ptr as *mut _); } fn into_raw_watch Continue + 'static>(func: F) -> gpointer { #[allow(clippy::type_complexity)] let func: Box> = Box::new(RefCell::new(func)); Box::into_raw(func) as gpointer } unsafe extern "C" fn trampoline_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( bus: *mut gst_sys::GstBus, msg: *mut gst_sys::GstMessage, func: gpointer, ) -> gst_sys::GstBusSyncReply { let f: &F = &*(func as *const F); let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib(); if res == gst_sys::GST_BUS_DROP { gst_sys::gst_mini_object_unref(msg as *mut _); } res } unsafe extern "C" fn destroy_closure_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( ptr: gpointer, ) { Box::::from_raw(ptr as *mut _); } fn into_raw_sync BusSyncReply + Send + Sync + 'static>( func: F, ) -> gpointer { let func: Box = Box::new(func); Box::into_raw(func) as gpointer } impl Bus { pub fn add_signal_watch_full(&self, priority: Priority) { unsafe { gst_sys::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.to_glib()); } } pub fn create_watch(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source where F: FnMut(&Bus, &Message) -> Continue + Send + 'static, { skip_assert_initialized!(); unsafe { let source = gst_sys::gst_bus_create_watch(self.to_glib_none().0); glib_sys::g_source_set_callback( source, Some(transmute(trampoline_watch:: as usize)), into_raw_watch(func), Some(destroy_closure_watch::), ); glib_sys::g_source_set_priority(source, priority.to_glib()); if let Some(name) = name { glib_sys::g_source_set_name(source, name.to_glib_none().0); } from_glib_full(source) } } pub fn add_watch(&self, func: F) -> Result where F: FnMut(&Bus, &Message) -> Continue + Send + 'static, { unsafe { let res = gst_sys::gst_bus_add_watch_full( self.to_glib_none().0, glib_sys::G_PRIORITY_DEFAULT, Some(trampoline_watch::), into_raw_watch(func), Some(destroy_closure_watch::), ); if res == 0 { Err(glib_bool_error!("Bus already has a watch")) } else { Ok(from_glib(res)) } } } pub fn add_watch_local(&self, func: F) -> Result where F: FnMut(&Bus, &Message) -> Continue + 'static, { unsafe { assert!(glib::MainContext::ref_thread_default().is_owner()); let res = gst_sys::gst_bus_add_watch_full( self.to_glib_none().0, glib_sys::G_PRIORITY_DEFAULT, Some(trampoline_watch::), into_raw_watch(func), Some(destroy_closure_watch::), ); if res == 0 { Err(glib_bool_error!("Bus already has a watch")) } else { Ok(from_glib(res)) } } } pub fn set_sync_handler(&self, func: F) where F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, { unsafe { gst_sys::gst_bus_set_sync_handler( self.to_glib_none().0, Some(trampoline_sync::), into_raw_sync(func), Some(destroy_closure_sync::), ) } } pub fn unset_sync_handler(&self) { unsafe { gst_sys::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) } } pub fn iter(&self) -> Iter { self.iter_timed(0.into()) } pub fn iter_timed(&self, timeout: ::ClockTime) -> Iter { Iter { bus: self, timeout } } pub fn iter_filtered<'a>( &'a self, msg_types: &'a [MessageType], ) -> impl Iterator + 'a { self.iter_timed_filtered(0.into(), msg_types) } pub fn iter_timed_filtered<'a>( &'a self, timeout: ::ClockTime, msg_types: &'a [MessageType], ) -> impl Iterator + 'a { self.iter_timed(timeout) .filter(move |msg| msg_types.contains(&msg.get_type())) } pub fn timed_pop_filtered( &self, timeout: ::ClockTime, msg_types: &[MessageType], ) -> Option { loop { let msg = self.timed_pop(timeout)?; if msg_types.contains(&msg.get_type()) { return Some(msg); } } } pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option { loop { let msg = self.pop()?; if msg_types.contains(&msg.get_type()) { return Some(msg); } } } pub fn stream(&self) -> impl Stream + Unpin + Send + 'static { BusStream::new(self) } pub fn stream_filtered<'a>( &self, msg_types: &'a [MessageType], ) -> impl Stream + Unpin + Send + 'a { use futures_util::future; use futures_util::StreamExt; BusStream::new(self).filter(move |msg| { let type_ = msg.get_type(); future::ready(msg_types.contains(&type_)) }) } } #[derive(Debug)] pub struct Iter<'a> { bus: &'a Bus, timeout: ::ClockTime, } impl<'a> Iterator for Iter<'a> { type Item = Message; fn next(&mut self) -> Option { self.bus.timed_pop(self.timeout) } } #[derive(Debug)] struct BusStream(Bus, Arc>>); impl BusStream { fn new(bus: &Bus) -> Self { skip_assert_initialized!(); let waker = Arc::new(Mutex::new(None::)); let waker_clone = Arc::clone(&waker); bus.set_sync_handler(move |_, _| { let mut waker = waker_clone.lock().unwrap(); if let Some(waker) = waker.take() { waker.wake(); } BusSyncReply::Pass }); BusStream(bus.clone(), waker) } } impl Drop for BusStream { fn drop(&mut self) { self.0.unset_sync_handler(); } } impl Stream for BusStream { type Item = Message; fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { let BusStream(ref bus, ref waker) = *self; let mut waker = waker.lock().unwrap(); let msg = bus.pop(); if let Some(msg) = msg { Poll::Ready(Some(msg)) } else { *waker = Some(ctx.waker().clone()); Poll::Pending } } } #[cfg(test)] mod tests { use super::*; use std::sync::{Arc, Mutex}; #[test] fn test_sync_handler() { ::init().unwrap(); let bus = Bus::new(); let msgs = Arc::new(Mutex::new(Vec::new())); let msgs_clone = msgs.clone(); bus.set_sync_handler(move |_, msg| { msgs_clone.lock().unwrap().push(msg.clone()); BusSyncReply::Pass }); bus.post(&::Message::new_eos().build()).unwrap(); let msgs = msgs.lock().unwrap(); assert_eq!(msgs.len(), 1); match msgs[0].view() { ::MessageView::Eos(_) => (), _ => unreachable!(), } } }