From 2381558169c3946575c78ff41248509c2d8c2c95 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 5 Sep 2023 15:16:54 +0200 Subject: [PATCH] webrtcsink: fix codec selection discoveries Since ab1ec126983f949804684e11e0e58c7cf3b22bc4: webrtcsink: Add support for pre encoded streams Discovery pipelines for remote offers were no longer fed any buffers. While some encoders could already produce caps with no input buffers, others, such as x264enc, simply hung forever. This resulted in no answer getting produced if for instance video-caps were constrained to H264. Fix this by tracking discovery pipelines at the State rather than the InputStream level, removing the useless distinction of Initial vs. CodecSelection discoveries, and always feeding all the current discovery pipelines with incoming buffers. For reference, the issue here was that codec selection discoveries were assigned to local clones of InputStreams, not tracked anywhere, and thus not iterated for discoveries when queuing incoming buffers from the chain function, as it only looked at the original instance of InputStream's in state.streams. Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 269 ++++++++++++++++--------------- 1 file changed, 138 insertions(+), 131 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 7b6e9606..0e3dad74 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -85,29 +85,17 @@ struct Settings { signaller: Signallable, } -/// Type of discovery, used to differentiate between initial discovery -/// and discovery initiated by client offer -#[derive(Debug, Clone, PartialEq, Eq)] -enum DiscoveryType { - /// Initial discovery of our input streams - Initial, - /// Discovery to select a specific codec as requested by the remote peer - CodecSelection, -} - #[derive(Debug, Clone)] struct DiscoveryInfo { id: String, - type_: DiscoveryType, caps: gst::Caps, srcs: Arc>>, } impl DiscoveryInfo { - fn new(type_: DiscoveryType, caps: gst::Caps) -> Self { + fn new(caps: gst::Caps) -> Self { Self { id: uuid::Uuid::new_v4().to_string(), - type_, caps, srcs: Default::default(), } @@ -144,8 +132,8 @@ struct InputStream { serial: u32, /// Whether the input stream is video or not is_video: bool, - /// Information about currently running codec discoveries - discoveries: Vec, + /// Whether initial discovery has started + initial_discovery_started: bool, } /// Wrapper around webrtcbin pads @@ -243,6 +231,7 @@ struct State { audio_serial: u32, video_serial: u32, streams: HashMap, + discoveries: HashMap>, navigation_handler: Option, mids: HashMap, signaller_signals: Option, @@ -345,6 +334,7 @@ impl Default for State { audio_serial: 0, video_serial: 0, streams: HashMap::new(), + discoveries: HashMap::new(), navigation_handler: None, mids: HashMap::new(), signaller_signals: Default::default(), @@ -904,6 +894,27 @@ impl State { && element.current_state() >= gst::State::Paused && self.codec_discovery_done } + + fn queue_discovery(&mut self, stream_name: &str, discovery_info: DiscoveryInfo) { + if let Some(discos) = self.discoveries.get_mut(stream_name) { + discos.push(discovery_info); + } else { + self.discoveries + .insert(stream_name.to_string(), vec![discovery_info]); + } + } + + fn remove_discovery(&mut self, stream_name: &str, discovery_info: &DiscoveryInfo) { + if let Some(discos) = self.discoveries.get_mut(stream_name) { + let position = discos + .iter() + .position(|d| d.id == discovery_info.id) + .expect( + "We expect discovery to always be in the list of discoverers when removing", + ); + discos.remove(position); + } + } } impl Session { @@ -1191,26 +1202,12 @@ impl InputStream { } } - fn create_discovery(&mut self, type_: DiscoveryType) -> DiscoveryInfo { - let discovery_info = DiscoveryInfo::new( - type_, + fn create_discovery(&self) -> DiscoveryInfo { + DiscoveryInfo::new( self.in_caps.clone().expect( "We should never create a discovery for a stream that doesn't have caps set", ), - ); - - self.discoveries.push(discovery_info.clone()); - - discovery_info - } - - fn remove_discovery(&mut self, discovery: &DiscoveryInfo) { - let id = self - .discoveries - .iter() - .position(|d| d.id == discovery.id) - .expect("We expect discovery to always be in the list of discoverers when removing"); - self.discoveries.remove(id); + ) } } @@ -1311,7 +1308,7 @@ impl BaseWebRTCSink { let mut payloader_caps = match media { Some(media) => { - let discovery_info = stream.create_discovery(DiscoveryType::CodecSelection); + let discovery_info = stream.create_discovery(); let codec = BaseWebRTCSink::select_codec( element, @@ -1323,8 +1320,6 @@ impl BaseWebRTCSink { ) .await; - stream.remove_discovery(&discovery_info); - match codec { Some(codec) => { gst::debug!( @@ -2846,75 +2841,89 @@ impl BaseWebRTCSink { .set_state(gst::State::Playing) .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - while let Some(msg) = stream.next().await { - match msg.view() { - gst::MessageView::Error(err) => { - gst::error!(CAT, "Error in discovery pipeline: {err:#?}"); - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - "webrtcsink-discovery-error", - ); - return Err(err.error().into()); - } - gst::MessageView::StateChanged(s) => { - if msg.src() == Some(pipe.0.upcast_ref()) { - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - format!( - "webrtcsink-discovery-{}-{:?}-{:?}", - pipe.0.name(), - s.old(), - s.current() - ), - ); - } - continue; - } - gst::MessageView::Application(appmsg) => { - let caps = match appmsg.structure() { - Some(s) => { - if s.name().as_str() != "payloaded_caps" { - continue; - } - - s.get::("caps").unwrap() - } - _ => continue, - }; - - gst::info!(CAT, "Discovery pipeline got caps {caps:?}"); - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - "webrtcsink-discovery-done", - ); - - if let Some(s) = caps.structure(0) { - let mut s = s.to_owned(); - s.remove_fields([ - "timestamp-offset", - "seqnum-offset", - "ssrc", - "sprop-parameter-sets", - "a-framerate", - ]); - s.set("payload", codec.payload().unwrap()); - gst::debug!( - CAT, - obj: element, - "Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}" - ); - return Ok(s); - } else { - return Err(anyhow!("Discovered empty caps")); - } - } - _ => { - continue; - } - } + { + let mut state = element.imp().state.lock().unwrap(); + state.queue_discovery(stream_name, discovery_info.clone()); } - unreachable!() + let ret = { + loop { + if let Some(msg) = stream.next().await { + match msg.view() { + gst::MessageView::Error(err) => { + gst::warning!(CAT, "Error in discovery pipeline: {err:#?}"); + pipe.0.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + "webrtcsink-discovery-error", + ); + break Err(err.error().into()); + } + gst::MessageView::StateChanged(s) => { + if msg.src() == Some(pipe.0.upcast_ref()) { + pipe.0.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!( + "webrtcsink-discovery-{}-{:?}-{:?}", + pipe.0.name(), + s.old(), + s.current() + ), + ); + } + continue; + } + gst::MessageView::Application(appmsg) => { + let caps = match appmsg.structure() { + Some(s) => { + if s.name().as_str() != "payloaded_caps" { + continue; + } + + s.get::("caps").unwrap() + } + _ => continue, + }; + + gst::info!(CAT, "Discovery pipeline got caps {caps:?}"); + pipe.0.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!("webrtcsink-discovery-{}-done", pipe.0.name()), + ); + + if let Some(s) = caps.structure(0) { + let mut s = s.to_owned(); + s.remove_fields([ + "timestamp-offset", + "seqnum-offset", + "ssrc", + "sprop-parameter-sets", + "a-framerate", + ]); + s.set("payload", codec.payload().unwrap()); + gst::debug!( + CAT, + obj: element, + "Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}" + ); + break Ok(s); + } else { + break Err(anyhow!("Discovered empty caps")); + } + } + _ => { + continue; + } + } + } else { + unreachable!() + } + } + }; + + let mut state = element.imp().state.lock().unwrap(); + state.remove_discovery(stream_name, discovery_info); + + ret } async fn lookup_caps( @@ -3070,36 +3079,36 @@ impl BaseWebRTCSink { gst::Pad::event_default(pad, Some(element), event) } - fn start_stream_discovery_if_needed(&self, stream_name: &str, buffer: &gst::Buffer) { - let (codecs, discovery_info) = { - let mut state = self.state.lock().unwrap(); - let stream = state.streams.get_mut(stream_name).unwrap(); + fn feed_discoveries(&self, stream_name: &str, buffer: &gst::Buffer) { + let state = self.state.lock().unwrap(); - // Discovery already happened... nothing to do here. - if stream.out_caps.is_some() { - return; - } - - let mut discovery_started = false; - for discovery_info in stream.discoveries.iter() { - if matches!(discovery_info.type_, DiscoveryType::Initial) { - discovery_started = true; - } + if let Some(discos) = state.discoveries.get(stream_name) { + for discovery_info in discos.iter() { for src in discovery_info.srcs() { if let Err(err) = src.push_buffer(buffer.clone()) { gst::log!(CAT, obj: src, "Failed to push buffer: {}", err); } } } + } + } - if discovery_started { - // Discovery already started, we pushed the buffer to keep it - // going - return; - } + fn start_stream_discovery_if_needed(&self, stream_name: &str) { + let (codecs, discovery_info) = { + let mut state = self.state.lock().unwrap(); - let discovery_info = stream.create_discovery(DiscoveryType::Initial); - stream.discoveries.push(discovery_info.clone()); + let discovery_info = { + let stream = state.streams.get_mut(stream_name).unwrap(); + + // Initial discovery already happened... nothing to do here. + if stream.initial_discovery_started { + return; + } + + stream.initial_discovery_started = true; + + stream.create_discovery() + }; let codecs = if !state.codecs.is_empty() { Codecs::from_map(&state.codecs) @@ -3123,8 +3132,8 @@ impl BaseWebRTCSink { let (fut, handle) = futures::future::abortable( Self::lookup_caps( element, - discovery_info, - stream_name_clone, + discovery_info.clone(), + stream_name_clone.clone(), gst::Caps::new_any(), &codecs, )); @@ -3164,12 +3173,9 @@ impl BaseWebRTCSink { _ => (), } + let _ = codecs_done_sender.send(()); })); - - let mut state = self.state.lock().unwrap(); - let stream = state.streams.get_mut(stream_name).unwrap(); - stream.remove_discovery(&discovery_info); } fn chain( @@ -3177,7 +3183,8 @@ impl BaseWebRTCSink { pad: &gst::GhostPad, buffer: gst::Buffer, ) -> Result { - self.start_stream_discovery_if_needed(pad.name().as_str(), &buffer); + self.start_stream_discovery_if_needed(pad.name().as_str()); + self.feed_discoveries(pad.name().as_str(), &buffer); gst::ProxyPad::chain_default(pad, Some(&*self.obj()), buffer) } @@ -3690,7 +3697,7 @@ impl ElementImpl for BaseWebRTCSink { clocksync: None, is_video, serial, - discoveries: Default::default(), + initial_discovery_started: false, }, );