2020-12-15 10:53:31 +00:00
|
|
|
// Take a look at the license at the top of the repository in the LICENSE file.
|
2017-07-12 07:28:42 +00:00
|
|
|
|
2020-01-29 12:40:17 +00:00
|
|
|
use futures_channel::mpsc::{self, UnboundedReceiver};
|
|
|
|
use futures_core::Stream;
|
2021-01-19 12:33:11 +00:00
|
|
|
use futures_util::StreamExt;
|
2020-11-21 13:46:48 +00:00
|
|
|
use glib::ffi::{gboolean, gpointer};
|
2020-05-13 19:13:11 +00:00
|
|
|
use glib::prelude::*;
|
2018-06-24 11:44:38 +00:00
|
|
|
use glib::source::{Continue, Priority, SourceId};
|
2018-04-01 08:30:03 +00:00
|
|
|
use glib::translate::*;
|
|
|
|
use std::cell::RefCell;
|
2021-01-19 12:33:11 +00:00
|
|
|
use std::future;
|
2020-04-13 16:18:57 +00:00
|
|
|
use std::mem::transmute;
|
2019-11-11 10:56:20 +00:00
|
|
|
use std::pin::Pin;
|
2020-01-29 12:40:17 +00:00
|
|
|
use std::task::{Context, Poll};
|
2018-03-15 08:39:12 +00:00
|
|
|
|
2020-11-21 13:46:48 +00:00
|
|
|
use crate::Bus;
|
|
|
|
use crate::BusSyncReply;
|
|
|
|
use crate::Message;
|
|
|
|
use crate::MessageType;
|
2017-07-12 07:28:42 +00:00
|
|
|
|
2021-10-25 09:47:33 +00:00
|
|
|
unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + Send + 'static>(
|
2020-11-21 13:46:48 +00:00
|
|
|
bus: *mut ffi::GstBus,
|
|
|
|
msg: *mut ffi::GstMessage,
|
2017-07-12 07:28:42 +00:00
|
|
|
func: gpointer,
|
|
|
|
) -> gboolean {
|
2019-02-10 09:43:55 +00:00
|
|
|
let func: &RefCell<F> = &*(func as *const RefCell<F>);
|
2021-04-27 15:15:46 +00:00
|
|
|
(&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
|
2017-07-12 07:28:42 +00:00
|
|
|
}
|
|
|
|
|
2021-10-25 09:47:33 +00:00
|
|
|
unsafe extern "C" fn destroy_closure_watch<
|
|
|
|
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
|
|
|
|
>(
|
2019-01-30 13:02:03 +00:00
|
|
|
ptr: gpointer,
|
|
|
|
) {
|
|
|
|
Box::<RefCell<F>>::from_raw(ptr as *mut _);
|
2017-07-12 07:28:42 +00:00
|
|
|
}
|
|
|
|
|
2021-10-25 09:47:33 +00:00
|
|
|
fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + Send + 'static>(func: F) -> gpointer {
|
2019-02-28 08:32:13 +00:00
|
|
|
#[allow(clippy::type_complexity)]
|
2019-01-30 13:02:03 +00:00
|
|
|
let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
|
2017-07-12 07:28:42 +00:00
|
|
|
Box::into_raw(func) as gpointer
|
|
|
|
}
|
|
|
|
|
2021-10-25 09:47:33 +00:00
|
|
|
unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> Continue + 'static>(
|
|
|
|
bus: *mut ffi::GstBus,
|
|
|
|
msg: *mut ffi::GstMessage,
|
|
|
|
func: gpointer,
|
|
|
|
) -> gboolean {
|
2022-01-24 17:33:15 +00:00
|
|
|
let func: &glib::thread_guard::ThreadGuard<RefCell<F>> =
|
|
|
|
&*(func as *const glib::thread_guard::ThreadGuard<RefCell<F>>);
|
|
|
|
(&mut *func.get_ref().borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg))
|
2021-10-25 09:47:33 +00:00
|
|
|
.into_glib()
|
|
|
|
}
|
|
|
|
|
|
|
|
unsafe extern "C" fn destroy_closure_watch_local<F: FnMut(&Bus, &Message) -> Continue + 'static>(
|
|
|
|
ptr: gpointer,
|
|
|
|
) {
|
2022-01-24 17:33:15 +00:00
|
|
|
Box::<glib::thread_guard::ThreadGuard<RefCell<F>>>::from_raw(ptr as *mut _);
|
2021-10-25 09:47:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
|
|
|
|
#[allow(clippy::type_complexity)]
|
2022-01-24 17:33:15 +00:00
|
|
|
let func: Box<glib::thread_guard::ThreadGuard<RefCell<F>>> =
|
|
|
|
Box::new(glib::thread_guard::ThreadGuard::new(RefCell::new(func)));
|
2021-10-25 09:47:33 +00:00
|
|
|
Box::into_raw(func) as gpointer
|
|
|
|
}
|
|
|
|
|
2019-01-30 13:02:03 +00:00
|
|
|
unsafe extern "C" fn trampoline_sync<
|
|
|
|
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
|
|
|
|
>(
|
2020-11-21 13:46:48 +00:00
|
|
|
bus: *mut ffi::GstBus,
|
|
|
|
msg: *mut ffi::GstMessage,
|
2017-07-12 07:28:42 +00:00
|
|
|
func: gpointer,
|
2020-11-21 13:46:48 +00:00
|
|
|
) -> ffi::GstBusSyncReply {
|
2019-02-10 09:43:55 +00:00
|
|
|
let f: &F = &*(func as *const F);
|
2021-04-27 15:15:46 +00:00
|
|
|
let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
|
2018-07-24 12:35:26 +00:00
|
|
|
|
2020-11-21 13:46:48 +00:00
|
|
|
if res == ffi::GST_BUS_DROP {
|
|
|
|
ffi::gst_mini_object_unref(msg as *mut _);
|
2018-07-24 12:35:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
res
|
2017-07-12 07:28:42 +00:00
|
|
|
}
|
|
|
|
|
2019-01-30 13:02:03 +00:00
|
|
|
unsafe extern "C" fn destroy_closure_sync<
|
|
|
|
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
|
|
|
|
>(
|
|
|
|
ptr: gpointer,
|
|
|
|
) {
|
|
|
|
Box::<F>::from_raw(ptr as *mut _);
|
2017-07-12 07:28:42 +00:00
|
|
|
}
|
|
|
|
|
2017-08-13 22:40:43 +00:00
|
|
|
/// Boxes `func` and returns the raw pointer as a `gpointer`.
///
/// Ownership is handed to the caller; `destroy_closure_sync::<F>` frees the allocation.
fn into_raw_sync<F>(func: F) -> gpointer
where
    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
{
    Box::into_raw(Box::new(func)) as gpointer
}
|
|
|
|
|
|
|
|
impl Bus {
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_add_signal_watch")]
|
2021-05-19 20:35:47 +00:00
|
|
|
#[doc(alias = "gst_bus_add_signal_watch_full")]
|
2017-12-18 07:38:40 +00:00
|
|
|
pub fn add_signal_watch_full(&self, priority: Priority) {
|
|
|
|
unsafe {
|
2021-04-27 15:15:46 +00:00
|
|
|
ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
|
2017-12-18 07:38:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-19 20:35:47 +00:00
|
|
|
#[doc(alias = "gst_bus_create_watch")]
|
2019-05-23 18:19:24 +00:00
|
|
|
pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
|
2017-07-12 07:28:42 +00:00
|
|
|
where
|
|
|
|
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
|
|
|
|
{
|
2017-08-30 11:39:09 +00:00
|
|
|
skip_assert_initialized!();
|
2017-07-12 07:28:42 +00:00
|
|
|
unsafe {
|
2020-11-21 13:46:48 +00:00
|
|
|
let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
|
|
|
|
glib::ffi::g_source_set_callback(
|
2017-07-12 07:28:42 +00:00
|
|
|
source,
|
2020-04-13 16:18:57 +00:00
|
|
|
Some(transmute::<
|
|
|
|
_,
|
2020-11-21 13:46:48 +00:00
|
|
|
unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
|
2020-04-13 16:18:57 +00:00
|
|
|
>(trampoline_watch::<F> as *const ())),
|
2017-07-12 07:28:42 +00:00
|
|
|
into_raw_watch(func),
|
2019-01-30 13:02:03 +00:00
|
|
|
Some(destroy_closure_watch::<F>),
|
2017-07-12 07:28:42 +00:00
|
|
|
);
|
2021-04-27 15:15:46 +00:00
|
|
|
glib::ffi::g_source_set_priority(source, priority.into_glib());
|
2017-07-12 07:28:42 +00:00
|
|
|
|
|
|
|
if let Some(name) = name {
|
2020-11-21 13:46:48 +00:00
|
|
|
glib::ffi::g_source_set_name(source, name.to_glib_none().0);
|
2017-07-12 07:28:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
from_glib_full(source)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_add_watch")]
|
2021-05-19 20:35:47 +00:00
|
|
|
#[doc(alias = "gst_bus_add_watch_full")]
|
2019-12-17 19:00:42 +00:00
|
|
|
pub fn add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
|
2017-07-12 07:28:42 +00:00
|
|
|
where
|
|
|
|
F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
|
|
|
|
{
|
|
|
|
unsafe {
|
2020-11-21 13:46:48 +00:00
|
|
|
let res = ffi::gst_bus_add_watch_full(
|
2017-07-12 07:28:42 +00:00
|
|
|
self.to_glib_none().0,
|
2020-11-21 13:46:48 +00:00
|
|
|
glib::ffi::G_PRIORITY_DEFAULT,
|
2019-01-30 13:02:03 +00:00
|
|
|
Some(trampoline_watch::<F>),
|
2017-07-12 07:28:42 +00:00
|
|
|
into_raw_watch(func),
|
2019-01-30 13:02:03 +00:00
|
|
|
Some(destroy_closure_watch::<F>),
|
2019-02-15 11:30:05 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
if res == 0 {
|
2020-12-17 22:38:06 +00:00
|
|
|
Err(glib::bool_error!("Bus already has a watch"))
|
2019-02-15 11:30:05 +00:00
|
|
|
} else {
|
2019-12-17 19:00:42 +00:00
|
|
|
Ok(from_glib(res))
|
2019-02-15 11:30:05 +00:00
|
|
|
}
|
2017-07-12 07:28:42 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_add_watch")]
|
|
|
|
#[doc(alias = "gst_bus_add_watch_full")]
|
2019-12-17 19:00:42 +00:00
|
|
|
pub fn add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
|
2019-02-10 09:43:55 +00:00
|
|
|
where
|
|
|
|
F: FnMut(&Bus, &Message) -> Continue + 'static,
|
|
|
|
{
|
|
|
|
unsafe {
|
2021-10-25 09:47:33 +00:00
|
|
|
let ctx = glib::MainContext::ref_thread_default();
|
|
|
|
let _acquire = ctx
|
|
|
|
.acquire()
|
|
|
|
.expect("thread default main context already acquired by another thread");
|
2019-02-10 09:43:55 +00:00
|
|
|
|
2020-11-21 13:46:48 +00:00
|
|
|
let res = ffi::gst_bus_add_watch_full(
|
2019-02-10 09:43:55 +00:00
|
|
|
self.to_glib_none().0,
|
2020-11-21 13:46:48 +00:00
|
|
|
glib::ffi::G_PRIORITY_DEFAULT,
|
2021-10-25 09:47:33 +00:00
|
|
|
Some(trampoline_watch_local::<F>),
|
|
|
|
into_raw_watch_local(func),
|
|
|
|
Some(destroy_closure_watch_local::<F>),
|
2019-02-15 11:30:05 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
if res == 0 {
|
2020-12-17 22:38:06 +00:00
|
|
|
Err(glib::bool_error!("Bus already has a watch"))
|
2019-02-15 11:30:05 +00:00
|
|
|
} else {
|
2019-12-17 19:00:42 +00:00
|
|
|
Ok(from_glib(res))
|
2019-02-15 11:30:05 +00:00
|
|
|
}
|
2019-02-10 09:43:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-19 20:35:47 +00:00
|
|
|
#[doc(alias = "gst_bus_set_sync_handler")]
|
2017-07-12 07:28:42 +00:00
|
|
|
pub fn set_sync_handler<F>(&self, func: F)
|
|
|
|
where
|
2017-08-01 14:28:36 +00:00
|
|
|
F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
|
2017-07-12 07:28:42 +00:00
|
|
|
{
|
2020-02-09 19:30:53 +00:00
|
|
|
use once_cell::sync::Lazy;
|
|
|
|
static SET_ONCE_QUARK: Lazy<glib::Quark> =
|
2022-01-11 10:49:42 +00:00
|
|
|
Lazy::new(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
|
2020-02-09 19:30:53 +00:00
|
|
|
|
2017-07-12 07:28:42 +00:00
|
|
|
unsafe {
|
2020-02-09 19:30:53 +00:00
|
|
|
let bus = self.to_glib_none().0;
|
|
|
|
|
2020-02-15 09:05:25 +00:00
|
|
|
// This is not thread-safe before 1.16.3, see
|
|
|
|
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
|
2020-11-21 13:46:48 +00:00
|
|
|
if crate::version() < (1, 16, 3, 0) {
|
2021-04-27 15:15:46 +00:00
|
|
|
if !glib::gobject_ffi::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.into_glib())
|
2020-02-15 09:05:25 +00:00
|
|
|
.is_null()
|
|
|
|
{
|
|
|
|
panic!("Bus sync handler can only be set once");
|
|
|
|
}
|
|
|
|
|
2020-11-21 13:46:48 +00:00
|
|
|
glib::gobject_ffi::g_object_set_qdata(
|
2020-02-15 09:05:25 +00:00
|
|
|
bus as *mut _,
|
2021-04-27 15:15:46 +00:00
|
|
|
SET_ONCE_QUARK.into_glib(),
|
2020-02-15 09:05:25 +00:00
|
|
|
1 as *mut _,
|
|
|
|
);
|
|
|
|
}
|
2020-02-09 19:30:53 +00:00
|
|
|
|
2020-11-21 13:46:48 +00:00
|
|
|
ffi::gst_bus_set_sync_handler(
|
2020-02-09 19:30:53 +00:00
|
|
|
bus,
|
2019-01-30 13:02:03 +00:00
|
|
|
Some(trampoline_sync::<F>),
|
2017-07-12 07:28:42 +00:00
|
|
|
into_raw_sync(func),
|
2019-01-30 13:02:03 +00:00
|
|
|
Some(destroy_closure_sync::<F>),
|
2017-07-12 07:28:42 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
2017-08-01 17:52:29 +00:00
|
|
|
|
2020-02-15 09:05:25 +00:00
|
|
|
pub fn unset_sync_handler(&self) {
|
|
|
|
// This is not thread-safe before 1.16.3, see
|
|
|
|
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
|
2020-11-21 13:46:48 +00:00
|
|
|
if crate::version() < (1, 16, 3, 0) {
|
2020-02-15 09:05:25 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
unsafe {
|
|
|
|
use std::ptr;
|
|
|
|
|
2020-11-21 13:46:48 +00:00
|
|
|
ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
|
2020-02-15 09:05:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_pop")]
|
2018-12-27 22:06:03 +00:00
|
|
|
pub fn iter(&self) -> Iter {
|
2020-10-27 17:27:16 +00:00
|
|
|
self.iter_timed(Some(crate::ClockTime::ZERO))
|
2018-12-27 22:06:03 +00:00
|
|
|
}
|
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_timed_pop")]
|
2021-04-28 22:29:13 +00:00
|
|
|
pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
|
|
|
|
Iter {
|
|
|
|
bus: self,
|
|
|
|
timeout: timeout.into(),
|
|
|
|
}
|
2018-12-27 22:06:03 +00:00
|
|
|
}
|
2019-05-10 16:07:02 +00:00
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_pop_filtered")]
|
2019-05-10 16:07:02 +00:00
|
|
|
pub fn iter_filtered<'a>(
|
|
|
|
&'a self,
|
|
|
|
msg_types: &'a [MessageType],
|
|
|
|
) -> impl Iterator<Item = Message> + 'a {
|
2020-10-27 17:27:16 +00:00
|
|
|
self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
|
2019-05-10 16:07:02 +00:00
|
|
|
}
|
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_timed_pop_filtered")]
|
2019-05-10 16:07:02 +00:00
|
|
|
pub fn iter_timed_filtered<'a>(
|
|
|
|
&'a self,
|
2021-04-28 22:29:13 +00:00
|
|
|
timeout: impl Into<Option<crate::ClockTime>>,
|
2019-05-10 16:07:02 +00:00
|
|
|
msg_types: &'a [MessageType],
|
|
|
|
) -> impl Iterator<Item = Message> + 'a {
|
|
|
|
self.iter_timed(timeout)
|
2021-04-11 19:39:50 +00:00
|
|
|
.filter(move |msg| msg_types.contains(&msg.type_()))
|
2019-05-10 16:07:02 +00:00
|
|
|
}
|
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_timed_pop_filtered")]
|
2019-05-11 13:43:32 +00:00
|
|
|
pub fn timed_pop_filtered(
|
|
|
|
&self,
|
2021-04-28 22:29:13 +00:00
|
|
|
timeout: impl Into<Option<crate::ClockTime>> + Clone,
|
2019-05-11 13:43:32 +00:00
|
|
|
msg_types: &[MessageType],
|
|
|
|
) -> Option<Message> {
|
2019-05-10 16:07:02 +00:00
|
|
|
loop {
|
2021-04-28 22:29:13 +00:00
|
|
|
let msg = self.timed_pop(timeout.clone())?;
|
2021-04-11 19:39:50 +00:00
|
|
|
if msg_types.contains(&msg.type_()) {
|
2019-05-10 16:07:02 +00:00
|
|
|
return Some(msg);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-12 15:52:32 +00:00
|
|
|
#[doc(alias = "gst_bus_pop_filtered")]
|
2019-05-11 13:43:32 +00:00
|
|
|
pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
|
2019-05-10 16:07:02 +00:00
|
|
|
loop {
|
|
|
|
let msg = self.pop()?;
|
2021-04-11 19:39:50 +00:00
|
|
|
if msg_types.contains(&msg.type_()) {
|
2019-05-10 16:07:02 +00:00
|
|
|
return Some(msg);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-01-22 07:23:10 +00:00
|
|
|
|
2020-01-29 12:40:17 +00:00
|
|
|
pub fn stream(&self) -> BusStream {
|
2020-01-22 07:23:10 +00:00
|
|
|
BusStream::new(self)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn stream_filtered<'a>(
|
|
|
|
&self,
|
2020-01-29 12:40:17 +00:00
|
|
|
message_types: &'a [MessageType],
|
2020-01-22 07:23:10 +00:00
|
|
|
) -> impl Stream<Item = Message> + Unpin + Send + 'a {
|
2020-01-29 12:40:17 +00:00
|
|
|
self.stream().filter(move |message| {
|
2021-04-11 19:39:50 +00:00
|
|
|
let message_type = message.type_();
|
2020-01-22 07:23:10 +00:00
|
|
|
|
2020-01-29 12:40:17 +00:00
|
|
|
future::ready(message_types.contains(&message_type))
|
2020-01-22 07:23:10 +00:00
|
|
|
})
|
|
|
|
}
|
2018-12-27 22:06:03 +00:00
|
|
|
}
|
|
|
|
|
2019-01-22 15:43:29 +00:00
|
|
|
#[derive(Debug)]
|
2018-12-27 22:06:03 +00:00
|
|
|
pub struct Iter<'a> {
|
|
|
|
bus: &'a Bus,
|
2020-10-27 17:27:16 +00:00
|
|
|
timeout: Option<crate::ClockTime>,
|
2018-12-27 22:06:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> Iterator for Iter<'a> {
|
|
|
|
type Item = Message;
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Message> {
|
|
|
|
self.bus.timed_pop(self.timeout)
|
|
|
|
}
|
2017-07-12 07:28:42 +00:00
|
|
|
}
|
2017-08-17 10:07:32 +00:00
|
|
|
|
2019-11-11 10:56:20 +00:00
|
|
|
#[derive(Debug)]
|
2020-01-29 12:40:17 +00:00
|
|
|
pub struct BusStream {
|
2020-05-13 19:13:11 +00:00
|
|
|
bus: glib::WeakRef<Bus>,
|
2020-01-29 12:40:17 +00:00
|
|
|
receiver: UnboundedReceiver<Message>,
|
|
|
|
}
|
2017-11-12 12:33:02 +00:00
|
|
|
|
2019-11-11 10:56:20 +00:00
|
|
|
impl BusStream {
|
2020-01-22 07:23:10 +00:00
|
|
|
fn new(bus: &Bus) -> Self {
|
2019-11-11 10:56:20 +00:00
|
|
|
skip_assert_initialized!();
|
|
|
|
|
2020-01-28 21:39:36 +00:00
|
|
|
let (sender, receiver) = mpsc::unbounded();
|
2017-11-12 12:33:02 +00:00
|
|
|
|
2020-01-29 12:40:17 +00:00
|
|
|
bus.set_sync_handler(move |_, message| {
|
|
|
|
let _ = sender.unbounded_send(message.to_owned());
|
|
|
|
|
2020-01-28 21:39:36 +00:00
|
|
|
BusSyncReply::Drop
|
2019-11-11 10:56:20 +00:00
|
|
|
});
|
2017-11-12 12:33:02 +00:00
|
|
|
|
2020-05-13 19:13:11 +00:00
|
|
|
Self {
|
|
|
|
bus: bus.downgrade(),
|
|
|
|
receiver,
|
|
|
|
}
|
2017-08-17 10:07:32 +00:00
|
|
|
}
|
2019-11-11 10:56:20 +00:00
|
|
|
}
|
2017-08-17 10:07:32 +00:00
|
|
|
|
2020-02-15 09:05:25 +00:00
|
|
|
impl Drop for BusStream {
|
|
|
|
fn drop(&mut self) {
|
2020-05-13 19:13:11 +00:00
|
|
|
if let Some(bus) = self.bus.upgrade() {
|
|
|
|
bus.unset_sync_handler();
|
|
|
|
}
|
2020-02-15 09:05:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-11 10:56:20 +00:00
|
|
|
impl Stream for BusStream {
|
|
|
|
type Item = Message;
|
2017-08-17 10:07:32 +00:00
|
|
|
|
2020-01-29 12:40:17 +00:00
|
|
|
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
|
|
|
|
self.receiver.poll_next_unpin(context)
|
2017-08-17 10:07:32 +00:00
|
|
|
}
|
|
|
|
}
|
2017-11-12 12:33:02 +00:00
|
|
|
|
2019-01-29 14:14:06 +00:00
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    // A sync handler replying `Pass` must have recorded exactly one EOS message after a
    // single EOS was posted.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let recorded = Arc::new(Mutex::new(Vec::new()));
        let recorded_by_handler = Arc::clone(&recorded);
        bus.set_sync_handler(move |_, msg| {
            recorded_by_handler.lock().unwrap().push(msg.clone());
            BusSyncReply::Pass
        });

        bus.post(&crate::message::Eos::new()).unwrap();

        let recorded = recorded.lock().unwrap();
        assert_eq!(recorded.len(), 1);
        match recorded[0].view() {
            crate::MessageView::Eos(..) => {}
            _ => unreachable!(),
        }
    }

    // The first item produced by a bus stream must be the EOS message posted after the
    // stream was created.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        // The stream has to exist before the message is posted.
        let stream = bus.stream();

        bus.post(&crate::message::Eos::new()).unwrap();

        let (first, _) = futures_executor::block_on(stream.into_future());

        match first.unwrap().view() {
            crate::MessageView::Eos(..) => {}
            _ => unreachable!(),
        }
    }
}
|