webrtcsrc: ensure source pad has msid when added

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1800>
This commit is contained in:
Mathieu Duponchelle 2024-09-23 16:12:31 +02:00 committed by GStreamer Marge Bot
parent f70482d9bc
commit 5c66d8c107
2 changed files with 62 additions and 35 deletions

View file

@ -351,6 +351,7 @@ impl Session {
n_audio_pads: AtomicU16::new(0),
flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()),
request_counter: 0,
pending_srcpads: HashMap::new(),
})
}
@ -370,6 +371,18 @@ impl Session {
})
}
// Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID.
fn take_pending_src_pad(
&mut self,
webrtcbin_src: &gst::Pad,
) -> Option<(WebRTCSrcPad, gst::Caps)> {
self.get_stream_id(
Some(webrtcbin_src.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver")),
None,
)
.and_then(|stream_id| self.pending_srcpads.remove(&stream_id))
}
// Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID.
fn get_src_pad_from_webrtcbin_pad(
&self,
@ -469,13 +482,13 @@ impl Session {
// - otherwise, encoded filter's srcpad, if requested
// - otherwise, webrtcbin's src pad.
fn handle_webrtc_src_pad(
&self,
&mut self,
bin: &gst::Bin,
webrtcbin_pad: &gst::Pad,
element: &super::BaseWebRTCSrc,
) -> gst::GhostPad {
let srcpad = self.get_src_pad_from_webrtcbin_pad(webrtcbin_pad, element);
if let Some(ref srcpad) = srcpad {
let srcpad_and_caps = self.take_pending_src_pad(webrtcbin_pad);
if let Some((ref srcpad, ref caps)) = srcpad_and_caps {
let stream_id = srcpad.imp().stream_id();
let mut builder = gst::event::StreamStart::builder(&stream_id);
if let Some(stream_start) = webrtcbin_pad.sticky_event::<gst::event::StreamStart>(0) {
@ -493,6 +506,34 @@ impl Session {
webrtcbin_pad.store_sticky_event(&builder.build()).ok();
srcpad.imp().set_webrtc_pad(webrtcbin_pad.downgrade());
element
.add_pad(srcpad)
.expect("Adding ghost pad should never fail");
let media_type = caps
.structure(0)
.expect("Passing empty caps is invalid")
.get::<&str>("media")
.expect("Only caps with a `media` field are expected when creating the pad");
let raw_caps = if media_type == "video" {
VIDEO_CAPS.to_owned()
} else if media_type == "audio" {
AUDIO_CAPS.to_owned()
} else {
unreachable!()
};
let caps_with_raw = [caps.clone(), raw_caps.clone()]
.into_iter()
.collect::<gst::Caps>();
let downstream_caps = srcpad.peer_query_caps(Some(&caps_with_raw));
if let Some(first_struct) = downstream_caps.structure(0) {
if first_struct.has_name(raw_caps.structure(0).unwrap().name()) {
srcpad.imp().set_needs_decoding(true)
}
}
}
let ghostpad = gst::GhostPad::builder(gst::PadDirection::Src)
@ -601,7 +642,7 @@ impl Session {
);
}
if let Some(srcpad) = srcpad {
if let Some((srcpad, _)) = srcpad_and_caps {
let signaller = element.imp().signaller();
// Signalers like WhipServer do not need a peer producer id as they run as a server
@ -727,7 +768,7 @@ impl Session {
}
fn handle_offer(
&self,
&mut self,
offer: &gst_webrtc::WebRTCSessionDescription,
element: &super::BaseWebRTCSrc,
) -> (gst::Promise, gst::Bin) {
@ -1109,8 +1150,8 @@ impl BaseWebRTCSrc {
assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer);
let this = instance.imp();
gst::info!(CAT, imp = this, "got sdp offer");
let state = this.state.lock().unwrap();
let Some(session) = state.sessions.get(session_id) else {
let mut state = this.state.lock().unwrap();
let Some(session) = state.sessions.get_mut(session_id) else {
gst::error!(CAT, imp = this, "session {session_id:?} not found");
return;
};
@ -1165,7 +1206,7 @@ impl BaseWebRTCSrc {
&self,
caps: &gst::Caps,
stream_id: &str,
session: &Session,
session: &mut Session,
) -> bool {
gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}");
@ -1176,7 +1217,7 @@ impl BaseWebRTCSrc {
.get::<&str>("media")
.expect("Only caps with a `media` field are expected when creating the pad");
let (template, name, raw_caps) = if media_type == "video" {
let (template, name) = if media_type == "video" {
(
obj.pad_template("video_%s_%u").unwrap(),
format!(
@ -1184,7 +1225,6 @@ impl BaseWebRTCSrc {
session.id,
session.n_video_pads.fetch_add(1, Ordering::SeqCst)
),
VIDEO_CAPS.to_owned(),
)
} else if media_type == "audio" {
(
@ -1194,7 +1234,6 @@ impl BaseWebRTCSrc {
session.id,
session.n_audio_pads.fetch_add(1, Ordering::SeqCst)
),
AUDIO_CAPS.to_owned(),
)
} else {
gst::info!(
@ -1206,24 +1245,15 @@ impl BaseWebRTCSrc {
return false;
};
let caps_with_raw = [caps.clone(), raw_caps.clone()]
.into_iter()
.collect::<gst::Caps>();
let ghost = gst::GhostPad::builder_from_template(&template)
.name(name)
.build()
.downcast::<WebRTCSrcPad>()
.unwrap();
ghost.imp().set_stream_id(stream_id);
obj.add_pad(&ghost)
.expect("Adding ghost pad should never fail");
let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw));
if let Some(first_struct) = downstream_caps.structure(0) {
if first_struct.has_name(raw_caps.structure(0).unwrap().name()) {
ghost.imp().set_needs_decoding(true)
}
}
session
.pending_srcpads
.insert(stream_id.to_string(), (ghost.clone(), caps.clone()));
true
}
@ -1603,6 +1633,7 @@ struct Session {
n_audio_pads: AtomicU16,
flow_combiner: Mutex<gst_base::UniqueFlowCombiner>,
request_counter: u64,
pending_srcpads: HashMap<String, (WebRTCSrcPad, gst::Caps)>,
}
struct State {
sessions: HashMap<String, Session>,

View file

@ -57,18 +57,14 @@ impl ObjectImpl for WebRTCSrcPad {
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"msid" => {
let msid = self
"msid" => self
.webrtcbin_pad
.lock()
.unwrap()
.as_ref()
.and_then(|p| p.upgrade())
.map(|p| p.property::<String>("msid"))
.unwrap_or_else(|| String::from(""));
msid.to_value()
}
.and_then(|p| p.property::<Option<String>>("msid"))
.to_value(),
name => panic!("no readable property {name:?}"),
}
}