From 04757ea61af7d76082494b7194f4b2e3ab79513c Mon Sep 17 00:00:00 2001 From: Jordan Yelloz Date: Fri, 2 Feb 2024 15:40:26 -0700 Subject: [PATCH] webrtcsink: Added video simulcast support This is designed to work with LiveKit server but hopefully will not require much work for other systems supporting simulcasts --- net/webrtc/src/webrtcsink/imp.rs | 610 +++++++++++++++++++++++++++---- 1 file changed, 541 insertions(+), 69 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 847352e9..bced97ec 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -42,6 +42,8 @@ const D3D11_MEMORY_FEATURE: &str = "memory:D3D11Memory"; const RTP_TWCC_URI: &str = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"; +const RTP_MID_URI: &str = "urn:ietf:params:rtp-hdrext:sdes:mid"; +const RTP_RID_URI: &str = "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id"; const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302"); const DEFAULT_MIN_BITRATE: u32 = 1000; @@ -66,6 +68,190 @@ const DEFAULT_START_BITRATE: u32 = 2048000; #[cfg(feature = "v1_22")] const DO_FEC_THRESHOLD: u32 = 2000000; +/// Internally unique default IDs for RTP Extensions +#[derive(Debug, Clone, Copy, Hash, PartialOrd, Ord, PartialEq, Eq)] +#[repr(u32)] +enum RTPExtensionId { + Twcc = 1, + Mid, + Rid, +} + +impl RTPExtensionId { + pub const fn default_id(self) -> u32 { + self as u32 + } + pub const fn uri(self) -> &'static str { + match self { + Self::Twcc => RTP_TWCC_URI, + Self::Mid => RTP_MID_URI, + Self::Rid => RTP_RID_URI, + } + } + pub fn create(&self) -> Option { + gst_rtp::RTPHeaderExtension::create_from_uri(self.uri()) + } +} + +struct Simulcast { + mid: String, + funnel: gst::Element, + capsfilter: gst::Element, + webrtcbin_pad: gst::Pad, +} + +impl Simulcast { + pub fn new( + mid: &str, + webrtcbin: &gst::Element, + parent: &gst::Bin, + settings: &Settings, + ) -> anyhow::Result { + let webrtcbin_pad = webrtcbin + .request_pad_simple("sink_%u") + .ok_or_else(|| anyhow::anyhow!("failed to request sinkpad from webrtcbin"))?; + let funnel = make_element("rtpfunnel", Some(&format!("simulcast_{mid}")))?; + let capsfilter = make_element("capsfilter", Some(&format!("caps_{mid}")))?; + parent.add_many([&funnel, &capsfilter])?; + funnel.link(&capsfilter)?; + let capsfilter_srcpad = capsfilter.static_pad("src").unwrap(); + capsfilter_srcpad.link(&webrtcbin_pad)?; + + let transceiver = webrtcbin_pad.property::("transceiver"); + transceiver.set_direction(gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly); + settings.configure_video_transceiver(&transceiver); + + Ok(Self { + mid: mid.to_string(), + funnel, + capsfilter, + webrtcbin_pad, + }) + } + fn transceiver(&self) -> gst_webrtc::WebRTCRTPTransceiver { + self.webrtcbin_pad.property("transceiver") + } + fn set_caps(&self, caps: gst::Caps) { + self.transceiver().set_codec_preferences(Some(&caps)); + self.capsfilter.set_property("caps", &caps); + } + /// This should be set once the WebRTCPads are built for each InputStream + pub fn set_msid_from_pads<'a, P: Iterator>(&self, pads: P) { + let msids: HashSet<&str> = pads.filter_map(WebRTCPad::stream_msid).collect(); + if msids.len() > 1 { + gst::warning!( + CAT, + "multiple MSIDs {msids:?} for members of {}, ignoring", + self.mid, + ); + return; + } + if let Some(msid) = msids.into_iter().next() { + self.webrtcbin_pad.set_property("msid", msid); + } + } + /// This should be set once the WebRTCPads are built for each InputStream + pub fn set_caps_from_pads<'a, P: Iterator>(&self, pads: P) { + let simulcast_members = pads + .filter(|pad| pad.stream_mid() == Some(&self.mid)) + .filter_map(|pad| pad.stream_rid.clone().zip(Some(pad.payloader_caps.copy()))) + .collect::>(); + let mut caps = gst::Caps::new_empty(); + let caps_mut = caps.make_mut(); + let mut simulcast_rids = vec![]; + for (rid, rid_caps) in simulcast_members { + caps_mut.append(rid_caps); + simulcast_rids.push(rid); + } + let simulcast_rids = simulcast_rids.join(";"); + let simulcast_field = format!("send {simulcast_rids}"); + caps_mut.set("a-mid", &self.mid); + caps_mut.set("a-simulcast", simulcast_field); + self.set_caps(caps); + } + /// Creates video height and width RID restriction parameters from video caps + /// + /// This provides information about the video dimensions over SDP but also + /// can be used by the signalling client to inform the server about each + /// simulcast component's dimensions. + /// https://datatracker.ietf.org/doc/html/rfc8851#sec-rid_level_restrictions + fn video_caps_to_restrictions(structure: &gst::StructureRef) -> String { + let max_width = structure + .get::("width") + .ok() + .map(|width| format!("max-width={width}")); + let max_height = structure + .get::("height") + .ok() + .map(|height| format!("max-height={height}")); + + max_width + .into_iter() + .chain(max_height) + .collect::>() + .join(";") + } + pub fn add_rid_to_caps_structure( + rid: &str, + caps: &mut gst::Structure, + video_caps: Option<&gst::Caps>, + ) { + caps.set( + format!("rid-{rid}"), + Some(&Self::format_rid_restrictions(video_caps)), + ); + } + pub fn add_rid_to_caps(rid: &str, caps: &mut gst::CapsRef, video_caps: Option<&gst::Caps>) { + caps.set( + format!("rid-{rid}"), + Some(&Self::format_rid_restrictions(video_caps)), + ); + } + pub fn format_rid_restrictions(video_caps: Option<&gst::Caps>) -> String { + if let Some(caps) = video_caps { + let restrictions = caps + .structure(0) + .map(Self::video_caps_to_restrictions) + .unwrap_or_default(); + format!("send {restrictions}") + } else { + "send".to_string() + } + } +} + +/// Expresses a group of Pads that contribute to a single track. +/// Components of each track are grouped by MID and discriminated by RID. +#[derive(Default)] +struct Simulcasts { + simulcasts: HashMap, +} + +impl Simulcasts { + pub fn get(&self, mid: &str) -> Option<&Simulcast> { + self.simulcasts.get(mid) + } + pub fn funnel(&self, mid: &str) -> Option { + self.get(mid).map(|simulcast| simulcast.funnel.clone()) + } + pub fn add( + &mut self, + mid: &str, + webrtcbin: &gst::Element, + parent: &gst::Bin, + settings: &Settings, + ) -> anyhow::Result<()> { + if !self.simulcasts.contains_key(mid) { + let simulcast = Simulcast::new(mid, webrtcbin, parent, settings)?; + self.simulcasts.insert(mid.to_string(), simulcast); + } + Ok(()) + } + pub fn iter(&self) -> impl Iterator { + self.simulcasts.iter() + } +} + #[derive(Debug, Clone, Copy)] struct CCInfo { heuristic: WebRTCSinkCongestionControl, @@ -91,6 +277,15 @@ struct Settings { signaller: Signallable, } +impl Settings { + pub fn configure_video_transceiver(&self, trans: &gst_webrtc::WebRTCRTPTransceiver) { + if self.do_fec { + trans.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); + } + trans.set_property("do-nack", self.do_retransmission); + } +} + #[derive(Debug, Clone)] struct DiscoveryInfo { id: String, @@ -200,6 +395,8 @@ struct InputStream { is_video: bool, /// Whether initial discovery has started initial_discovery_started: bool, + /// The index of the SDP media associated with this stream + mline_index: Option, } /// Wrapper around webrtcbin pads @@ -215,8 +412,35 @@ struct WebRTCPad { /// When None, the pad was only created to mark its transceiver /// as inactive (in the case where we answer an offer). stream_name: Option, + /// The user-specified MSID value + stream_msid: Option, + /// The user-specified MID value for Simulcasts + stream_mid: Option, + /// The user-specified RID value for Simulcasts + stream_rid: Option, /// The payload selected in the answer, None at first payload: Option, + /// The caps produced by the payloader. + /// Used by simulcasts to combine caps from multiple payloaders + payloader_caps: gst::Caps, +} + +impl WebRTCPad { + fn transceiver(&self) -> gst_webrtc::WebRTCRTPTransceiver { + self.pad.property("transceiver") + } + fn mid(&self) -> Option { + self.transceiver().mid().map(String::from) + } + fn stream_msid(&self) -> Option<&str> { + self.stream_msid.as_deref() + } + fn stream_mid(&self) -> Option<&str> { + self.stream_mid.as_deref() + } + fn stream_rid(&self) -> Option<&str> { + self.stream_rid.as_deref() + } } /// Wrapper around GStreamer encoder element, keeps track of factory @@ -265,6 +489,8 @@ struct Session { codecs: Option>, stats_collection_handle: Option>, + + simulcasts: Simulcasts, } #[derive(Debug, PartialEq, Eq, Copy, Clone)] @@ -436,6 +662,9 @@ struct State { mids: HashMap, signaller_signals: Option, finalizing_sessions: Arc<(Mutex>, Condvar)>, + /// Used in simulcasts to re-apply the MID/RID extensions when connecting + /// each payloader to the webrtcbin + rtp_header_extensions: HashMap, } fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { @@ -541,6 +770,7 @@ impl Default for State { mids: HashMap::new(), signaller_signals: Default::default(), finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), + rtp_header_extensions: HashMap::new(), } } } @@ -1165,6 +1395,10 @@ impl State { discos.remove(position); } } + + fn register_rtp_header_extension(&mut self, ext: RTPExtensionId, id: u32) -> u32 { + *self.rtp_header_extensions.entry(ext).or_insert(id) + } } impl Session { @@ -1195,6 +1429,7 @@ impl Session { stats_sigid: None, codecs: None, stats_collection_handle: None, + simulcasts: Simulcasts::default(), } } @@ -1319,6 +1554,8 @@ impl Session { &payloader, &codec, Some(webrtc_pad.ssrc), + webrtc_pad.stream_mid(), + webrtc_pad.stream_rid(), Some(&caps), ExtensionConfigurationType::Skip, )?; @@ -1338,14 +1575,15 @@ impl Session { let s = caps.structure(0).unwrap(); let mut filtered_s = gst::Structure::new_empty("application/x-rtp"); - filtered_s.extend(s.iter().filter_map(|(key, value)| { - if key.starts_with("a-") { - None - } else { - Some((key, value.to_owned())) - } + filtered_s.extend(s.iter().filter_map(|(key, value)| match key { + key if key.starts_with("a-") => None, + key if key.starts_with("rid-") => None, + key => Some((key, value.to_owned())), })); filtered_s.set("ssrc", webrtc_pad.ssrc); + if let Some(rid) = &webrtc_pad.stream_rid { + Simulcast::add_rid_to_caps_structure(rid, &mut filtered_s, Some(&webrtc_pad.in_caps)); + } let caps = gst::Caps::builder_full().structure(filtered_s).build(); @@ -1405,8 +1643,19 @@ impl Session { let srcpad = pay_filter.static_pad("src").unwrap(); + let sink_pad = if let Some(funnel) = webrtc_pad + .mid() + .and_then(|mid| self.simulcasts.funnel(&mid)) + { + funnel + .request_pad_simple("sink_%u") + .ok_or_else(|| anyhow::anyhow!("Failed to request funnel pad"))? + } else { + webrtc_pad.pad.clone() + }; + srcpad - .link(&webrtc_pad.pad) + .link(&sink_pad) .with_context(|| format!("Connecting input stream for {}", self.peer_id))?; match producer.add_consumer(&appsrc) { @@ -1480,6 +1729,41 @@ impl InputStream { fn msid(&self) -> Option { self.sink_pad.property("msid") } + + fn mid(&self) -> Option { + self.sink_pad.property("mid") + } + + fn rid(&self) -> Option { + self.sink_pad.property("rid") + } + + fn mid_and_rid(&self) -> Option<(String, String)> { + self.mid().zip(self.rid()) + } + + /// When grouping pads/streams by MID for simulcasts, each member will also + /// share the same mline index, causing them to contribute to the same + /// SDP media section. + /// The index will only be incremented for new groups or the default case of + /// ungrouped pads. + fn compute_mline_indexes(streams: &mut [Self]) { + let mut mlines_by_mid = HashMap::new(); + let mut index = 0; + for stream in streams { + if let Some(mid) = stream.mid() { + let entry = mlines_by_mid.entry(mid.clone()).or_insert_with(|| { + let current_index = index; + index += 1; + current_index + }); + stream.mline_index = Some(*entry); + } else { + stream.mline_index = Some(index); + index += 1; + } + } + } } impl NavigationEventHandler { @@ -1512,6 +1796,7 @@ impl NavigationEventHandler { } /// How to configure RTP extensions for payloaders, if at all +#[derive(Clone, Copy)] enum ExtensionConfigurationType { /// Skip configuration, do not add any extensions Skip, @@ -1548,7 +1833,9 @@ impl BaseWebRTCSink { return Ok(()); } - let Some(twcc_id) = self.pick_twcc_extension_id(payloader, extension_configuration_type) + let ext_id = RTPExtensionId::Twcc; + + let Some(twcc_id) = self.pick_extension_id(payloader, ext_id, extension_configuration_type) else { return Ok(()); }; @@ -1561,9 +1848,10 @@ impl BaseWebRTCSink { * concept of *transport-wide* congestion control, and firefox doesn't * provide feedback for audio packets. */ - if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI) { + if let Some(twcc_extension) = RTPExtensionId::Twcc.create() { twcc_extension.set_id(twcc_id); payloader.emit_by_name::<()>("add-extension", &[&twcc_extension]); + self.register_rtp_header_extension(ext_id, twcc_id); } else { anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed"); } @@ -1585,41 +1873,112 @@ impl BaseWebRTCSink { ) } - /// Returns Some with an available ID for TWCC extension or None if it's already configured - fn pick_twcc_extension_id( + fn configure_mid( &self, payloader: &gst::Element, + mid: &str, extension_configuration_type: ExtensionConfigurationType, + ) -> anyhow::Result<()> { + if let ExtensionConfigurationType::Skip = extension_configuration_type { + return Ok(()); + } + + let ext_id = RTPExtensionId::Mid; + + let Some(id) = self.pick_extension_id(payloader, ext_id, extension_configuration_type) + else { + return Ok(()); + }; + + gst::debug!(CAT, obj: payloader, "Mapping MID extension to ID {id}, mid={mid}"); + + if let Some(ext) = RTPExtensionId::Mid.create() { + ext.set_id(id); + ext.set_property("mid", mid); + payloader.emit_by_name::<()>("add-extension", &[&ext]); + self.register_rtp_header_extension(ext_id, id); + } else { + anyhow::bail!( + "Failed to add MID extension, make sure 'gst-plugins-good:rtpmanager' is installed" + ); + } + + Ok(()) + } + + fn configure_rid( + &self, + payloader: &gst::Element, + rid: &str, + extension_configuration_type: ExtensionConfigurationType, + ) -> anyhow::Result<()> { + if let ExtensionConfigurationType::Skip = extension_configuration_type { + return Ok(()); + } + + let ext_id = RTPExtensionId::Rid; + + let Some(id) = self.pick_extension_id(payloader, ext_id, extension_configuration_type) + else { + return Ok(()); + }; + + gst::debug!(CAT, obj: payloader, "Mapping RID extension to ID {id}, rid={rid}"); + + if let Some(ext) = RTPExtensionId::Rid.create() { + ext.set_id(id); + ext.set_property("rid", rid); + payloader.emit_by_name::<()>("add-extension", &[&ext]); + self.register_rtp_header_extension(ext_id, id); + } else { + anyhow::bail!( + "Failed to add RID extension, make sure 'gst-plugins-good:rtpmanager' is installed" + ); + } + + Ok(()) + } + + /// Returns Some with an available ID for the extension or None if it's already configured + fn pick_extension_id( + &self, + payloader: &gst::Element, + ext: RTPExtensionId, + config: ExtensionConfigurationType, ) -> Option { - match extension_configuration_type { + gst::debug!(CAT, obj: payloader, "Picking extension id ext={ext:?}"); + match config { + ExtensionConfigurationType::Skip => unreachable!(), + ExtensionConfigurationType::Apply { twcc_id } => Some(twcc_id), ExtensionConfigurationType::Auto => { // GstRTPBasePayload::extensions property is only available since GStreamer 1.24 if !payloader.has_property("extensions", Some(gst::Array::static_type())) { + let default_id = ext.default_id(); if self.has_connected_payloader_setup_slots() { - gst::warning!(CAT, "'extensions' property is not available: TWCC extension ID will default to 1. \ - Application code must ensure to pick non-conflicting IDs for any additionally configured extensions. \ - Please consider updating GStreamer to 1.24."); + gst::warning!(CAT, obj: payloader, "'extensions' property is not available: {ext:?} extension ID will default to {default_id}. \ + Application code must ensure to pick non-conflicting IDs for any additionally configured extensions. \ + Please consider updating GStreamer to 1.24."); } - return Some(1); + gst::debug!(CAT, obj: payloader, "Using default {default_id} for {ext:?}"); + return Some(default_id); } let enabled_extensions: gst::Array = payloader.property("extensions"); - let twcc = enabled_extensions + let existing_extension = enabled_extensions .iter() - .find(|value| { - let value = value.get::().unwrap(); - - match value.uri() { - Some(v) => v == RTP_TWCC_URI, - None => false, + .filter_map(|value| value.get::().ok()) + .find(|element| { + if let Some(uri) = element.uri() { + uri == ext.uri() + } else { + false } - }) - .map(|value| value.get::().unwrap()); + }); - if let Some(ext) = twcc { - gst::debug!(CAT, obj: payloader, "TWCC extension is already mapped to id {} by application", ext.id()); + if let Some(ext) = existing_extension { + gst::debug!(CAT, obj: payloader, "{ext:?} extension is already mapped to id {} by application", ext.id()); return None; } @@ -1631,11 +1990,16 @@ impl BaseWebRTCSink { Some(ext_id) } - ExtensionConfigurationType::Apply { twcc_id } => Some(twcc_id), - ExtensionConfigurationType::Skip => unreachable!(), } } + fn register_rtp_header_extension(&self, extension: RTPExtensionId, id: u32) -> u32 { + self.state + .lock() + .unwrap() + .register_rtp_header_extension(extension, id) + } + #[allow(clippy::too_many_arguments)] fn configure_payloader( &self, @@ -1644,6 +2008,8 @@ impl BaseWebRTCSink { payloader: &gst::Element, codec: &Codec, ssrc: Option, + mid: Option<&str>, + rid: Option<&str>, caps: Option<&gst::Caps>, extension_configuration_type: ExtensionConfigurationType, ) -> Result<(), Error> { @@ -1696,7 +2062,45 @@ impl BaseWebRTCSink { } } - self.configure_congestion_control(payloader, codec, extension_configuration_type) + self.configure_congestion_control(payloader, codec, extension_configuration_type)?; + + if let Some(mid) = mid { + self.configure_mid(payloader, mid, extension_configuration_type)?; + } + if let Some(rid) = rid { + self.configure_rid(payloader, rid, extension_configuration_type)?; + } + + if matches!( + extension_configuration_type, + ExtensionConfigurationType::Skip + ) { + gst::debug!(CAT, obj: payloader, "Re-applying stored extension configurations"); + for (ext, id) in &self.state.lock().unwrap().rtp_header_extensions { + let Some(rtp_ext) = ext.create() else { + continue; + }; + rtp_ext.set_id(*id); + match ext { + RTPExtensionId::Mid => { + if let Some(mid) = mid { + gst::debug!(CAT, obj: payloader, "Re-applying mid={mid:?}"); + rtp_ext.set_property("mid", mid); + } + } + RTPExtensionId::Rid => { + if let Some(rid) = rid { + gst::debug!(CAT, obj: payloader, "Re-applying rid={rid:?}"); + rtp_ext.set_property("rid", rid); + } + } + _ => {} + }; + payloader.emit_by_name::<()>("add-extension", &[&rtp_ext]); + } + } + + Ok(()) } fn generate_ssrc( @@ -1718,9 +2122,9 @@ impl BaseWebRTCSink { webrtcbin: &gst::Element, webrtc_pads: &mut HashMap, is_video: bool, + media_idx: u32, ) { let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads); - let media_idx = webrtc_pads.len() as i32; let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else { gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin"); @@ -1750,14 +2154,19 @@ impl BaseWebRTCSink { WebRTCPad { pad, in_caps: gst::Caps::new_empty(), - media_idx: media_idx as u32, + media_idx, ssrc, stream_name: None, + stream_msid: None, + stream_mid: None, + stream_rid: None, payload: None, + payloader_caps, }, ); } + #[allow(clippy::too_many_arguments)] async fn request_webrtcbin_pad( element: &super::BaseWebRTCSink, webrtcbin: &gst::Element, @@ -1766,9 +2175,10 @@ impl BaseWebRTCSink { settings: &Settings, webrtc_pads: &mut HashMap, codecs: &mut BTreeMap, + media_idx: u32, + simulcasts: &Simulcasts, ) { let ssrc = BaseWebRTCSink::generate_ssrc(element, webrtc_pads); - let media_idx = webrtc_pads.len() as i32; let mut payloader_caps = match media { Some(media) => { @@ -1776,6 +2186,7 @@ impl BaseWebRTCSink { let codec = BaseWebRTCSink::select_codec( element, + &stream.sink_pad, &discovery_info, media, &stream.in_caps.as_ref().unwrap().clone(), @@ -1811,6 +2222,7 @@ impl BaseWebRTCSink { webrtcbin, webrtc_pads, stream.is_video, + media_idx, ); } else { let payloader_caps_mut = payloader_caps.make_mut(); @@ -1875,47 +2287,70 @@ impl BaseWebRTCSink { payloader_caps ); - let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else { - gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin"); - gst::element_error!( - element, - gst::StreamError::Failed, - ["Failed to request pad from webrtcbin"] + let mid = stream.mid(); + let pad = if let Some((rid, simulcast)) = stream + .mid_and_rid() + .filter(|_| stream.is_video) + .and_then(|(mid, rid)| Some(rid).zip(simulcasts.get(&mid))) + { + let Simulcast { + webrtcbin_pad: pad, .. + } = simulcast; + let payloader_caps_mut = payloader_caps.make_mut(); + Simulcast::add_rid_to_caps(&rid, payloader_caps_mut, stream.in_caps.as_ref()); + gst::debug!( + CAT, + obj: element, + "Have simulcast pad {:?}, media_idx={media_idx}, ssrc={ssrc}, mid={mid:?}, rid={rid}", + pad.name(), ); - return; - }; + pad.clone() + } else { + gst::debug!(CAT, obj: element, "Requesting singlecast pad {media_idx}"); + let Some(pad) = webrtcbin.request_pad_simple(&format!("sink_{}", media_idx)) else { + gst::error!(CAT, obj: element, "Failed to request pad from webrtcbin"); + gst::element_error!( + element, + gst::StreamError::Failed, + ["Failed to request pad from webrtcbin"] + ); + return; + }; - if let Some(msid) = stream.msid() { - gst::trace!(CAT, obj: element, "forwarding msid={msid:?} to webrtcbin sinkpad"); - pad.set_property("msid", &msid); - } - - let transceiver = pad.property::("transceiver"); - - transceiver.set_property( - "direction", - gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly, - ); - - transceiver.set_property("codec-preferences", &payloader_caps); - - if stream.sink_pad.name().starts_with("video_") { - if settings.do_fec { - transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); + if let Some(msid) = stream.msid() { + gst::trace!(CAT, obj: element, "forwarding msid={msid:?} to webrtcbin sinkpad"); + pad.set_property("msid", &msid); } - transceiver.set_property("do-nack", settings.do_retransmission); - } + let transceiver = pad.property::("transceiver"); + + transceiver.set_property( + "direction", + gst_webrtc::WebRTCRTPTransceiverDirection::Sendonly, + ); + + transceiver.set_property("codec-preferences", &payloader_caps); + + if stream.sink_pad.name().starts_with("video_") { + settings.configure_video_transceiver(&transceiver); + } + + pad + }; webrtc_pads.insert( ssrc, WebRTCPad { pad, in_caps: stream.in_caps.as_ref().unwrap().clone(), - media_idx: media_idx as u32, + media_idx, ssrc, stream_name: Some(stream.sink_pad.name().to_string()), + stream_msid: stream.msid(), + stream_mid: stream.mid(), + stream_rid: stream.rid(), payload: None, + payloader_caps, }, ); } @@ -2244,6 +2679,7 @@ impl BaseWebRTCSink { async fn select_codec( element: &super::BaseWebRTCSink, + pad: &WebRTCSinkPad, discovery_info: &DiscoveryInfo, media: &gst_sdp::SDPMediaRef, in_caps: &gst::Caps, @@ -2339,6 +2775,7 @@ impl BaseWebRTCSink { BaseWebRTCSink::run_discovery_pipeline( element, + pad, stream_name, discovery_info, codec.clone(), @@ -2831,6 +3268,7 @@ impl BaseWebRTCSink { let mut streams: Vec = state.streams.values().cloned().collect(); streams.sort_by_key(|s| s.serial); + InputStream::compute_mline_indexes(&mut streams); let element_clone = element.downgrade(); let offer_clone = offer.cloned(); @@ -2843,9 +3281,10 @@ impl BaseWebRTCSink { let mut webrtc_pads: HashMap = HashMap::new(); let mut codecs: BTreeMap = BTreeMap::new(); + let mut simulcasts = Simulcasts::default(); if let Some(ref offer) = offer_clone { - for media in offer.sdp().medias() { + for (i, media) in offer.sdp().medias().enumerate() { let media_is_video = match media.media() { Some("audio") => false, Some("video") => true, @@ -2874,6 +3313,8 @@ impl BaseWebRTCSink { &settings_clone, &mut webrtc_pads, &mut codecs, + i as u32, + &simulcasts, ) .await; } else { @@ -2882,22 +3323,41 @@ impl BaseWebRTCSink { &webrtcbin, &mut webrtc_pads, media_is_video, + i as u32, ); } } } else { - for mut stream in streams { + for mid in streams.iter().filter(|s| s.is_video).filter_map(|s| s.mid()) { + if let Err(e) = simulcasts.add(&mid, &webrtcbin, pipeline.upcast_ref(), &settings_clone) { + gst::error!(CAT, obj: element, "Failed to add simulcast for {mid}: {e:?}"); + gst::element_error!( + element, + gst::StreamError::Failed, + ["Failed to create simulcast for {}", mid] + ); + return; + } + } + for stream in &mut streams { + let media_idx = stream.mline_index.unwrap(); BaseWebRTCSink::request_webrtcbin_pad( &element, &webrtcbin, - &mut stream, + stream, None, &settings_clone, &mut webrtc_pads, &mut codecs, + media_idx, + &simulcasts, ) .await; } + for (_, simulcast) in simulcasts.iter() { + simulcast.set_msid_from_pads(webrtc_pads.values()); + simulcast.set_caps_from_pads(webrtc_pads.values()); + } } let enable_data_channel_navigation = settings_clone.enable_data_channel_navigation; @@ -2909,6 +3369,7 @@ impl BaseWebRTCSink { if let Some(session) = state.sessions.get_mut(&session_id) { let session = session.unwrap_mut(); session.webrtc_pads = webrtc_pads; + session.simulcasts = simulcasts; if offer_clone.is_some() { session.codecs = Some(codecs); } @@ -3307,8 +3768,10 @@ impl BaseWebRTCSink { } } + #[allow(clippy::too_many_arguments)] async fn run_discovery_pipeline( element: &super::BaseWebRTCSink, + pad: &WebRTCSinkPad, stream_name: &str, discovery_info: &DiscoveryInfo, codec: Codec, @@ -3367,12 +3830,16 @@ impl BaseWebRTCSink { ); } + let mid = pad.property::>("mid"); + let rid = pad.property::>("rid"); element.imp().configure_payloader( "discovery", stream_name, &payloader, &codec, None, + mid.as_deref(), + rid.as_deref(), None, extension_configuration_type, )?; @@ -3498,6 +3965,7 @@ impl BaseWebRTCSink { async fn lookup_caps( element: &super::BaseWebRTCSink, + pad: &WebRTCSinkPad, discovery_info: DiscoveryInfo, name: String, output_caps: gst::Caps, @@ -3517,6 +3985,7 @@ impl BaseWebRTCSink { vec![BaseWebRTCSink::run_discovery_pipeline( element, + pad, &name, &discovery_info, codec, @@ -3539,6 +4008,7 @@ impl BaseWebRTCSink { .map(|codec| { BaseWebRTCSink::run_discovery_pipeline( element, + pad, &name, &discovery_info, codec.clone(), @@ -3710,7 +4180,7 @@ impl BaseWebRTCSink { } } - fn start_stream_discovery_if_needed(&self, stream_name: &str) { + fn start_stream_discovery_if_needed(&self, pad: &WebRTCSinkPad, stream_name: &str) { let (codecs, discovery_info) = { let mut state = self.state.lock().unwrap(); @@ -3744,11 +4214,12 @@ impl BaseWebRTCSink { }; let stream_name_clone = stream_name.to_owned(); - RUNTIME.spawn(glib::clone!(@weak self as this, @strong discovery_info => async move { + RUNTIME.spawn(glib::clone!(@weak self as this, @strong pad, @strong discovery_info => async move { let element = &*this.obj(); let (fut, handle) = futures::future::abortable( Self::lookup_caps( element, + &pad, discovery_info.clone(), stream_name_clone.clone(), gst::Caps::new_any(), @@ -3797,10 +4268,10 @@ impl BaseWebRTCSink { fn chain( &self, - pad: &gst::GhostPad, + pad: &WebRTCSinkPad, buffer: gst::Buffer, ) -> Result { - self.start_stream_discovery_if_needed(pad.name().as_str()); + self.start_stream_discovery_if_needed(pad, pad.name().as_str()); self.feed_discoveries(pad.name().as_str(), &buffer); gst::ProxyPad::chain_default(pad, Some(&*self.obj()), buffer) @@ -4345,7 +4816,7 @@ impl ElementImpl for BaseWebRTCSink { BaseWebRTCSink::catch_panic_pad_function( parent, || Err(gst::FlowError::Error), - |this| this.chain(pad.upcast_ref(), buffer), + |this| this.chain(pad, buffer), ) }) .event_function(|pad, parent, event| { @@ -4372,6 +4843,7 @@ impl ElementImpl for BaseWebRTCSink { is_video, serial, initial_discovery_started: false, + mline_index: None, }, );