webrtcsink: Propagate GstContext messages

Implement CustomBusStream so that NEED_CONTEXT and HAVE_CONTEXT
messages from session/discovery can be forwarded to parent
pipeline and also GstContext can be shared.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1327>
This commit is contained in:
Seungha Yang 2023-09-10 00:17:18 +09:00 committed by Sebastian Dröge
parent 581787f651
commit 4fc905c9ea

View file

@ -97,6 +97,66 @@ impl Codec {
} }
} }
// Same gst::bus::BusStream but hooking context message from the thread
// where the message is posted, so that GstContext can be shared
#[derive(Debug)]
struct CustomBusStream {
bus: glib::WeakRef<gst::Bus>,
receiver: futures::channel::mpsc::UnboundedReceiver<gst::Message>,
}
impl CustomBusStream {
fn new(bin: &super::WebRTCSink, bus: &gst::Bus) -> Self {
let (sender, receiver) = futures::channel::mpsc::unbounded();
let bin_weak = bin.downgrade();
bus.set_sync_handler(move |_, msg| {
match msg.view() {
gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => {
if let Some(bin) = bin_weak.upgrade() {
let _ = bin.post_message(msg.to_owned());
}
}
_ => {
let _ = sender.unbounded_send(msg.to_owned());
}
}
gst::BusSyncReply::Drop
});
Self {
bus: bus.downgrade(),
receiver,
}
}
}
impl Drop for CustomBusStream {
fn drop(&mut self) {
if let Some(bus) = self.bus.upgrade() {
bus.unset_sync_handler();
}
}
}
impl futures::Stream for CustomBusStream {
type Item = gst::Message;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
context: &mut std::task::Context,
) -> std::task::Poll<Option<Self::Item>> {
self.receiver.poll_next_unpin(context)
}
}
impl futures::stream::FusedStream for CustomBusStream {
fn is_terminated(&self) -> bool {
self.receiver.is_terminated()
}
}
/// Wrapper around our sink pads /// Wrapper around our sink pads
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct InputStream { struct InputStream {
@ -1765,7 +1825,8 @@ impl WebRTCSink {
pipeline.set_start_time(gst::ClockTime::NONE); pipeline.set_start_time(gst::ClockTime::NONE);
pipeline.set_base_time(element.base_time().unwrap()); pipeline.set_base_time(element.base_time().unwrap());
let mut bus_stream = pipeline.bus().unwrap().stream(); let bus = pipeline.bus().unwrap();
let mut bus_stream = CustomBusStream::new(element, &bus);
let element_clone = element.downgrade(); let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade(); let pipeline_clone = pipeline.downgrade();
let session_id_clone = session_id.to_owned(); let session_id_clone = session_id.to_owned();
@ -2211,7 +2272,8 @@ impl WebRTCSink {
src.set_property("num-buffers", 1); src.set_property("num-buffers", 1);
let mut stream = pipe.0.bus().unwrap().stream(); let bus = pipe.0.bus().unwrap();
let mut stream = CustomBusStream::new(element, &bus);
pipe.0 pipe.0
.set_state(gst::State::Playing) .set_state(gst::State::Playing)