diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 2ab5f44f..32f21b3d 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -9307,12 +9307,24 @@ ], "klass": "Source/Network/WebRTC", "pad-templates": { + "audio_%%s_%%u": { + "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, "audio_%%u": { "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", "direction": "src", "presence": "sometimes", "type": "GstWebRTCSrcPad" }, + "video_%%s_%%u": { + "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, "video_%%u": { "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", "direction": "src", @@ -9374,12 +9386,24 @@ "klass": "Source/Network/WebRTC", "long-name": "WebRTCSrc", "pad-templates": { + "audio_%%s_%%u": { + "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, "audio_%%u": { "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", "direction": "src", "presence": "sometimes", "type": "GstWebRTCSrcPad" }, + "video_%%s_%%u": { + "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, "video_%%u": { "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", "direction": "src", @@ -9439,12 +9463,24 @@ ], "klass": "Source/Network/WebRTC", "pad-templates": { + "audio_%%s_%%u": { + "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, "audio_%%u": { "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", "direction": "src", "presence": "sometimes", "type": "GstWebRTCSrcPad" }, + "video_%%s_%%u": { + "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, "video_%%u": { "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", "direction": "src", diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 8a9edf3b..60161656 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -11,7 +11,7 @@ use gst::subclass::prelude::*; use gst_webrtc::WebRTCDataChannel; use once_cell::sync::Lazy; use std::borrow::BorrowMut; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::atomic::AtomicU16; use std::sync::atomic::Ordering; @@ -44,8 +44,6 @@ struct Settings { #[derive(Default)] pub struct BaseWebRTCSrc { settings: Mutex, - n_video_pads: AtomicU16, - n_audio_pads: AtomicU16, state: Mutex, } @@ -296,32 +294,55 @@ struct SignallerSignals { handle_ice: glib::SignalHandlerId, } -impl BaseWebRTCSrc { +impl Session { fn webrtcbin(&self) -> gst::Bin { - let state = self.state.lock().unwrap(); - let webrtcbin = state - .webrtcbin - .as_ref() - .expect("We should never call `.webrtcbin()` when state not > Ready") - .clone() - .downcast::() - .unwrap(); - - webrtcbin + self.webrtcbin.clone().downcast::().unwrap() } - fn signaller(&self) -> Signallable { - self.settings.lock().unwrap().signaller.clone() + fn new(session_id: &str) -> Result { + let webrtcbin = gst::ElementFactory::make("webrtcbin") + .property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle) + .build() + .with_context(|| "Failed to make element webrtcbin".to_string())?; + + Ok(Self { + id: session_id.to_string(), + webrtcbin, + data_channel: None, + n_video_pads: AtomicU16::new(0), + n_audio_pads: AtomicU16::new(0), + flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()), + }) + } + + fn get_stream_id( + &self, + transceiver: Option, + mline: Option, + ) -> Option { + let mline = transceiver.map_or(mline, |t| Some(t.mlineindex())); + + // making a hash of the session ID and adding `:`, + // here the ID is the mline of the stream in the SDP. + mline.map(|mline| { + let mut cs = glib::Checksum::new(glib::ChecksumType::Sha256).unwrap(); + cs.update(self.id.as_bytes()); + format!("{}:{mline}", cs.string().unwrap()) + }) } // Maps the `webrtcbin` pad to our exposed source pad using the pad stream ID. - fn get_src_pad_from_webrtcbin_pad(&self, webrtcbin_src: &gst::Pad) -> Option { + fn get_src_pad_from_webrtcbin_pad( + &self, + webrtcbin_src: &gst::Pad, + element: &super::BaseWebRTCSrc, + ) -> Option { self.get_stream_id( Some(webrtcbin_src.property::("transceiver")), None, ) .and_then(|stream_id| { - self.obj().iterate_src_pads().into_iter().find_map(|s| { + element.iterate_src_pads().into_iter().find_map(|s| { let pad = s.ok()?.downcast::().unwrap(); if pad.imp().stream_id() == stream_id { Some(pad) @@ -332,61 +353,86 @@ impl BaseWebRTCSrc { }) } - fn send_navigation_event(&self, evt: gst_video::NavigationEvent) { - if let Some(data_channel) = &self.state.lock().unwrap().data_channel.borrow_mut() { + fn send_navigation_event( + &mut self, + evt: gst_video::NavigationEvent, + element: &super::BaseWebRTCSrc, + ) { + if let Some(data_channel) = &self.data_channel.borrow_mut() { let nav_event = NavigationEvent { mid: None, event: evt, }; match serde_json::to_string(&nav_event).ok() { Some(str) => { - gst::trace!(CAT, imp: self, "Sending navigation event to peer"); + gst::trace!(CAT, obj: element, "Sending navigation event to peer for session {}", self.id); data_channel.send_string(Some(str.as_str())); } None => { - gst::error!(CAT, imp: self, "Could not serialize navigation event"); + gst::error!(CAT, obj: element, "Could not serialize navigation event for session {}", self.id); } } } } - fn handle_webrtc_src_pad(&self, bin: &gst::Bin, pad: &gst::Pad) { - let srcpad = self.get_src_pad_from_webrtcbin_pad(pad); + // Creates a bin which contains the webrtcbin, encoded filter (if requested) plus parser + // and decoder (if needed) for every session + // + // The ghostpad of the session's bin will be the target pad of the webrtcsrc's srcpad + // corresponding to the session. + // + // The target pad for the session's bin ghostpad will be + // - the decoder's srcpad, if decoder is needed + // - otherwise, encoded filter's srcpad, if requested + // - otherwise, webrtcbin's src pad. + fn handle_webrtc_src_pad( + &self, + bin: &gst::Bin, + webrtcbin_pad: &gst::Pad, + element: &super::BaseWebRTCSrc, + ) -> gst::GhostPad { + let srcpad = self.get_src_pad_from_webrtcbin_pad(webrtcbin_pad, element); if let Some(ref srcpad) = srcpad { let stream_id = srcpad.imp().stream_id(); let mut builder = gst::event::StreamStart::builder(&stream_id); - if let Some(stream_start) = pad.sticky_event::(0) { + if let Some(stream_start) = webrtcbin_pad.sticky_event::(0) { builder = builder .seqnum(stream_start.seqnum()) .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next)); } - gst::debug!(CAT, imp: self, "Storing id {stream_id} on {pad:?}"); - pad.store_sticky_event(&builder.build()).ok(); + gst::debug!(CAT, obj: element, "Storing id {stream_id} on {webrtcbin_pad:?} for session {}", self.id); + webrtcbin_pad.store_sticky_event(&builder.build()).ok(); } let ghostpad = gst::GhostPad::builder(gst::PadDirection::Src) - .with_target(pad) - .unwrap() - .proxy_pad_chain_function(glib::clone!(@weak self as this => @default-panic, move + .proxy_pad_chain_function(glib::clone!(@weak element, @strong self.id as sess_id => @default-panic, move |pad, parent, buffer| { let padret = gst::ProxyPad::chain_default(pad, parent, buffer); - let ret = this.state.lock().unwrap().flow_combiner.update_flow(padret); - - ret + let state = element.imp().state.lock().unwrap(); + let Some(session) = state.sessions.get(&sess_id) else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + return padret; + }; + let f = session.flow_combiner.lock().unwrap().update_flow(padret); + f } )) - .proxy_pad_event_function(glib::clone!(@weak self as this => @default-panic, move |pad, parent, event| { + .proxy_pad_event_function(glib::clone!(@weak element , @weak webrtcbin_pad as webrtcpad, @strong self.id as sess_id => @default-panic, move |pad, parent, event| { let event = if let gst::EventView::StreamStart(stream_start) = event.view() { - let webrtcpad = pad.peer().unwrap(); - - this.get_src_pad_from_webrtcbin_pad(&webrtcpad) + let state = element.imp().state.lock().unwrap(); + if let Some(session) = state.sessions.get(&sess_id) { + session.get_src_pad_from_webrtcbin_pad(&webrtcpad, &element) .map(|srcpad| { gst::event::StreamStart::builder(&srcpad.imp().stream_id()) .seqnum(stream_start.seqnum()) .group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next)) .build() }).unwrap_or(event) + } else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + event + } } else { event }; @@ -395,10 +441,17 @@ impl BaseWebRTCSrc { })) .build(); - if self.settings.lock().unwrap().enable_data_channel_navigation { - pad.add_probe( + let sess_id = self.id.clone(); + if element + .imp() + .settings + .lock() + .unwrap() + .enable_data_channel_navigation + { + webrtcbin_pad.add_probe( gst::PadProbeType::EVENT_UPSTREAM, - glib::clone!(@weak self as this => @default-panic, move |_pad, info| { + glib::clone!(@weak element => @default-panic, move |_pad, info| { let Some(ev) = info.event() else { return gst::PadProbeReturn::Ok; }; @@ -406,23 +459,26 @@ impl BaseWebRTCSrc { return gst::PadProbeReturn::Ok; }; - this.send_navigation_event (gst_video::NavigationEvent::parse(ev).unwrap()); + let mut state = element.imp().state.lock().unwrap(); + if let Some(session) = state.sessions.get_mut(&sess_id) { + session.send_navigation_event(gst_video::NavigationEvent::parse(ev).unwrap(), &element); + } else { + gst::error!(CAT, obj: element , "session {sess_id:?} does not exist"); + } gst::PadProbeReturn::Ok }), ); } - bin.add_pad(&ghostpad) - .expect("Adding ghostpad to the bin should always work"); - if let Some(srcpad) = srcpad { - let producer_id = self + let producer_id = element + .imp() .signaller() .property::>("producer-peer-id") - .or_else(|| pad.property("msid")); + .or_else(|| webrtcbin_pad.property("msid")); - let encoded_filter = self.obj().emit_by_name::>( + let encoded_filter = element.emit_by_name::>( "request-encoded-filter", &[&producer_id, &srcpad.name(), &srcpad.allowed_caps()], ); @@ -431,19 +487,19 @@ impl BaseWebRTCSrc { let decodebin = gst::ElementFactory::make("decodebin3") .build() .expect("decodebin3 needs to be present!"); - self.obj().add(&decodebin).unwrap(); + bin.add(&decodebin).unwrap(); decodebin.sync_state_with_parent().unwrap(); decodebin.connect_pad_added( - glib::clone!(@weak self as this, @weak srcpad => move |_webrtcbin, pad| { + glib::clone!(@weak element as this, @weak ghostpad as ghostpad => move |_webrtcbin, pad| { if pad.direction() == gst::PadDirection::Sink { return; } - srcpad.set_target(Some(pad)).unwrap(); + ghostpad.set_target(Some(pad)).unwrap(); }), ); - gst::debug!(CAT, imp: self, "Decoding for {}", srcpad.imp().stream_id()); + gst::debug!(CAT, obj: element, "Decoding for {}", srcpad.imp().stream_id()); if let Some(encoded_filter) = encoded_filter { let filter_sink_pad = encoded_filter @@ -453,7 +509,7 @@ impl BaseWebRTCSrc { let parsebin = gst::ElementFactory::make("parsebin") .build() .expect("parsebin needs to be present!"); - self.obj().add_many([&parsebin, &encoded_filter]).unwrap(); + bin.add_many([&parsebin, &encoded_filter]).unwrap(); parsebin.connect_pad_added(move |_, pad| { pad.link(&filter_sink_pad) @@ -465,7 +521,7 @@ impl BaseWebRTCSrc { encoded_filter.sync_state_with_parent().unwrap(); }); - ghostpad + webrtcbin_pad .link(&parsebin.static_pad("sink").unwrap()) .expect("webrtcbin ! parsebin linking failed"); @@ -474,14 +530,14 @@ impl BaseWebRTCSrc { let sinkpad = decodebin .static_pad("sink") .expect("decodebin has a sink pad"); - ghostpad + webrtcbin_pad .link(&sinkpad) .expect("webrtcbin ! decodebin3 linking failed"); } } else { gst::debug!( CAT, - imp: self, + obj: element, "NO decoding for {}", srcpad.imp().stream_id() ); @@ -494,291 +550,39 @@ impl BaseWebRTCSrc { .static_pad("src") .expect("encoded filter must expose a static src pad"); - self.obj().add(&encoded_filter).unwrap(); + bin.add(&encoded_filter).unwrap(); - ghostpad + webrtcbin_pad .link(&filter_sink_pad) .expect("webrtcbin ! encoded_filter linking failed"); - srcpad.set_target(Some(&filter_src_pad)).unwrap(); encoded_filter.sync_state_with_parent().unwrap(); + ghostpad.set_target(Some(&filter_src_pad)).unwrap(); } else { - srcpad.set_target(Some(&ghostpad)).unwrap(); + // No decoder or filter + ghostpad.set_target(Some(webrtcbin_pad)).unwrap(); } } + srcpad.set_target(Some(&ghostpad)).unwrap(); } else { - gst::debug!(CAT, imp: self, "Unused webrtcbin pad {pad:?}"); + gst::debug!(CAT, obj: element, "Unused webrtcbin pad {webrtcbin_pad:?}"); } + ghostpad } - fn prepare(&self) -> Result<(), Error> { - let webrtcbin = gst::ElementFactory::make("webrtcbin") - .property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle) - .build() - .with_context(|| "Failed to make element webrtcbin".to_string())?; - - { - let settings = self.settings.lock().unwrap(); - - if let Some(stun_server) = settings.stun_server.as_ref() { - webrtcbin.set_property("stun-server", stun_server); - } - - for turn_server in settings.turn_servers.iter() { - webrtcbin.emit_by_name::("add-turn-server", &[&turn_server]); - } - } - - let bin = gst::Bin::new(); - bin.connect_pad_removed(glib::clone!(@weak self as this => move |_, pad| - this.state.lock().unwrap().flow_combiner.remove_pad(pad); - )); - bin.connect_pad_added(glib::clone!(@weak self as this => move |_, pad| - this.state.lock().unwrap().flow_combiner.add_pad(pad); - )); - webrtcbin.connect_pad_added( - glib::clone!(@weak self as this, @weak bin, => move |_webrtcbin, pad| { - if pad.direction() == gst::PadDirection::Sink { - return; - } - - this.handle_webrtc_src_pad(&bin, pad); - }), - ); - - webrtcbin.connect_closure( - "on-ice-candidate", - false, - glib::closure!(@weak-allow-none self as this => move | - _webrtcbin: gst::Bin, - sdp_m_line_index: u32, - candidate: String| { - this.unwrap().on_ice_candidate(sdp_m_line_index, candidate); - }), - ); - - webrtcbin.connect_closure( - "on-data-channel", - false, - glib::closure!(@weak-allow-none self as this => move | - _webrtcbin: gst::Bin, - data_channel: glib::Object| { - this.unwrap().on_data_channel(data_channel); - }), - ); - - self.signaller() - .emit_by_name::<()>("webrtcbin-ready", &[&"none", &webrtcbin]); - - bin.add(&webrtcbin).unwrap(); - self.obj().add(&bin).context("Could not add `webrtcbin`")?; - - let mut state = self.state.lock().unwrap(); - state.webrtcbin.replace(webrtcbin); - - Ok(()) - } - - fn get_stream_id( + fn handle_offer( &self, - transceiver: Option, - mline: Option, - ) -> Option { - let mline = transceiver.map_or(mline, |t| Some(t.mlineindex())); - - // Same logic as gst_pad_create_stream_id and friends, making a hash of - // the URI (session id, if URI doesn't exist) and adding `:`, here the ID is the mline of the - // stream in the SDP. - mline.map(|mline| { - let mut cs = glib::Checksum::new(glib::ChecksumType::Sha256).unwrap(); - - let data: String = if self - .signaller() - .has_property("uri", Some(String::static_type())) - { - self.signaller().property::>("uri").unwrap() - } else { - // use the session id - self.state.lock().unwrap().session_id.clone().unwrap() - }; - - cs.update(data.as_bytes()); - - format!("{}:{mline}", cs.string().unwrap()) - }) - } - - fn unprepare(&self) -> Result<(), Error> { - gst::info!(CAT, imp: self, "unpreparing"); - - let obj = self.obj(); - self.maybe_stop_signaller(); - self.state.lock().unwrap().session_id = None; - for pad in obj.src_pads() { - obj.remove_pad(&pad) - .map_err(|err| anyhow::anyhow!("Couldn't remove pad? {err:?}"))?; - } - - self.n_video_pads.store(0, Ordering::SeqCst); - self.n_audio_pads.store(0, Ordering::SeqCst); - - Ok(()) - } - - fn connect_signaller(&self, signaller: &Signallable) { - let instance = &*self.obj(); - - let _ = self - .state - .lock() - .unwrap() - .signaller_signals - .insert(SignallerSignals { - error: signaller.connect_closure( - "error", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, error: String| { - gst::element_error!( - instance, - gst::StreamError::Failed, - ["Signalling error: {}", error] - ); - }), - ), - - session_started: signaller.connect_closure( - "session-started", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, - session_id: &str, - _peer_id: &str| { - let imp = instance.imp(); - gst::info!(CAT, imp: imp, "Session started: {session_id}"); - imp.state.lock().unwrap().session_id = - Some(session_id.to_string()); - }), - ), - - session_ended: signaller.connect_closure( - "session-ended", - false, - glib::closure!(@watch instance => move |_signaler: glib::Object, _session_id: &str|{ - instance.imp().state.lock().unwrap().session_id = None; - instance.iterate_src_pads().into_iter().for_each(|pad| - { if let Err(e) = pad.map(|pad| pad.push_event(gst::event::Eos::new())) { - gst::error!(CAT, "Could not send EOS: {e:?}"); - }} - ); - - false - }), - ), - - request_meta: signaller.connect_closure( - "request-meta", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object| -> Option { - instance.imp().settings.lock().unwrap().meta.clone() - }), - ), - - session_description: signaller.connect_closure( - "session-description", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, - _peer_id: &str, - desc: &gst_webrtc::WebRTCSessionDescription| { - assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer); - - instance.imp().handle_offer(desc); - }), - ), - - // sdp_mid is exposed for future proofing, see - // https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, - // at the moment sdp_m_line_index must be Some - handle_ice: signaller.connect_closure( - "handle-ice", - false, - glib::closure!(@watch instance => move | - _signaller: glib::Object, - peer_id: &str, - sdp_m_line_index: u32, - _sdp_mid: Option, - candidate: &str| { - instance.imp().handle_ice(peer_id, Some(sdp_m_line_index), None, candidate); - }), - ), - }); - - // previous signals are disconnected when dropping the old structure - } - - // Creates and adds our `WebRTCSrcPad` source pad, returning caps accepted - // downstream - fn create_and_probe_src_pad(&self, caps: &gst::Caps, stream_id: &str) -> bool { - gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}"); - - let obj = self.obj(); - let media_type = caps - .structure(0) - .expect("Passing empty caps is invalid") - .get::<&str>("media") - .expect("Only caps with a `media` field are expected when creating the pad"); - - let (template, name, raw_caps) = if media_type == "video" { - ( - obj.pad_template("video_%u").unwrap(), - format!("video_{}", self.n_video_pads.fetch_add(1, Ordering::SeqCst)), - VIDEO_CAPS.to_owned(), - ) - } else if media_type == "audio" { - ( - obj.pad_template("audio_%u").unwrap(), - format!("audio_{}", self.n_audio_pads.fetch_add(1, Ordering::SeqCst)), - AUDIO_CAPS.to_owned(), - ) - } else { - gst::info!(CAT, imp: self, "Not an audio or video media {media_type:?}"); - - return false; - }; - - let caps_with_raw = [caps.clone(), raw_caps.clone()] - .into_iter() - .collect::(); - let ghost = gst::GhostPad::builder_from_template(&template) - .name(name) - .build() - .downcast::() - .unwrap(); - ghost.imp().set_stream_id(stream_id); - obj.add_pad(&ghost) - .expect("Adding ghost pad should never fail"); - - let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw)); - if let Some(first_struct) = downstream_caps.structure(0) { - if first_struct.has_name(raw_caps.structure(0).unwrap().name()) { - ghost.imp().set_needs_decoding(true) - } - } - - true - } - - fn handle_offer(&self, offer: &gst_webrtc::WebRTCSessionDescription) { - gst::log!(CAT, imp: self, "Got offer {}", offer.sdp().to_string()); + offer: &gst_webrtc::WebRTCSessionDescription, + element: &super::BaseWebRTCSrc, + ) -> (gst::Promise, gst::Bin) { + gst::log!(CAT, obj: element, "Got offer {}", offer.sdp().to_string()); let sdp = offer.sdp(); let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly; let webrtcbin = self.webrtcbin(); for (i, media) in sdp.medias().enumerate() { let (codec_names, do_retransmission) = { - let settings = self.settings.lock().unwrap(); + let settings = element.imp().settings.lock().unwrap(); ( settings .video_codecs @@ -814,7 +618,7 @@ impl BaseWebRTCSrc { { gst::warning!( CAT, - imp: self, + obj: element, "Failed to retrieve attributes from media!" ); return None; @@ -837,10 +641,13 @@ impl BaseWebRTCSrc { if !caps.is_empty() { let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap(); - if self.create_and_probe_src_pad(&caps, &stream_id) { + if element + .imp() + .create_and_probe_src_pad(&caps, &stream_id, self) + { gst::info!( CAT, - imp: self, + obj: element, "Adding transceiver for {stream_id} with caps: {caps:#?}" ); let transceiver = webrtcbin.emit_by_name::( @@ -848,12 +655,13 @@ impl BaseWebRTCSrc { &[&direction, &caps], ); - transceiver.set_property("do-nack", do_retransmission); + transceiver.set_property("do_nack", do_retransmission); transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); } } else { gst::info!( CAT, + obj: element, "Not using media: {media:#?} as it doesn't match our codec restrictions" ); } @@ -861,19 +669,36 @@ impl BaseWebRTCSrc { webrtcbin.emit_by_name::<()>("set-remote-description", &[&offer, &None::]); - let obj = self.obj(); - obj.no_more_pads(); + gst::info!(CAT, obj: element, "Set remote description"); + let obj = element.clone(); + + let session_id = self.id.clone(); let promise = - gst::Promise::with_change_func(glib::clone!(@weak self as this => move |reply| { - this.on_answer_created(reply); + gst::Promise::with_change_func(glib::clone!(@weak element as ele => move |reply| { + let state = ele.imp().state.lock().unwrap(); + gst::info!(CAT, obj: ele, "got answer for session {session_id:?}"); + let Some(session) = state.sessions.get(&session_id) else { + gst::error!(CAT, obj: ele , "no session {session_id:?}"); + return + }; + session.on_answer_created(reply, &obj); } )); - webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + // We cannot emit `create-answer` from here. The promise function + // of the answer needs the state lock which is held by the caller + // of `handle_offer`. So return the promise to the caller so that + // the it can drop the `state` and safely emit `create-answer` + + (promise, webrtcbin.clone()) } - fn on_answer_created(&self, reply: Result, gst::PromiseError>) { + fn on_answer_created( + &self, + reply: Result, gst::PromiseError>, + element: &super::BaseWebRTCSrc, + ) { let reply = match reply { Ok(Some(reply)) => { if !reply.has_field_with_type( @@ -881,14 +706,14 @@ impl BaseWebRTCSrc { gst_webrtc::WebRTCSessionDescription::static_type(), ) { gst::element_error!( - self.obj(), + element, gst::StreamError::Failed, ["create-answer::Promise returned with no reply"] ); return; } else if reply.has_field_with_type("error", glib::Error::static_type()) { gst::element_error!( - self.obj(), + element, gst::LibraryError::Failed, ["create-offer::Promise returned with error: {:?}", reply] ); @@ -899,7 +724,7 @@ impl BaseWebRTCSrc { } Ok(None) => { gst::element_error!( - self.obj(), + element, gst::StreamError::Failed, ["create-answer::Promise returned with no reply"] ); @@ -908,7 +733,7 @@ impl BaseWebRTCSrc { } Err(err) => { gst::element_error!( - self.obj(), + element, gst::LibraryError::Failed, ["create-answer::Promise returned with error {:?}", err] ); @@ -923,73 +748,263 @@ impl BaseWebRTCSrc { .get::() .expect("Invalid argument"); - self.webrtcbin() + self.webrtcbin .emit_by_name::<()>("set-local-description", &[&answer, &None::]); - let session_id = { - let state = self.state.lock().unwrap(); - match &state.session_id { - Some(id) => id.to_owned(), - _ => { - gst::element_error!( - self.obj(), - gst::StreamError::Failed, - ["Signalling error, no session started while requesting to send an SDP offer"] - ); - - return; - } - } - }; - - gst::log!(CAT, imp: self, "Sending SDP, {}", answer.sdp().to_string()); - let signaller = self.signaller(); - signaller.send_sdp(&session_id, &answer); + gst::log!(CAT, obj: element, "Sending SDP, {}", answer.sdp().to_string()); + let signaller = element.imp().signaller(); + signaller.send_sdp(&self.id, &answer); } - fn on_data_channel(&self, data_channel: glib::Object) { - gst::info!(CAT, imp: self, "Received data channel {data_channel:?}"); - let mut state = self.state.lock().unwrap(); - state.data_channel = data_channel.dynamic_cast::().ok(); + fn on_data_channel(&mut self, data_channel: glib::Object, element: &super::BaseWebRTCSrc) { + gst::info!(CAT, obj: element, "Received data channel {data_channel:?}"); + self.data_channel = data_channel.dynamic_cast::().ok(); } - fn on_ice_candidate(&self, sdp_m_line_index: u32, candidate: String) { - let signaller = self.signaller(); - let session_id = match self.state.lock().unwrap().session_id.as_ref() { - Some(id) => id.to_string(), - _ => { - gst::element_error!( - self.obj(), - gst::StreamError::Failed, - ["Signalling error, no session started while requesting to propose ice candidates"] - ); - - return; - } - }; - signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None::); + fn on_ice_candidate( + &self, + sdp_m_line_index: u32, + candidate: String, + element: &super::BaseWebRTCSrc, + ) { + let signaller = element.imp().signaller(); + signaller.add_ice(&self.id, &candidate, sdp_m_line_index, None::); } /// Called by the signaller with an ice candidate fn handle_ice( &self, - peer_id: &str, sdp_m_line_index: Option, _sdp_mid: Option, candidate: &str, + element: &super::BaseWebRTCSrc, ) { let sdp_m_line_index = match sdp_m_line_index { Some(m_line) => m_line, None => { - gst::error!(CAT, imp: self, "No mandatory mline"); + gst::error!(CAT, obj: element, "No mandatory mline"); return; } }; - gst::log!(CAT, imp: self, "Got ice from {peer_id}: {candidate}"); + gst::log!(CAT, obj: element, "Got ice candidate for {}: {candidate}", self.id); self.webrtcbin() .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); } +} + +impl BaseWebRTCSrc { + fn signaller(&self) -> Signallable { + self.settings.lock().unwrap().signaller.clone() + } + + fn unprepare(&self) -> Result<(), Error> { + gst::info!(CAT, imp: self, "unpreparing"); + + let mut state = self.state.lock().unwrap(); + let sessions = &state.sessions; + for (_, s) in sessions.iter() { + let id = s.id.as_str(); + let bin = s.webrtcbin().parent().and_downcast::().unwrap(); + if let Err(e) = self.end_session(id, &bin) { + gst::error!(CAT, imp: self , "Error ending session : {e}"); + } + } + state.sessions.clear(); + drop(state); + + self.maybe_stop_signaller(); + + Ok(()) + } + + fn connect_signaller(&self, signaller: &Signallable) { + let instance = &*self.obj(); + + let _ = self + .state + .lock() + .unwrap() + .signaller_signals + .insert(SignallerSignals { + error: signaller.connect_closure( + "error", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, error: String| { + gst::element_error!( + instance, + gst::StreamError::Failed, + ["Signalling error: {}", error] + ); + }), + ), + + session_started: signaller.connect_closure( + "session-started", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, + session_id: &str, + _peer_id: &str| { + let imp = instance.imp(); + gst::info!(CAT, imp: imp, "Session started: {session_id}"); + let _ = imp.start_session(session_id); + }), + ), + + session_ended: signaller.connect_closure( + "session-ended", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str|{ + let this = instance.imp(); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this , " Failed to find session {session_id}"); + return false; + }; + + let bin = session.webrtcbin().parent().and_downcast::().unwrap(); + drop(state); + + // FIXME: Needs a safer way to perform end_session. + // We had to release the lock during the session tear down + // to avoid blocking the session bin pad's chain function which could potentially + // block the end_session while removing the bin from webrtcsrc, causing a deadlock + // This looks unsafe to end a session without holding the state lock + + if let Err(e) = this.end_session(session_id, &bin) { + gst::error!(CAT, imp: this , " Failed to end session {session_id}: {e}"); + return false; + } + { + this.state.lock().unwrap().sessions.remove(session_id); + } + true + }), + ), + + request_meta: signaller.connect_closure( + "request-meta", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object| -> Option { + instance.imp().settings.lock().unwrap().meta.clone() + }), + ), + + session_description: signaller.connect_closure( + "session-description", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, + session_id: &str, + desc: &gst_webrtc::WebRTCSessionDescription| { + assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer); + let this = instance.imp(); + gst::info!(CAT, imp: this, "got sdp offer"); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + + let (promise, webrtcbin) = session.handle_offer(desc, &this.obj()); + drop(state); + webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + }), + ), + + // sdp_mid is exposed for future proofing, see + // https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1174, + // at the moment sdp_m_line_index must be Some + handle_ice: signaller.connect_closure( + "handle-ice", + false, + glib::closure!(@watch instance => move | + _signaller: glib::Object, + session_id: &str, + sdp_m_line_index: u32, + _sdp_mid: Option, + candidate: &str| { + let this = instance.imp(); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.handle_ice(Some(sdp_m_line_index), None, candidate, &this.obj()); + }), + ), + }); + + // previous signals are disconnected when dropping the old structure + } + + // Creates and adds our `WebRTCSrcPad` source pad, returning caps accepted + // downstream + fn create_and_probe_src_pad( + &self, + caps: &gst::Caps, + stream_id: &str, + session: &Session, + ) -> bool { + gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}"); + + let obj = self.obj(); + let media_type = caps + .structure(0) + .expect("Passing empty caps is invalid") + .get::<&str>("media") + .expect("Only caps with a `media` field are expected when creating the pad"); + + let (template, name, raw_caps) = if media_type == "video" { + ( + obj.pad_template("video_%s_%u").unwrap(), + format!( + "video_{}_{}", + session.id, + session.n_video_pads.fetch_add(1, Ordering::SeqCst) + ), + VIDEO_CAPS.to_owned(), + ) + } else if media_type == "audio" { + ( + obj.pad_template("audio_%s_%u").unwrap(), + format!( + "audio_{}_{}", + session.id, + session.n_audio_pads.fetch_add(1, Ordering::SeqCst) + ), + AUDIO_CAPS.to_owned(), + ) + } else { + gst::info!(CAT, imp: self, "Not an audio or video media {media_type:?}"); + + return false; + }; + + let caps_with_raw = [caps.clone(), raw_caps.clone()] + .into_iter() + .collect::(); + let ghost = gst::GhostPad::builder_from_template(&template) + .name(name) + .build() + .downcast::() + .unwrap(); + ghost.imp().set_stream_id(stream_id); + obj.add_pad(&ghost) + .expect("Adding ghost pad should never fail"); + + let downstream_caps = ghost.peer_query_caps(Some(&caps_with_raw)); + if let Some(first_struct) = downstream_caps.structure(0) { + if first_struct.has_name(raw_caps.structure(0).unwrap().name()) { + ghost.imp().set_needs_decoding(true) + } + } + + true + } fn maybe_start_signaller(&self) { let obj = self.obj(); @@ -1007,8 +1022,9 @@ impl BaseWebRTCSrc { fn maybe_stop_signaller(&self) { let mut state = self.state.lock().unwrap(); if state.signaller_state == SignallerState::Started { - self.signaller().stop(); state.signaller_state = SignallerState::Stopped; + drop(state); + self.signaller().stop(); gst::info!(CAT, imp: self, "Stopped signaller"); } } @@ -1022,6 +1038,152 @@ impl BaseWebRTCSrc { Ok(()) } + + fn start_session(&self, session_id: &str) -> Result<(), Error> { + let state = self.state.lock().unwrap(); + if state.sessions.contains_key(session_id) { + return Err(anyhow::anyhow!( + "session with id {session_id} already exists" + )); + }; + drop(state); + + let session = Session::new(session_id)?; + + let webrtcbin = session.webrtcbin(); + + { + let settings = self.settings.lock().unwrap(); + + if let Some(stun_server) = settings.stun_server.as_ref() { + webrtcbin.set_property("stun-server", stun_server); + } + + for turn_server in settings.turn_servers.iter() { + webrtcbin.emit_by_name::("add-turn-server", &[&turn_server]); + } + } + + let bin = gst::Bin::new(); + + bin.connect_pad_removed( + glib::clone!(@weak self as this, @to-owned session_id => move |_, pad| + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::warning!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.flow_combiner.lock().unwrap().remove_pad(pad); + ), + ); + bin.connect_pad_added( + glib::clone!(@weak self as this, @to-owned session_id => move |_, pad| + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::warning!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.flow_combiner.lock().unwrap().add_pad(pad); + + ), + ); + + webrtcbin.connect_pad_added( + glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| { + if pad.direction() == gst::PadDirection::Sink { + return; + } + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + let bin_ghostpad = session.handle_webrtc_src_pad(&bin, pad, &this.obj()); + drop(state); + bin.add_pad(&bin_ghostpad) + .expect("Adding ghostpad to the bin should always work"); + }), + ); + + webrtcbin.connect_pad_removed( + glib::clone!(@weak self as this, @weak bin, @to-owned session_id => move |_webrtcbin, pad| { + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.flow_combiner.lock().unwrap().remove_pad(pad); + }), + ); + + webrtcbin.connect_closure( + "on-ice-candidate", + false, + glib::closure!(@weak-allow-none self as this, @to-owned session_id => move | + _webrtcbin: gst::Bin, + sdp_m_line_index: u32, + candidate: String| { + if let Some(ele) = this { + let mut state = ele.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: ele, "session {session_id:?} not found"); + return + }; + session.on_ice_candidate(sdp_m_line_index, candidate, &ele.obj()); + } + }), + ); + + webrtcbin.connect_closure( + "on-data-channel", + false, + glib::closure!(@weak-allow-none self as this, @to-owned session_id => move | + _webrtcbin: gst::Bin, + data_channel: glib::Object| { + if let Some(ele) = this { + let mut state = ele.state.lock().unwrap(); + + let Some(session) = state.sessions.get_mut(&session_id) else { + gst::error!(CAT, imp: ele, "session {session_id:?} not found"); + return + }; + session.on_data_channel(data_channel, &ele.obj()); + } + }), + ); + + bin.add(&webrtcbin).unwrap(); + self.obj().add(&bin).context("Could not add `webrtcbin`")?; + bin.sync_state_with_parent().unwrap(); + + self.signaller() + .emit_by_name::<()>("webrtcbin-ready", &[&session_id, &webrtcbin]); + + let mut state = self.state.lock().unwrap(); + state.sessions.insert(session_id.to_string(), session); + + Ok(()) + } + + fn end_session(&self, id: &str, bin: &gst::Bin) -> Result<(), Error> { + let obj = self.obj(); + + // set the session's bin to Null and remove it + bin.set_state(gst::State::Null)?; + obj.remove(bin)?; + + for pad in obj.src_pads() { + if pad.name().contains(id) { + if !pad.push_event(gst::event::Eos::new()) { + gst::warning!(CAT, imp: self, "failed to send EOS on {}", pad.name()); + } + obj.remove_pad(&pad) + .map_err(|err| anyhow::anyhow!("Couldn't remove pad? {err:?}"))?; + } + } + // self.signaller().end_session(id); + Ok(()) + } } impl ElementImpl for BaseWebRTCSrc { @@ -1047,7 +1209,7 @@ impl ElementImpl for BaseWebRTCSrc { vec![ gst::PadTemplate::with_gtype( - "video_%u", + "video_%s_%u", gst::PadDirection::Src, gst::PadPresence::Sometimes, &video_caps_builder.build(), @@ -1055,7 +1217,7 @@ impl ElementImpl for BaseWebRTCSrc { ) .unwrap(), gst::PadTemplate::with_gtype( - "audio_%u", + "audio_%s_%u", gst::PadDirection::Src, gst::PadPresence::Sometimes, &audio_caps_builder.build(), @@ -1073,16 +1235,6 @@ impl ElementImpl for BaseWebRTCSrc { transition: gst::StateChange, ) -> Result { let obj = &*self.obj(); - if let gst::StateChange::NullToReady = transition { - if let Err(err) = self.prepare() { - gst::element_error!( - obj, - gst::StreamError::Failed, - ["Failed to prepare: {}", err] - ); - return Err(gst::StateChangeError); - } - } let mut ret = self.parent_change_state(transition); @@ -1115,8 +1267,26 @@ impl ElementImpl for BaseWebRTCSrc { fn send_event(&self, event: gst::Event) -> bool { match event.view() { gst::EventView::Navigation(ev) => { - self.send_navigation_event(gst_video::NavigationEvent::parse(ev).unwrap()); - true + let mut state = self.state.lock().unwrap(); + + if state.sessions.len() != 1 { + gst::warning!(CAT, imp: self, + "Navigation event can only be sent on the element if there is a single \ + session. For multiple sessions, send the event on the desired source \ + pad(s)"); + false + } else { + state + .sessions + .values_mut() + .next() + .unwrap() + .send_navigation_event( + gst_video::NavigationEvent::parse(ev).unwrap(), + &self.obj(), + ); + true + } } _ => true, } @@ -1157,24 +1327,26 @@ enum SignallerState { Stopped, } -struct State { - session_id: Option, - signaller_state: SignallerState, - webrtcbin: Option, - flow_combiner: gst_base::UniqueFlowCombiner, - signaller_signals: Option, +struct Session { + id: String, + webrtcbin: gst::Element, data_channel: Option, + n_video_pads: AtomicU16, + n_audio_pads: AtomicU16, + flow_combiner: Mutex, +} +struct State { + sessions: HashMap, + signaller_state: SignallerState, + signaller_signals: Option, } impl Default for State { fn default() -> Self { Self { signaller_state: SignallerState::Stopped, - session_id: None, - webrtcbin: None, - flow_combiner: Default::default(), + sessions: HashMap::new(), signaller_signals: Default::default(), - data_channel: None, } } }