gstreamer-rs/gstreamer/src/bus.rs

368 lines
10 KiB
Rust
Raw Normal View History

2020-12-15 10:53:31 +00:00
// Take a look at the license at the top of the repository in the LICENSE file.
2017-07-12 07:28:42 +00:00
use futures_channel::mpsc::{self, UnboundedReceiver};
use futures_core::Stream;
use futures_util::StreamExt;
use glib::ffi::{gboolean, gpointer};
use glib::prelude::*;
use glib::source::{Continue, Priority, SourceId};
2018-04-01 08:30:03 +00:00
use glib::translate::*;
use std::cell::RefCell;
use std::future;
use std::mem::transmute;
use std::pin::Pin;
use std::task::{Context, Poll};
2018-03-15 08:39:12 +00:00
use crate::Bus;
use crate::BusSyncReply;
use crate::Message;
use crate::MessageType;
2017-07-12 07:28:42 +00:00
unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
bus: *mut ffi::GstBus,
msg: *mut ffi::GstMessage,
2017-07-12 07:28:42 +00:00
func: gpointer,
) -> gboolean {
let func: &RefCell<F> = &*(func as *const RefCell<F>);
(&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
2017-07-12 07:28:42 +00:00
}
unsafe extern "C" fn destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
ptr: gpointer,
) {
Box::<RefCell<F>>::from_raw(ptr as *mut _);
2017-07-12 07:28:42 +00:00
}
fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
#[allow(clippy::type_complexity)]
let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
2017-07-12 07:28:42 +00:00
Box::into_raw(func) as gpointer
}
unsafe extern "C" fn trampoline_sync<
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
>(
bus: *mut ffi::GstBus,
msg: *mut ffi::GstMessage,
2017-07-12 07:28:42 +00:00
func: gpointer,
) -> ffi::GstBusSyncReply {
let f: &F = &*(func as *const F);
let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
if res == ffi::GST_BUS_DROP {
ffi::gst_mini_object_unref(msg as *mut _);
}
res
2017-07-12 07:28:42 +00:00
}
unsafe extern "C" fn destroy_closure_sync<
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
>(
ptr: gpointer,
) {
Box::<F>::from_raw(ptr as *mut _);
2017-07-12 07:28:42 +00:00
}
fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
func: F,
) -> gpointer {
let func: Box<F> = Box::new(func);
2017-07-12 07:28:42 +00:00
Box::into_raw(func) as gpointer
}
impl Bus {
#[doc(alias = "gst_bus_add_signal_watch_full")]
pub fn add_signal_watch_full(&self, priority: Priority) {
unsafe {
ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
}
}
#[doc(alias = "gst_bus_create_watch")]
pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
2017-07-12 07:28:42 +00:00
where
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
{
2017-08-30 11:39:09 +00:00
skip_assert_initialized!();
2017-07-12 07:28:42 +00:00
unsafe {
let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
glib::ffi::g_source_set_callback(
2017-07-12 07:28:42 +00:00
source,
Some(transmute::<
_,
unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
>(trampoline_watch::<F> as *const ())),
2017-07-12 07:28:42 +00:00
into_raw_watch(func),
Some(destroy_closure_watch::<F>),
2017-07-12 07:28:42 +00:00
);
glib::ffi::g_source_set_priority(source, priority.into_glib());
2017-07-12 07:28:42 +00:00
if let Some(name) = name {
glib::ffi::g_source_set_name(source, name.to_glib_none().0);
2017-07-12 07:28:42 +00:00
}
from_glib_full(source)
}
}
#[doc(alias = "gst_bus_add_watch_full")]
pub fn add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
2017-07-12 07:28:42 +00:00
where
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
{
unsafe {
let res = ffi::gst_bus_add_watch_full(
2017-07-12 07:28:42 +00:00
self.to_glib_none().0,
glib::ffi::G_PRIORITY_DEFAULT,
Some(trampoline_watch::<F>),
2017-07-12 07:28:42 +00:00
into_raw_watch(func),
Some(destroy_closure_watch::<F>),
);
if res == 0 {
Err(glib::bool_error!("Bus already has a watch"))
} else {
Ok(from_glib(res))
}
2017-07-12 07:28:42 +00:00
}
}
pub fn add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
where
F: FnMut(&Bus, &Message) -> Continue + 'static,
{
unsafe {
assert!(glib::MainContext::ref_thread_default().is_owner());
let res = ffi::gst_bus_add_watch_full(
self.to_glib_none().0,
glib::ffi::G_PRIORITY_DEFAULT,
Some(trampoline_watch::<F>),
into_raw_watch(func),
Some(destroy_closure_watch::<F>),
);
if res == 0 {
Err(glib::bool_error!("Bus already has a watch"))
} else {
Ok(from_glib(res))
}
}
}
#[doc(alias = "gst_bus_set_sync_handler")]
2017-07-12 07:28:42 +00:00
pub fn set_sync_handler<F>(&self, func: F)
where
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
2017-07-12 07:28:42 +00:00
{
use once_cell::sync::Lazy;
static SET_ONCE_QUARK: Lazy<glib::Quark> =
Lazy::new(|| glib::Quark::from_string("gstreamer-rs-sync-handler"));
2017-07-12 07:28:42 +00:00
unsafe {
let bus = self.to_glib_none().0;
// This is not thread-safe before 1.16.3, see
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
if crate::version() < (1, 16, 3, 0) {
if !glib::gobject_ffi::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.into_glib())
.is_null()
{
panic!("Bus sync handler can only be set once");
}
glib::gobject_ffi::g_object_set_qdata(
bus as *mut _,
SET_ONCE_QUARK.into_glib(),
1 as *mut _,
);
}
ffi::gst_bus_set_sync_handler(
bus,
Some(trampoline_sync::<F>),
2017-07-12 07:28:42 +00:00
into_raw_sync(func),
Some(destroy_closure_sync::<F>),
2017-07-12 07:28:42 +00:00
)
}
}
pub fn unset_sync_handler(&self) {
// This is not thread-safe before 1.16.3, see
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
if crate::version() < (1, 16, 3, 0) {
return;
}
unsafe {
use std::ptr;
ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
}
}
pub fn iter(&self) -> Iter {
self.iter_timed(0.into())
}
pub fn iter_timed(&self, timeout: crate::ClockTime) -> Iter {
Iter { bus: self, timeout }
}
pub fn iter_filtered<'a>(
&'a self,
msg_types: &'a [MessageType],
) -> impl Iterator<Item = Message> + 'a {
self.iter_timed_filtered(0.into(), msg_types)
}
pub fn iter_timed_filtered<'a>(
&'a self,
timeout: crate::ClockTime,
msg_types: &'a [MessageType],
) -> impl Iterator<Item = Message> + 'a {
self.iter_timed(timeout)
2021-04-11 19:39:50 +00:00
.filter(move |msg| msg_types.contains(&msg.type_()))
}
pub fn timed_pop_filtered(
&self,
timeout: crate::ClockTime,
msg_types: &[MessageType],
) -> Option<Message> {
loop {
let msg = self.timed_pop(timeout)?;
2021-04-11 19:39:50 +00:00
if msg_types.contains(&msg.type_()) {
return Some(msg);
}
}
}
pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
loop {
let msg = self.pop()?;
2021-04-11 19:39:50 +00:00
if msg_types.contains(&msg.type_()) {
return Some(msg);
}
}
}
pub fn stream(&self) -> BusStream {
BusStream::new(self)
}
pub fn stream_filtered<'a>(
&self,
message_types: &'a [MessageType],
) -> impl Stream<Item = Message> + Unpin + Send + 'a {
self.stream().filter(move |message| {
2021-04-11 19:39:50 +00:00
let message_type = message.type_();
future::ready(message_types.contains(&message_type))
})
}
}
#[derive(Debug)]
pub struct Iter<'a> {
bus: &'a Bus,
timeout: crate::ClockTime,
}
impl<'a> Iterator for Iter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Message> {
self.bus.timed_pop(self.timeout)
}
2017-07-12 07:28:42 +00:00
}
#[derive(Debug)]
pub struct BusStream {
bus: glib::WeakRef<Bus>,
receiver: UnboundedReceiver<Message>,
}
impl BusStream {
fn new(bus: &Bus) -> Self {
skip_assert_initialized!();
let (sender, receiver) = mpsc::unbounded();
bus.set_sync_handler(move |_, message| {
let _ = sender.unbounded_send(message.to_owned());
BusSyncReply::Drop
});
Self {
bus: bus.downgrade(),
receiver,
}
}
}
impl Drop for BusStream {
fn drop(&mut self) {
if let Some(bus) = self.bus.upgrade() {
bus.unset_sync_handler();
}
}
}
impl Stream for BusStream {
type Item = Message;
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
self.receiver.poll_next_unpin(context)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
#[test]
fn test_sync_handler() {
crate::init().unwrap();
let bus = Bus::new();
let msgs = Arc::new(Mutex::new(Vec::new()));
let msgs_clone = msgs.clone();
bus.set_sync_handler(move |_, msg| {
msgs_clone.lock().unwrap().push(msg.clone());
BusSyncReply::Pass
});
bus.post(&crate::message::Eos::new()).unwrap();
let msgs = msgs.lock().unwrap();
assert_eq!(msgs.len(), 1);
match msgs[0].view() {
crate::MessageView::Eos(_) => (),
_ => unreachable!(),
}
}
2020-01-30 12:30:17 +00:00
#[test]
fn test_bus_stream() {
crate::init().unwrap();
2020-01-30 12:30:17 +00:00
let bus = Bus::new();
let bus_stream = bus.stream();
let eos_message = crate::message::Eos::new();
2020-01-30 12:30:17 +00:00
bus.post(&eos_message).unwrap();
let bus_future = bus_stream.into_future();
let (message, _) = futures_executor::block_on(bus_future);
match message.unwrap().view() {
crate::MessageView::Eos(_) => (),
2020-01-30 12:30:17 +00:00
_ => unreachable!(),
}
}
}