From 225482f7ede7ec08d4b33ced473e4dcd68e41d42 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Sun, 10 Sep 2023 00:17:18 +0900 Subject: [PATCH] webrtcsink: Propagate GstContext messages Implement CustomBusStream so that NEED_CONTEXT and HAVE_CONTEXT messages from session/discovery can be forwarded to parent pipeline and also GstContext can be shared. Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 66 +++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 45c68d92..f6b83ae6 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -118,6 +118,66 @@ impl DiscoveryInfo { } } +// Same gst::bus::BusStream but hooking context message from the thread +// where the message is posted, so that GstContext can be shared +#[derive(Debug)] +struct CustomBusStream { + bus: glib::WeakRef, + receiver: futures::channel::mpsc::UnboundedReceiver, +} + +impl CustomBusStream { + fn new(bin: &super::BaseWebRTCSink, bus: &gst::Bus) -> Self { + let (sender, receiver) = futures::channel::mpsc::unbounded(); + + let bin_weak = bin.downgrade(); + bus.set_sync_handler(move |_, msg| { + match msg.view() { + gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => { + if let Some(bin) = bin_weak.upgrade() { + let _ = bin.post_message(msg.to_owned()); + } + } + _ => { + let _ = sender.unbounded_send(msg.to_owned()); + } + } + + gst::BusSyncReply::Drop + }); + + Self { + bus: bus.downgrade(), + receiver, + } + } +} + +impl Drop for CustomBusStream { + fn drop(&mut self) { + if let Some(bus) = self.bus.upgrade() { + bus.unset_sync_handler(); + } + } +} + +impl futures::Stream for CustomBusStream { + type Item = gst::Message; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + context: &mut std::task::Context, + ) -> std::task::Poll> { + self.receiver.poll_next_unpin(context) + } +} + +impl futures::stream::FusedStream for CustomBusStream { + fn is_terminated(&self) -> bool { + self.receiver.is_terminated() + } +} + /// Wrapper around our sink pads #[derive(Debug, Clone)] struct InputStream { @@ -2241,7 +2301,8 @@ impl BaseWebRTCSink { pipeline.set_start_time(gst::ClockTime::NONE); pipeline.set_base_time(element.base_time().unwrap()); - let mut bus_stream = pipeline.bus().unwrap().stream(); + let bus = pipeline.bus().unwrap(); + let mut bus_stream = CustomBusStream::new(&element, &bus); let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); let session_id_clone = session_id.clone(); @@ -2851,7 +2912,8 @@ impl BaseWebRTCSink { .link(&sink) .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - let mut stream = pipe.0.bus().unwrap().stream(); + let bus = pipe.0.bus().unwrap(); + let mut stream = CustomBusStream::new(element, &bus); pipe.0 .set_state(gst::State::Playing)