From 5c66d8c10785fc49e6ef5e4a851e588a9b879bb0 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Mon, 23 Sep 2024 16:12:31 +0200 Subject: [PATCH] webrtcsrc: ensure source pad has msid when added Part-of: --- net/webrtc/src/webrtcsrc/imp.rs | 77 +++++++++++++++++++++++---------- net/webrtc/src/webrtcsrc/pad.rs | 20 ++++----- 2 files changed, 62 insertions(+), 35 deletions(-) diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index aaee02bf..0709792d 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -351,6 +351,7 @@ impl Session { n_audio_pads: AtomicU16::new(0), flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()), request_counter: 0, + pending_srcpads: HashMap::new(), }) } @@ -370,6 +371,18 @@ impl Session { }) } + // Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID. + fn take_pending_src_pad( + &mut self, + webrtcbin_src: &gst::Pad, + ) -> Option<(WebRTCSrcPad, gst::Caps)> { + self.get_stream_id( + Some(webrtcbin_src.property::("transceiver")), + None, + ) + .and_then(|stream_id| self.pending_srcpads.remove(&stream_id)) + } + // Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID. fn get_src_pad_from_webrtcbin_pad( &self, @@ -469,13 +482,13 @@ impl Session { // - otherwise, encoded filter's srcpad, if requested // - otherwise, webrtcbin's src pad. fn handle_webrtc_src_pad( - &self, + &mut self, bin: &gst::Bin, webrtcbin_pad: &gst::Pad, element: &super::BaseWebRTCSrc, ) -> gst::GhostPad { - let srcpad = self.get_src_pad_from_webrtcbin_pad(webrtcbin_pad, element); - if let Some(ref srcpad) = srcpad { + let srcpad_and_caps = self.take_pending_src_pad(webrtcbin_pad); + if let Some((ref srcpad, ref caps)) = srcpad_and_caps { let stream_id = srcpad.imp().stream_id(); let mut builder = gst::event::StreamStart::builder(&stream_id); if let Some(stream_start) = webrtcbin_pad.sticky_event::(0) { @@ -493,6 +506,34 @@ impl Session { webrtcbin_pad.store_sticky_event(&builder.build()).ok(); srcpad.imp().set_webrtc_pad(webrtcbin_pad.downgrade()); + + element + .add_pad(srcpad) + .expect("Adding ghost pad should never fail"); + let media_type = caps + .structure(0) + .expect("Passing empty caps is invalid") + .get::<&str>("media") + .expect("Only caps with a `media` field are expected when creating the pad"); + + let raw_caps = if media_type == "video" { + VIDEO_CAPS.to_owned() + } else if media_type == "audio" { + AUDIO_CAPS.to_owned() + } else { + unreachable!() + }; + + let caps_with_raw = [caps.clone(), raw_caps.clone()] + .into_iter() + .collect::(); + + let downstream_caps = srcpad.peer_query_caps(Some(&caps_with_raw)); + if let Some(first_struct) = downstream_caps.structure(0) { + if first_struct.has_name(raw_caps.structure(0).unwrap().name()) { + srcpad.imp().set_needs_decoding(true) + } + } } let ghostpad = gst::GhostPad::builder(gst::PadDirection::Src) @@ -601,7 +642,7 @@ impl Session { ); } - if let Some(srcpad) = srcpad { + if let Some((srcpad, _)) = srcpad_and_caps { let signaller = element.imp().signaller(); // Signalers like WhipServer do not need a peer producer id as they run as a server @@ -727,7 +768,7 @@ impl Session { } fn handle_offer( - &self, + &mut self, offer: &gst_webrtc::WebRTCSessionDescription, element: &super::BaseWebRTCSrc, ) -> (gst::Promise, gst::Bin) { @@ -1109,8 +1150,8 @@ impl BaseWebRTCSrc { assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer); let this = instance.imp(); gst::info!(CAT, imp = this, "got sdp offer"); - let state = this.state.lock().unwrap(); - let Some(session) = state.sessions.get(session_id) else { + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(session_id) else { gst::error!(CAT, imp = this, "session {session_id:?} not found"); return; }; @@ -1165,7 +1206,7 @@ impl BaseWebRTCSrc { &self, caps: &gst::Caps, stream_id: &str, - session: &Session, + session: &mut Session, ) -> bool { gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}"); @@ -1176,7 +1217,7 @@ impl BaseWebRTCSrc { .get::<&str>("media") .expect("Only caps with a `media` field are expected when creating the pad"); - let (template, name, raw_caps) = if media_type == "video" { + let (template, name) = if media_type == "video" { ( obj.pad_template("video_%s_%u").unwrap(), format!( @@ -1184,7 +1225,6 @@ impl BaseWebRTCSrc { session.id, session.n_video_pads.fetch_add(1, Ordering::SeqCst) ), - VIDEO_CAPS.to_owned(), ) } else if media_type == "audio" { ( @@ -1194,7 +1234,6 @@ impl BaseWebRTCSrc { session.id, session.n_audio_pads.fetch_add(1, Ordering::SeqCst) ), - AUDIO_CAPS.to_owned(), ) } else { gst::info!( @@ -1206,24 +1245,15 @@ impl BaseWebRTCSrc { return false; }; - let caps_with_raw = [caps.clone(), raw_caps.clone()] - .into_iter() - .collect::(); let ghost = gst::GhostPad::builder_from_template(&template) .name(name) .build() .downcast::() .unwrap(); ghost.imp().set_stream_id(stream_id); - obj.add_pad(&ghost) - .expect("Adding ghost pad should never fail"); - - let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw)); - if let Some(first_struct) = downstream_caps.structure(0) { - if first_struct.has_name(raw_caps.structure(0).unwrap().name()) { - ghost.imp().set_needs_decoding(true) - } - } + session + .pending_srcpads + .insert(stream_id.to_string(), (ghost.clone(), caps.clone())); true } @@ -1603,6 +1633,7 @@ struct Session { n_audio_pads: AtomicU16, flow_combiner: Mutex, request_counter: u64, + pending_srcpads: HashMap, } struct State { sessions: HashMap, diff --git a/net/webrtc/src/webrtcsrc/pad.rs b/net/webrtc/src/webrtcsrc/pad.rs index b14fef75..afe78482 100644 --- a/net/webrtc/src/webrtcsrc/pad.rs +++ b/net/webrtc/src/webrtcsrc/pad.rs @@ -57,18 +57,14 @@ impl ObjectImpl for WebRTCSrcPad { } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { - "msid" => { - let msid = self - .webrtcbin_pad - .lock() - .unwrap() - .as_ref() - .and_then(|p| p.upgrade()) - .map(|p| p.property::("msid")) - .unwrap_or_else(|| String::from("")); - - msid.to_value() - } + "msid" => self + .webrtcbin_pad + .lock() + .unwrap() + .as_ref() + .and_then(|p| p.upgrade()) + .and_then(|p| p.property::>("msid")) + .to_value(), name => panic!("no readable property {name:?}"), } }