// Take a look at the license at the top of the repository in the LICENSE file. use futures_channel::mpsc::{self, UnboundedReceiver}; use futures_core::Stream; use futures_util::StreamExt; use glib::ffi::{gboolean, gpointer}; use glib::prelude::*; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; use std::cell::RefCell; use std::future; use std::mem::transmute; use std::pin::Pin; use std::task::{Context, Poll}; use crate::Bus; use crate::BusSyncReply; use crate::Message; use crate::MessageType; unsafe extern "C" fn trampoline_watch Continue + 'static>( bus: *mut ffi::GstBus, msg: *mut ffi::GstMessage, func: gpointer, ) -> gboolean { let func: &RefCell = &*(func as *const RefCell); (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib() } unsafe extern "C" fn destroy_closure_watch Continue + 'static>( ptr: gpointer, ) { Box::>::from_raw(ptr as *mut _); } fn into_raw_watch Continue + 'static>(func: F) -> gpointer { #[allow(clippy::type_complexity)] let func: Box> = Box::new(RefCell::new(func)); Box::into_raw(func) as gpointer } unsafe extern "C" fn trampoline_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( bus: *mut ffi::GstBus, msg: *mut ffi::GstMessage, func: gpointer, ) -> ffi::GstBusSyncReply { let f: &F = &*(func as *const F); let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib(); if res == ffi::GST_BUS_DROP { ffi::gst_mini_object_unref(msg as *mut _); } res } unsafe extern "C" fn destroy_closure_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( ptr: gpointer, ) { Box::::from_raw(ptr as *mut _); } fn into_raw_sync BusSyncReply + Send + Sync + 'static>( func: F, ) -> gpointer { let func: Box = Box::new(func); Box::into_raw(func) as gpointer } impl Bus { #[doc(alias = "gst_bus_add_signal_watch_full")] pub fn add_signal_watch_full(&self, priority: Priority) { unsafe { ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib()); } } #[doc(alias = "gst_bus_create_watch")] pub fn create_watch(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source where F: FnMut(&Bus, &Message) -> Continue + Send + 'static, { skip_assert_initialized!(); unsafe { let source = ffi::gst_bus_create_watch(self.to_glib_none().0); glib::ffi::g_source_set_callback( source, Some(transmute::< _, unsafe extern "C" fn(glib::ffi::gpointer) -> i32, >(trampoline_watch:: as *const ())), into_raw_watch(func), Some(destroy_closure_watch::), ); glib::ffi::g_source_set_priority(source, priority.into_glib()); if let Some(name) = name { glib::ffi::g_source_set_name(source, name.to_glib_none().0); } from_glib_full(source) } } #[doc(alias = "gst_bus_add_watch_full")] pub fn add_watch(&self, func: F) -> Result where F: FnMut(&Bus, &Message) -> Continue + Send + 'static, { unsafe { let res = ffi::gst_bus_add_watch_full( self.to_glib_none().0, glib::ffi::G_PRIORITY_DEFAULT, Some(trampoline_watch::), into_raw_watch(func), Some(destroy_closure_watch::), ); if res == 0 { Err(glib::bool_error!("Bus already has a watch")) } else { Ok(from_glib(res)) } } } pub fn add_watch_local(&self, func: F) -> Result where F: FnMut(&Bus, &Message) -> Continue + 'static, { unsafe { assert!(glib::MainContext::ref_thread_default().is_owner()); let res = ffi::gst_bus_add_watch_full( self.to_glib_none().0, glib::ffi::G_PRIORITY_DEFAULT, Some(trampoline_watch::), into_raw_watch(func), Some(destroy_closure_watch::), ); if res == 0 { Err(glib::bool_error!("Bus already has a watch")) } else { Ok(from_glib(res)) } } } #[doc(alias = "gst_bus_set_sync_handler")] pub fn set_sync_handler(&self, func: F) where F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, { use once_cell::sync::Lazy; static SET_ONCE_QUARK: Lazy = Lazy::new(|| glib::Quark::from_string("gstreamer-rs-sync-handler")); unsafe { let bus = self.to_glib_none().0; // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 if crate::version() < (1, 16, 3, 0) { if !glib::gobject_ffi::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.into_glib()) .is_null() { panic!("Bus sync handler can only be set once"); } glib::gobject_ffi::g_object_set_qdata( bus as *mut _, SET_ONCE_QUARK.into_glib(), 1 as *mut _, ); } ffi::gst_bus_set_sync_handler( bus, Some(trampoline_sync::), into_raw_sync(func), Some(destroy_closure_sync::), ) } } pub fn unset_sync_handler(&self) { // This is not thread-safe before 1.16.3, see // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416 if crate::version() < (1, 16, 3, 0) { return; } unsafe { use std::ptr; ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) } } pub fn iter(&self) -> Iter { self.iter_timed(Some(crate::ClockTime::ZERO)) } pub fn iter_timed(&self, timeout: impl Into>) -> Iter { Iter { bus: self, timeout: timeout.into(), } } pub fn iter_filtered<'a>( &'a self, msg_types: &'a [MessageType], ) -> impl Iterator + 'a { self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types) } pub fn iter_timed_filtered<'a>( &'a self, timeout: impl Into>, msg_types: &'a [MessageType], ) -> impl Iterator + 'a { self.iter_timed(timeout) .filter(move |msg| msg_types.contains(&msg.type_())) } pub fn timed_pop_filtered( &self, timeout: impl Into> + Clone, msg_types: &[MessageType], ) -> Option { loop { let msg = self.timed_pop(timeout.clone())?; if msg_types.contains(&msg.type_()) { return Some(msg); } } } pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option { loop { let msg = self.pop()?; if msg_types.contains(&msg.type_()) { return Some(msg); } } } pub fn stream(&self) -> BusStream { BusStream::new(self) } pub fn stream_filtered<'a>( &self, message_types: &'a [MessageType], ) -> impl Stream + Unpin + Send + 'a { self.stream().filter(move |message| { let message_type = message.type_(); future::ready(message_types.contains(&message_type)) }) } } #[derive(Debug)] pub struct Iter<'a> { bus: &'a Bus, timeout: Option, } impl<'a> Iterator for Iter<'a> { type Item = Message; fn next(&mut self) -> Option { self.bus.timed_pop(self.timeout) } } #[derive(Debug)] pub struct BusStream { bus: glib::WeakRef, receiver: UnboundedReceiver, } impl BusStream { fn new(bus: &Bus) -> Self { skip_assert_initialized!(); let (sender, receiver) = mpsc::unbounded(); bus.set_sync_handler(move |_, message| { let _ = sender.unbounded_send(message.to_owned()); BusSyncReply::Drop }); Self { bus: bus.downgrade(), receiver, } } } impl Drop for BusStream { fn drop(&mut self) { if let Some(bus) = self.bus.upgrade() { bus.unset_sync_handler(); } } } impl Stream for BusStream { type Item = Message; fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll> { self.receiver.poll_next_unpin(context) } } #[cfg(test)] mod tests { use super::*; use std::sync::{Arc, Mutex}; #[test] fn test_sync_handler() { crate::init().unwrap(); let bus = Bus::new(); let msgs = Arc::new(Mutex::new(Vec::new())); let msgs_clone = msgs.clone(); bus.set_sync_handler(move |_, msg| { msgs_clone.lock().unwrap().push(msg.clone()); BusSyncReply::Pass }); bus.post(&crate::message::Eos::new()).unwrap(); let msgs = msgs.lock().unwrap(); assert_eq!(msgs.len(), 1); match msgs[0].view() { crate::MessageView::Eos(_) => (), _ => unreachable!(), } } #[test] fn test_bus_stream() { crate::init().unwrap(); let bus = Bus::new(); let bus_stream = bus.stream(); let eos_message = crate::message::Eos::new(); bus.post(&eos_message).unwrap(); let bus_future = bus_stream.into_future(); let (message, _) = futures_executor::block_on(bus_future); match message.unwrap().view() { crate::MessageView::Eos(_) => (), _ => unreachable!(), } } }