ndisinkcombiner: Properly handle caps changes

We are caching one video buffer, so previously we were changing the src
caps one buffer too early.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1110>
This commit is contained in:
Vivia Nikolaidou 2023-02-28 14:03:57 +02:00 committed by GStreamer Marge Bot
parent fb528941ea
commit cd74d01324

View file

@ -25,7 +25,16 @@ struct State {
// to the current_video_buffer below! // to the current_video_buffer below!
video_info: Option<gst_video::VideoInfo>, video_info: Option<gst_video::VideoInfo>,
audio_info: Option<gst_audio::AudioInfo>, audio_info: Option<gst_audio::AudioInfo>,
current_video_buffer: Option<(gst::Buffer, gst::ClockTime)>, // These are only ever set when a change is pending mid-stream. They apply to the currently
// pending buffer on the pad and not to the current_video_buffer.
pending_caps: Option<gst::Caps>,
pending_segment: Option<gst::Segment>,
current_video_buffer: Option<(
gst::Buffer,
gst::ClockTime,
Option<gst::Caps>,
Option<gst::Segment>,
)>,
current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
} }
@ -181,6 +190,8 @@ impl AggregatorImpl for NdiSinkCombiner {
*state_storage = Some(State { *state_storage = Some(State {
audio_info: None, audio_info: None,
video_info: None, video_info: None,
pending_caps: None,
pending_segment: None,
current_video_buffer: None, current_video_buffer: None,
current_audio_buffers: Vec::new(), current_audio_buffers: Vec::new(),
}); });
@ -373,21 +384,36 @@ impl AggregatorImpl for NdiSinkCombiner {
None => return Err(gst::FlowError::Flushing), None => return Err(gst::FlowError::Flushing),
}; };
let (mut current_video_buffer, current_video_running_time_end, next_video_buffer) = let (
if let Some((video_buffer, video_segment)) = video_buffer_and_segment { mut current_video_buffer,
current_video_running_time_end,
pending_caps,
pending_segment,
next_video_buffer,
) = if let Some((video_buffer, video_segment)) = video_buffer_and_segment {
let video_running_time = video_segment.to_running_time(video_buffer.pts()).unwrap(); let video_running_time = video_segment.to_running_time(video_buffer.pts()).unwrap();
if let Some(pending_segment) = &state.pending_segment {
assert_eq!(video_segment.upcast_ref(), pending_segment);
}
match state.current_video_buffer { match &state.current_video_buffer {
None => { None => {
gst::trace!(CAT, imp: self, "First video buffer, waiting for second"); gst::trace!(CAT, imp: self, "First video buffer, waiting for second");
state.current_video_buffer = Some((video_buffer, video_running_time)); state.current_video_buffer = Some((
video_buffer,
video_running_time,
state.pending_caps.take(),
state.pending_segment.take(),
));
drop(state_storage); drop(state_storage);
self.video_pad.drop_buffer(); self.video_pad.drop_buffer();
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
} }
Some((ref buffer, _)) => ( Some((ref buffer, _, pending_caps, pending_segment)) => (
buffer.clone(), buffer.clone(),
Some(video_running_time), Some(video_running_time),
pending_caps.clone(),
pending_segment.clone(),
Some((video_buffer, video_running_time)), Some((video_buffer, video_running_time)),
), ),
} }
@ -420,14 +446,9 @@ impl AggregatorImpl for NdiSinkCombiner {
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
}; };
let video_pts = let video_pts = video_segment.position_from_running_time(audio_running_time);
video_segment.position_from_running_time(audio_running_time);
if video_pts.is_none() { if video_pts.is_none() {
gst::warning!( gst::warning!(CAT, imp: self, "Can't output more audio after video EOS");
CAT,
imp: self,
"Can't output more audio after video EOS"
);
return Err(gst::FlowError::Eos); return Err(gst::FlowError::Eos);
} }
@ -437,9 +458,11 @@ impl AggregatorImpl for NdiSinkCombiner {
buffer.set_pts(video_pts); buffer.set_pts(video_pts);
} }
(buffer, gst::ClockTime::NONE, None) (buffer, gst::ClockTime::NONE, None, None, None)
}
(Some((ref buffer, _, _, _)), _) => {
(buffer.clone(), gst::ClockTime::NONE, None, None, None)
} }
(Some((ref buffer, _)), _) => (buffer.clone(), gst::ClockTime::NONE, None),
} }
}; };
@ -508,7 +531,7 @@ impl AggregatorImpl for NdiSinkCombiner {
} }
if let Some((video_buffer, video_running_time)) = next_video_buffer { if let Some((video_buffer, video_running_time)) = next_video_buffer {
state.current_video_buffer = Some((video_buffer, video_running_time)); state.current_video_buffer = Some((video_buffer, video_running_time, None, None));
drop(state_storage); drop(state_storage);
self.video_pad.drop_buffer(); self.video_pad.drop_buffer();
} else { } else {
@ -522,7 +545,14 @@ impl AggregatorImpl for NdiSinkCombiner {
"Finishing video buffer {:?}", "Finishing video buffer {:?}",
current_video_buffer current_video_buffer
); );
self.obj().finish_buffer(current_video_buffer) if let Some(caps) = pending_caps {
self.obj().set_src_caps(&caps);
}
if let Some(segment) = pending_segment {
self.obj().update_segment(&segment);
}
let ret = self.obj().finish_buffer(current_video_buffer);
ret
} }
fn sink_event(&self, pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { fn sink_event(&self, pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
@ -539,6 +569,7 @@ impl AggregatorImpl for NdiSinkCombiner {
}; };
if pad == &self.video_pad { if pad == &self.video_pad {
let mut send_caps_immediately = true;
let info = match gst_video::VideoInfo::from_caps(&caps) { let info = match gst_video::VideoInfo::from_caps(&caps) {
Ok(info) => info, Ok(info) => info,
Err(_) => { Err(_) => {
@ -559,13 +590,21 @@ impl AggregatorImpl for NdiSinkCombiner {
}; };
state.video_info = Some(info); state.video_info = Some(info);
if state.current_video_buffer.is_some() {
state.pending_caps = Some(caps.clone());
send_caps_immediately = false;
} else {
state.pending_caps = None;
}
drop(state_storage); drop(state_storage);
self.obj().set_latency(latency, gst::ClockTime::NONE); self.obj().set_latency(latency, gst::ClockTime::NONE);
// The video caps are passed through as the audio is included only in a meta // The video caps are passed through as the audio is included only in a meta
if send_caps_immediately {
self.obj().set_src_caps(&caps); self.obj().set_src_caps(&caps);
}
} else { } else {
let info = match gst_audio::AudioInfo::from_caps(&caps) { let info = match gst_audio::AudioInfo::from_caps(&caps) {
Ok(info) => info, Ok(info) => info,
@ -582,8 +621,34 @@ impl AggregatorImpl for NdiSinkCombiner {
EventView::Segment(segment) if pad == &self.video_pad => { EventView::Segment(segment) if pad == &self.video_pad => {
let segment = segment.segment(); let segment = segment.segment();
gst::debug!(CAT, obj: pad, "Updating segment {:?}", segment); gst::debug!(CAT, obj: pad, "Updating segment {:?}", segment);
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
Some(ref mut state) => state,
None => return false,
};
let mut send_segment_immediately = true;
if state.current_video_buffer.is_some() {
state.pending_segment = Some(segment.clone());
send_segment_immediately = false;
} else {
state.pending_caps = None;
}
drop(state_storage);
if send_segment_immediately {
self.obj().update_segment(segment); self.obj().update_segment(segment);
} }
}
EventView::FlushStop(_) if pad == &self.video_pad => {
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
Some(ref mut state) => state,
None => return false,
};
state.pending_segment = None;
state.pending_caps = None;
}
_ => (), _ => (),
} }