diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 7b6e9606..0e3dad74 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -85,29 +85,17 @@ struct Settings { signaller: Signallable, } -/// Type of discovery, used to differentiate between initial discovery -/// and discovery initiated by client offer -#[derive(Debug, Clone, PartialEq, Eq)] -enum DiscoveryType { - /// Initial discovery of our input streams - Initial, - /// Discovery to select a specific codec as requested by the remote peer - CodecSelection, -} - #[derive(Debug, Clone)] struct DiscoveryInfo { id: String, - type_: DiscoveryType, caps: gst::Caps, srcs: Arc>>, } impl DiscoveryInfo { - fn new(type_: DiscoveryType, caps: gst::Caps) -> Self { + fn new(caps: gst::Caps) -> Self { Self { id: uuid::Uuid::new_v4().to_string(), - type_, caps, srcs: Default::default(), } @@ -144,8 +132,8 @@ struct InputStream { serial: u32, /// Whether the input stream is video or not is_video: bool, - /// Information about currently running codec discoveries - discoveries: Vec, + /// Whether initial discovery has started + initial_discovery_started: bool, } /// Wrapper around webrtcbin pads @@ -243,6 +231,7 @@ struct State { audio_serial: u32, video_serial: u32, streams: HashMap, + discoveries: HashMap>, navigation_handler: Option, mids: HashMap, signaller_signals: Option, @@ -345,6 +334,7 @@ impl Default for State { audio_serial: 0, video_serial: 0, streams: HashMap::new(), + discoveries: HashMap::new(), navigation_handler: None, mids: HashMap::new(), signaller_signals: Default::default(), @@ -904,6 +894,27 @@ impl State { && element.current_state() >= gst::State::Paused && self.codec_discovery_done } + + fn queue_discovery(&mut self, stream_name: &str, discovery_info: DiscoveryInfo) { + if let Some(discos) = self.discoveries.get_mut(stream_name) { + discos.push(discovery_info); + } else { + self.discoveries + .insert(stream_name.to_string(), vec![discovery_info]); + } + } + + fn remove_discovery(&mut self, stream_name: &str, discovery_info: &DiscoveryInfo) { + if let Some(discos) = self.discoveries.get_mut(stream_name) { + let position = discos + .iter() + .position(|d| d.id == discovery_info.id) + .expect( + "We expect discovery to always be in the list of discoverers when removing", + ); + discos.remove(position); + } + } } impl Session { @@ -1191,26 +1202,12 @@ impl InputStream { } } - fn create_discovery(&mut self, type_: DiscoveryType) -> DiscoveryInfo { - let discovery_info = DiscoveryInfo::new( - type_, + fn create_discovery(&self) -> DiscoveryInfo { + DiscoveryInfo::new( self.in_caps.clone().expect( "We should never create a discovery for a stream that doesn't have caps set", ), - ); - - self.discoveries.push(discovery_info.clone()); - - discovery_info - } - - fn remove_discovery(&mut self, discovery: &DiscoveryInfo) { - let id = self - .discoveries - .iter() - .position(|d| d.id == discovery.id) - .expect("We expect discovery to always be in the list of discoverers when removing"); - self.discoveries.remove(id); + ) } } @@ -1311,7 +1308,7 @@ impl BaseWebRTCSink { let mut payloader_caps = match media { Some(media) => { - let discovery_info = stream.create_discovery(DiscoveryType::CodecSelection); + let discovery_info = stream.create_discovery(); let codec = BaseWebRTCSink::select_codec( element, @@ -1323,8 +1320,6 @@ impl BaseWebRTCSink { ) .await; - stream.remove_discovery(&discovery_info); - match codec { Some(codec) => { gst::debug!( @@ -2846,75 +2841,89 @@ impl BaseWebRTCSink { .set_state(gst::State::Playing) .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - while let Some(msg) = stream.next().await { - match msg.view() { - gst::MessageView::Error(err) => { - gst::error!(CAT, "Error in discovery pipeline: {err:#?}"); - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - "webrtcsink-discovery-error", - ); - return Err(err.error().into()); - } - gst::MessageView::StateChanged(s) => { - if msg.src() == Some(pipe.0.upcast_ref()) { - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - format!( - "webrtcsink-discovery-{}-{:?}-{:?}", - pipe.0.name(), - s.old(), - s.current() - ), - ); - } - continue; - } - gst::MessageView::Application(appmsg) => { - let caps = match appmsg.structure() { - Some(s) => { - if s.name().as_str() != "payloaded_caps" { - continue; - } - - s.get::("caps").unwrap() - } - _ => continue, - }; - - gst::info!(CAT, "Discovery pipeline got caps {caps:?}"); - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - "webrtcsink-discovery-done", - ); - - if let Some(s) = caps.structure(0) { - let mut s = s.to_owned(); - s.remove_fields([ - "timestamp-offset", - "seqnum-offset", - "ssrc", - "sprop-parameter-sets", - "a-framerate", - ]); - s.set("payload", codec.payload().unwrap()); - gst::debug!( - CAT, - obj: element, - "Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}" - ); - return Ok(s); - } else { - return Err(anyhow!("Discovered empty caps")); - } - } - _ => { - continue; - } - } + { + let mut state = element.imp().state.lock().unwrap(); + state.queue_discovery(stream_name, discovery_info.clone()); } - unreachable!() + let ret = { + loop { + if let Some(msg) = stream.next().await { + match msg.view() { + gst::MessageView::Error(err) => { + gst::warning!(CAT, "Error in discovery pipeline: {err:#?}"); + pipe.0.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + "webrtcsink-discovery-error", + ); + break Err(err.error().into()); + } + gst::MessageView::StateChanged(s) => { + if msg.src() == Some(pipe.0.upcast_ref()) { + pipe.0.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!( + "webrtcsink-discovery-{}-{:?}-{:?}", + pipe.0.name(), + s.old(), + s.current() + ), + ); + } + continue; + } + gst::MessageView::Application(appmsg) => { + let caps = match appmsg.structure() { + Some(s) => { + if s.name().as_str() != "payloaded_caps" { + continue; + } + + s.get::("caps").unwrap() + } + _ => continue, + }; + + gst::info!(CAT, "Discovery pipeline got caps {caps:?}"); + pipe.0.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!("webrtcsink-discovery-{}-done", pipe.0.name()), + ); + + if let Some(s) = caps.structure(0) { + let mut s = s.to_owned(); + s.remove_fields([ + "timestamp-offset", + "seqnum-offset", + "ssrc", + "sprop-parameter-sets", + "a-framerate", + ]); + s.set("payload", codec.payload().unwrap()); + gst::debug!( + CAT, + obj: element, + "Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}" + ); + break Ok(s); + } else { + break Err(anyhow!("Discovered empty caps")); + } + } + _ => { + continue; + } + } + } else { + unreachable!() + } + } + }; + + let mut state = element.imp().state.lock().unwrap(); + state.remove_discovery(stream_name, discovery_info); + + ret } async fn lookup_caps( @@ -3070,36 +3079,36 @@ impl BaseWebRTCSink { gst::Pad::event_default(pad, Some(element), event) } - fn start_stream_discovery_if_needed(&self, stream_name: &str, buffer: &gst::Buffer) { - let (codecs, discovery_info) = { - let mut state = self.state.lock().unwrap(); - let stream = state.streams.get_mut(stream_name).unwrap(); + fn feed_discoveries(&self, stream_name: &str, buffer: &gst::Buffer) { + let state = self.state.lock().unwrap(); - // Discovery already happened... nothing to do here. - if stream.out_caps.is_some() { - return; - } - - let mut discovery_started = false; - for discovery_info in stream.discoveries.iter() { - if matches!(discovery_info.type_, DiscoveryType::Initial) { - discovery_started = true; - } + if let Some(discos) = state.discoveries.get(stream_name) { + for discovery_info in discos.iter() { for src in discovery_info.srcs() { if let Err(err) = src.push_buffer(buffer.clone()) { gst::log!(CAT, obj: src, "Failed to push buffer: {}", err); } } } + } + } - if discovery_started { - // Discovery already started, we pushed the buffer to keep it - // going - return; - } + fn start_stream_discovery_if_needed(&self, stream_name: &str) { + let (codecs, discovery_info) = { + let mut state = self.state.lock().unwrap(); - let discovery_info = stream.create_discovery(DiscoveryType::Initial); - stream.discoveries.push(discovery_info.clone()); + let discovery_info = { + let stream = state.streams.get_mut(stream_name).unwrap(); + + // Initial discovery already happened... nothing to do here. + if stream.initial_discovery_started { + return; + } + + stream.initial_discovery_started = true; + + stream.create_discovery() + }; let codecs = if !state.codecs.is_empty() { Codecs::from_map(&state.codecs) @@ -3123,8 +3132,8 @@ impl BaseWebRTCSink { let (fut, handle) = futures::future::abortable( Self::lookup_caps( element, - discovery_info, - stream_name_clone, + discovery_info.clone(), + stream_name_clone.clone(), gst::Caps::new_any(), &codecs, )); @@ -3164,12 +3173,9 @@ impl BaseWebRTCSink { _ => (), } + let _ = codecs_done_sender.send(()); })); - - let mut state = self.state.lock().unwrap(); - let stream = state.streams.get_mut(stream_name).unwrap(); - stream.remove_discovery(&discovery_info); } fn chain( @@ -3177,7 +3183,8 @@ impl BaseWebRTCSink { pad: &gst::GhostPad, buffer: gst::Buffer, ) -> Result { - self.start_stream_discovery_if_needed(pad.name().as_str(), &buffer); + self.start_stream_discovery_if_needed(pad.name().as_str()); + self.feed_discoveries(pad.name().as_str(), &buffer); gst::ProxyPad::chain_default(pad, Some(&*self.obj()), buffer) } @@ -3690,7 +3697,7 @@ impl ElementImpl for BaseWebRTCSink { clocksync: None, is_video, serial, - discoveries: Default::default(), + initial_discovery_started: false, }, );