diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index f65a5ef0..aefb4468 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -99,6 +99,66 @@ impl Codec { } } +// Same gst::bus::BusStream but hooking context message from the thread +// where the message is posted, so that GstContext can be shared +#[derive(Debug)] +struct CustomBusStream { + bus: glib::WeakRef, + receiver: futures::channel::mpsc::UnboundedReceiver, +} + +impl CustomBusStream { + fn new(bin: &super::WebRTCSink, bus: &gst::Bus) -> Self { + let (sender, receiver) = futures::channel::mpsc::unbounded(); + + let bin_weak = bin.downgrade(); + bus.set_sync_handler(move |_, msg| { + match msg.view() { + gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => { + if let Some(bin) = bin_weak.upgrade() { + let _ = bin.post_message(msg.to_owned()); + } + } + _ => { + let _ = sender.unbounded_send(msg.to_owned()); + } + } + + gst::BusSyncReply::Drop + }); + + Self { + bus: bus.downgrade(), + receiver, + } + } +} + +impl Drop for CustomBusStream { + fn drop(&mut self) { + if let Some(bus) = self.bus.upgrade() { + bus.unset_sync_handler(); + } + } +} + +impl futures::Stream for CustomBusStream { + type Item = gst::Message; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + context: &mut std::task::Context, + ) -> std::task::Poll> { + self.receiver.poll_next_unpin(context) + } +} + +impl futures::stream::FusedStream for CustomBusStream { + fn is_terminated(&self) -> bool { + self.receiver.is_terminated() + } +} + /// Wrapper around our sink pads #[derive(Debug, Clone)] struct InputStream { @@ -1795,7 +1855,8 @@ impl WebRTCSink { pipeline.set_start_time(gst::ClockTime::NONE); pipeline.set_base_time(element.base_time().unwrap()); - let mut bus_stream = pipeline.bus().unwrap().stream(); + let bus = pipeline.bus().unwrap(); + let mut bus_stream = CustomBusStream::new(element, &bus); let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); let session_id_clone = session_id.to_owned(); @@ -2251,7 +2312,8 @@ impl WebRTCSink { src.set_property("num-buffers", 1); - let mut stream = pipe.0.bus().unwrap().stream(); + let bus = pipe.0.bus().unwrap(); + let mut stream = CustomBusStream::new(element, &bus); pipe.0 .set_state(gst::State::Playing)