webrtcsink: fix codec selection discoveries

Since ab1ec126983f949804684e11e0e58c7cf3b22bc4:

webrtcsink: Add support for pre encoded streams

Discovery pipelines for remote offers were no longer fed any buffers.

While some encoders could already produce caps with no input buffers,
others, such as x264enc, simply hung forever. This resulted in no answer
getting produced if for instance video-caps were constrained to H264.

Fix this by tracking discovery pipelines at the State rather than the
InputStream level, removing the useless distinction of Initial vs.
CodecSelection discoveries, and always feeding all the current
discovery pipelines with incoming buffers.

For reference, the issue here was that codec selection discoveries were
assigned to local clones of InputStreams, not tracked anywhere, and thus
not iterated for discoveries when queuing incoming buffers from the
chain function, as it only looked at the original instance of
InputStream's in state.streams.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1319>
This commit is contained in:
Mathieu Duponchelle 2023-09-05 15:16:54 +02:00 committed by GStreamer Marge Bot
parent 9604dea90a
commit 2381558169

View file

@ -85,29 +85,17 @@ struct Settings {
signaller: Signallable,
}
/// Type of discovery, used to differentiate between initial discovery
/// and discovery initiated by client offer
#[derive(Debug, Clone, PartialEq, Eq)]
enum DiscoveryType {
/// Initial discovery of our input streams
Initial,
/// Discovery to select a specific codec as requested by the remote peer
CodecSelection,
}
#[derive(Debug, Clone)]
struct DiscoveryInfo {
id: String,
type_: DiscoveryType,
caps: gst::Caps,
srcs: Arc<Mutex<Vec<gst_app::AppSrc>>>,
}
impl DiscoveryInfo {
fn new(type_: DiscoveryType, caps: gst::Caps) -> Self {
fn new(caps: gst::Caps) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
type_,
caps,
srcs: Default::default(),
}
@ -144,8 +132,8 @@ struct InputStream {
serial: u32,
/// Whether the input stream is video or not
is_video: bool,
/// Information about currently running codec discoveries
discoveries: Vec<DiscoveryInfo>,
/// Whether initial discovery has started
initial_discovery_started: bool,
}
/// Wrapper around webrtcbin pads
@ -243,6 +231,7 @@ struct State {
audio_serial: u32,
video_serial: u32,
streams: HashMap<String, InputStream>,
discoveries: HashMap<String, Vec<DiscoveryInfo>>,
navigation_handler: Option<NavigationEventHandler>,
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
@ -345,6 +334,7 @@ impl Default for State {
audio_serial: 0,
video_serial: 0,
streams: HashMap::new(),
discoveries: HashMap::new(),
navigation_handler: None,
mids: HashMap::new(),
signaller_signals: Default::default(),
@ -904,6 +894,27 @@ impl State {
&& element.current_state() >= gst::State::Paused
&& self.codec_discovery_done
}
fn queue_discovery(&mut self, stream_name: &str, discovery_info: DiscoveryInfo) {
if let Some(discos) = self.discoveries.get_mut(stream_name) {
discos.push(discovery_info);
} else {
self.discoveries
.insert(stream_name.to_string(), vec![discovery_info]);
}
}
fn remove_discovery(&mut self, stream_name: &str, discovery_info: &DiscoveryInfo) {
if let Some(discos) = self.discoveries.get_mut(stream_name) {
let position = discos
.iter()
.position(|d| d.id == discovery_info.id)
.expect(
"We expect discovery to always be in the list of discoverers when removing",
);
discos.remove(position);
}
}
}
impl Session {
@ -1191,26 +1202,12 @@ impl InputStream {
}
}
fn create_discovery(&mut self, type_: DiscoveryType) -> DiscoveryInfo {
let discovery_info = DiscoveryInfo::new(
type_,
fn create_discovery(&self) -> DiscoveryInfo {
DiscoveryInfo::new(
self.in_caps.clone().expect(
"We should never create a discovery for a stream that doesn't have caps set",
),
);
self.discoveries.push(discovery_info.clone());
discovery_info
}
fn remove_discovery(&mut self, discovery: &DiscoveryInfo) {
let id = self
.discoveries
.iter()
.position(|d| d.id == discovery.id)
.expect("We expect discovery to always be in the list of discoverers when removing");
self.discoveries.remove(id);
)
}
}
@ -1311,7 +1308,7 @@ impl BaseWebRTCSink {
let mut payloader_caps = match media {
Some(media) => {
let discovery_info = stream.create_discovery(DiscoveryType::CodecSelection);
let discovery_info = stream.create_discovery();
let codec = BaseWebRTCSink::select_codec(
element,
@ -1323,8 +1320,6 @@ impl BaseWebRTCSink {
)
.await;
stream.remove_discovery(&discovery_info);
match codec {
Some(codec) => {
gst::debug!(
@ -2846,75 +2841,89 @@ impl BaseWebRTCSink {
.set_state(gst::State::Playing)
.with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?;
while let Some(msg) = stream.next().await {
match msg.view() {
gst::MessageView::Error(err) => {
gst::error!(CAT, "Error in discovery pipeline: {err:#?}");
pipe.0.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
"webrtcsink-discovery-error",
);
return Err(err.error().into());
}
gst::MessageView::StateChanged(s) => {
if msg.src() == Some(pipe.0.upcast_ref()) {
pipe.0.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!(
"webrtcsink-discovery-{}-{:?}-{:?}",
pipe.0.name(),
s.old(),
s.current()
),
);
}
continue;
}
gst::MessageView::Application(appmsg) => {
let caps = match appmsg.structure() {
Some(s) => {
if s.name().as_str() != "payloaded_caps" {
continue;
}
s.get::<gst::Caps>("caps").unwrap()
}
_ => continue,
};
gst::info!(CAT, "Discovery pipeline got caps {caps:?}");
pipe.0.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
"webrtcsink-discovery-done",
);
if let Some(s) = caps.structure(0) {
let mut s = s.to_owned();
s.remove_fields([
"timestamp-offset",
"seqnum-offset",
"ssrc",
"sprop-parameter-sets",
"a-framerate",
]);
s.set("payload", codec.payload().unwrap());
gst::debug!(
CAT,
obj: element,
"Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}"
);
return Ok(s);
} else {
return Err(anyhow!("Discovered empty caps"));
}
}
_ => {
continue;
}
}
{
let mut state = element.imp().state.lock().unwrap();
state.queue_discovery(stream_name, discovery_info.clone());
}
unreachable!()
let ret = {
loop {
if let Some(msg) = stream.next().await {
match msg.view() {
gst::MessageView::Error(err) => {
gst::warning!(CAT, "Error in discovery pipeline: {err:#?}");
pipe.0.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
"webrtcsink-discovery-error",
);
break Err(err.error().into());
}
gst::MessageView::StateChanged(s) => {
if msg.src() == Some(pipe.0.upcast_ref()) {
pipe.0.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!(
"webrtcsink-discovery-{}-{:?}-{:?}",
pipe.0.name(),
s.old(),
s.current()
),
);
}
continue;
}
gst::MessageView::Application(appmsg) => {
let caps = match appmsg.structure() {
Some(s) => {
if s.name().as_str() != "payloaded_caps" {
continue;
}
s.get::<gst::Caps>("caps").unwrap()
}
_ => continue,
};
gst::info!(CAT, "Discovery pipeline got caps {caps:?}");
pipe.0.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!("webrtcsink-discovery-{}-done", pipe.0.name()),
);
if let Some(s) = caps.structure(0) {
let mut s = s.to_owned();
s.remove_fields([
"timestamp-offset",
"seqnum-offset",
"ssrc",
"sprop-parameter-sets",
"a-framerate",
]);
s.set("payload", codec.payload().unwrap());
gst::debug!(
CAT,
obj: element,
"Codec discovery pipeline for caps {input_caps} with codec {codec:?} succeeded: {s}"
);
break Ok(s);
} else {
break Err(anyhow!("Discovered empty caps"));
}
}
_ => {
continue;
}
}
} else {
unreachable!()
}
}
};
let mut state = element.imp().state.lock().unwrap();
state.remove_discovery(stream_name, discovery_info);
ret
}
async fn lookup_caps(
@ -3070,36 +3079,36 @@ impl BaseWebRTCSink {
gst::Pad::event_default(pad, Some(element), event)
}
fn start_stream_discovery_if_needed(&self, stream_name: &str, buffer: &gst::Buffer) {
let (codecs, discovery_info) = {
let mut state = self.state.lock().unwrap();
let stream = state.streams.get_mut(stream_name).unwrap();
fn feed_discoveries(&self, stream_name: &str, buffer: &gst::Buffer) {
let state = self.state.lock().unwrap();
// Discovery already happened... nothing to do here.
if stream.out_caps.is_some() {
return;
}
let mut discovery_started = false;
for discovery_info in stream.discoveries.iter() {
if matches!(discovery_info.type_, DiscoveryType::Initial) {
discovery_started = true;
}
if let Some(discos) = state.discoveries.get(stream_name) {
for discovery_info in discos.iter() {
for src in discovery_info.srcs() {
if let Err(err) = src.push_buffer(buffer.clone()) {
gst::log!(CAT, obj: src, "Failed to push buffer: {}", err);
}
}
}
}
}
if discovery_started {
// Discovery already started, we pushed the buffer to keep it
// going
return;
}
fn start_stream_discovery_if_needed(&self, stream_name: &str) {
let (codecs, discovery_info) = {
let mut state = self.state.lock().unwrap();
let discovery_info = stream.create_discovery(DiscoveryType::Initial);
stream.discoveries.push(discovery_info.clone());
let discovery_info = {
let stream = state.streams.get_mut(stream_name).unwrap();
// Initial discovery already happened... nothing to do here.
if stream.initial_discovery_started {
return;
}
stream.initial_discovery_started = true;
stream.create_discovery()
};
let codecs = if !state.codecs.is_empty() {
Codecs::from_map(&state.codecs)
@ -3123,8 +3132,8 @@ impl BaseWebRTCSink {
let (fut, handle) = futures::future::abortable(
Self::lookup_caps(
element,
discovery_info,
stream_name_clone,
discovery_info.clone(),
stream_name_clone.clone(),
gst::Caps::new_any(),
&codecs,
));
@ -3164,12 +3173,9 @@ impl BaseWebRTCSink {
_ => (),
}
let _ = codecs_done_sender.send(());
}));
let mut state = self.state.lock().unwrap();
let stream = state.streams.get_mut(stream_name).unwrap();
stream.remove_discovery(&discovery_info);
}
fn chain(
@ -3177,7 +3183,8 @@ impl BaseWebRTCSink {
pad: &gst::GhostPad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.start_stream_discovery_if_needed(pad.name().as_str(), &buffer);
self.start_stream_discovery_if_needed(pad.name().as_str());
self.feed_discoveries(pad.name().as_str(), &buffer);
gst::ProxyPad::chain_default(pad, Some(&*self.obj()), buffer)
}
@ -3690,7 +3697,7 @@ impl ElementImpl for BaseWebRTCSink {
clocksync: None,
is_video,
serial,
discoveries: Default::default(),
initial_discovery_started: false,
},
);