Refactor huge create() functions into multiple smaller functions

Also, don't hold any mutexes longer than necessary, especially not while
trying to receive a frame.
Sebastian Dröge 2019-07-17 11:44:24 +03:00
parent cd0726b037
commit f27c2507c5
2 changed files with 283 additions and 190 deletions
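
The change applies the same two-step refactor to both sources: the monolithic create() body is split into capture(), calculate_timestamp(), create_audio_info()/create_video_info(), create_buffer() (and copy_frame() for video), and the receiver handle is now cloned inside a short lock scope so that neither the state mutex nor the global receiver map is held while blocking on the next NDI frame. A minimal sketch of that locking pattern, with hypothetical stand-in types rather than the plugin's actual ones:

use std::sync::Mutex;

// Hypothetical stand-in for the cloneable NDI receiver handle.
#[derive(Clone)]
struct RecvInstance;

impl RecvInstance {
    // Stand-in for the blocking NDIlib capture call.
    fn capture_blocking(&self) { /* blocks until a frame arrives */ }
}

struct Source {
    state: Mutex<Option<RecvInstance>>,
}

impl Source {
    fn capture(&self) {
        // Hold the lock only long enough to clone the handle out...
        let recv = {
            let state = self.state.lock().unwrap();
            state.as_ref().unwrap().clone()
        }; // ...the guard is dropped here, before we block.

        // Waiting for a frame no longer holds `state`, so other code paths
        // (state changes, unlock, shutdown) can take the mutex meanwhile.
        recv.capture_blocking();
    }
}

fn main() {
    let src = Source {
        state: Mutex::new(Some(RecvInstance)),
    };
    src.capture();
}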

@@ -438,15 +438,20 @@ impl BaseSrcImpl for NdiAudioSrc
         _offset: u64,
         _length: u32,
     ) -> Result<gst::Buffer, gst::FlowError> {
-        // FIXME: Make sure to not have any mutexes locked while wait
+        self.capture(element)
+    }
+}
+
+impl NdiAudioSrc {
+    fn capture(&self, element: &gst_base::BaseSrc) -> Result<gst::Buffer, gst::FlowError> {
         let settings = self.settings.lock().unwrap().clone();
 
-        let mut state = self.state.lock().unwrap();
-        let receivers = HASHMAP_RECEIVERS.lock().unwrap();
-        let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap();
-        let recv = &receiver.ndi_instance;
-        let clock = element.get_clock().unwrap();
+        let recv = {
+            let state = self.state.lock().unwrap();
+            let receivers = HASHMAP_RECEIVERS.lock().unwrap();
+            let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap();
+            receiver.ndi_instance.clone()
+        };
 
         let timeout = time::Instant::now();
         let audio_frame = loop {
@@ -478,6 +483,46 @@ impl BaseSrcImpl for NdiAudioSrc
             break audio_frame;
         };
 
+        let pts = self.calculate_timestamp(element, &settings, &audio_frame);
+        let info = self.create_audio_info(element, &audio_frame)?;
+
+        {
+            let mut state = self.state.lock().unwrap();
+            if state.info.as_ref() != Some(&info) {
+                let caps = info.to_caps().unwrap();
+                state.info = Some(info.clone());
+                state.current_latency = gst::SECOND
+                    .mul_div_ceil(
+                        audio_frame.no_samples() as u64,
+                        audio_frame.sample_rate() as u64,
+                    )
+                    .unwrap_or(gst::CLOCK_TIME_NONE);
+                drop(state);
+                gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
+                element
+                    .set_caps(&caps)
+                    .map_err(|_| gst::FlowError::NotNegotiated)?;
+
+                let _ =
+                    element.post_message(&gst::Message::new_latency().src(Some(element)).build());
+            }
+        }
+
+        let buffer = self.create_buffer(element, pts, &info, &audio_frame)?;
+
+        gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
+
+        Ok(buffer)
+    }
+
+    fn calculate_timestamp(
+        &self,
+        element: &gst_base::BaseSrc,
+        settings: &Settings,
+        audio_frame: &AudioFrame,
+    ) -> gst::ClockTime {
+        let clock = element.get_clock().unwrap();
+
         // For now take the current running time as PTS. At a later time we
         // will want to work with the timestamp given by the NDI SDK if available
         let now = clock.get_time();
@@ -530,34 +575,32 @@ impl BaseSrcImpl for NdiAudioSrc
             pts
         );
 
-        let info = gst_audio::AudioInfo::new(
+        pts
+    }
+
+    fn create_audio_info(
+        &self,
+        _element: &gst_base::BaseSrc,
+        audio_frame: &AudioFrame,
+    ) -> Result<gst_audio::AudioInfo, gst::FlowError> {
+        let builder = gst_audio::AudioInfo::new(
            gst_audio::AUDIO_FORMAT_S16,
            audio_frame.sample_rate() as u32,
            audio_frame.no_channels() as u32,
-        )
-        .build()
-        .unwrap();
+        );
 
-        if state.info.as_ref() != Some(&info) {
-            let caps = info.to_caps().unwrap();
-            state.info = Some(info);
-            state.current_latency = gst::SECOND
-                .mul_div_ceil(
-                    audio_frame.no_samples() as u64,
-                    audio_frame.sample_rate() as u64,
-                )
-                .unwrap_or(gst::CLOCK_TIME_NONE);
-            gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
-            element
-                .set_caps(&caps)
-                .map_err(|_| gst::FlowError::NotNegotiated)?;
-            let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
-        }
+        Ok(builder.build().unwrap())
+    }
 
+    fn create_buffer(
+        &self,
+        _element: &gst_base::BaseSrc,
+        pts: gst::ClockTime,
+        info: &gst_audio::AudioInfo,
+        audio_frame: &AudioFrame,
+    ) -> Result<gst::Buffer, gst::FlowError> {
         // We multiply by 2 because is the size in bytes of an i16 variable
-        let buff_size = (audio_frame.no_samples() * 2 * audio_frame.no_channels()) as usize;
+        let buff_size = (audio_frame.no_samples() as u32 * info.bpf()) as usize;
         let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
         {
             let duration = gst::SECOND
@@ -598,8 +641,6 @@ impl BaseSrcImpl for NdiAudioSrc
             );
         }
 
-        gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
-
         Ok(buffer)
     }
 }
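
Two details of the new audio path are worth calling out. The reported current_latency is just the duration of one buffer, gst::SECOND.mul_div_ceil(no_samples, sample_rate); for example, 1024 samples at 48000 Hz yield 1_000_000_000 ns * 1024 / 48000, roughly 21.3 ms (the video source below analogously reports one frame duration, frame_rate.1 over frame_rate.0 of a second). And the buffer size is now derived from info.bpf(), GStreamer's bytes-per-frame value for the negotiated audio info, which for interleaved S16 equals 2 * no_channels, i.e. exactly what the old hard-coded no_samples * 2 * no_channels computed.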

@@ -484,15 +484,20 @@ impl BaseSrcImpl for NdiVideoSrc
         _offset: u64,
         _length: u32,
     ) -> Result<gst::Buffer, gst::FlowError> {
-        // FIXME: Make sure to not have any mutexes locked while wait
+        self.capture(element)
+    }
+}
+
+impl NdiVideoSrc {
+    fn capture(&self, element: &gst_base::BaseSrc) -> Result<gst::Buffer, gst::FlowError> {
         let settings = self.settings.lock().unwrap().clone();
 
-        let mut state = self.state.lock().unwrap();
-        let receivers = HASHMAP_RECEIVERS.lock().unwrap();
-        let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap();
-        let recv = &receiver.ndi_instance;
-        let clock = element.get_clock().unwrap();
+        let recv = {
+            let state = self.state.lock().unwrap();
+            let receivers = HASHMAP_RECEIVERS.lock().unwrap();
+            let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap();
+            receiver.ndi_instance.clone()
+        };
 
         let timeout = time::Instant::now();
         let video_frame = loop {
@@ -524,6 +529,46 @@ impl BaseSrcImpl for NdiVideoSrc
             break video_frame;
         };
 
+        let pts = self.calculate_timestamp(element, &settings, &video_frame);
+        let info = self.create_video_info(element, &video_frame)?;
+
+        {
+            let mut state = self.state.lock().unwrap();
+            if state.info.as_ref() != Some(&info) {
+                let caps = info.to_caps().unwrap();
+                state.info = Some(info.clone());
+                state.current_latency = gst::SECOND
+                    .mul_div_ceil(
+                        video_frame.frame_rate().1 as u64,
+                        video_frame.frame_rate().0 as u64,
+                    )
+                    .unwrap_or(gst::CLOCK_TIME_NONE);
+                drop(state);
+                gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
+                element
+                    .set_caps(&caps)
+                    .map_err(|_| gst::FlowError::NotNegotiated)?;
+
+                let _ =
+                    element.post_message(&gst::Message::new_latency().src(Some(element)).build());
+            }
+        }
+
+        let buffer = self.create_buffer(element, pts, &info, &video_frame)?;
+
+        gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
+
+        Ok(buffer)
+    }
+
+    fn calculate_timestamp(
+        &self,
+        element: &gst_base::BaseSrc,
+        settings: &Settings,
+        video_frame: &VideoFrame,
+    ) -> gst::ClockTime {
+        let clock = element.get_clock().unwrap();
+
         // For now take the current running time as PTS. At a later time we
         // will want to work with the timestamp given by the NDI SDK if available
         let now = clock.get_time();
@@ -576,6 +621,14 @@ impl BaseSrcImpl for NdiVideoSrc
             pts
         );
 
+        pts
+    }
+
+    fn create_video_info(
+        &self,
+        _element: &gst_base::BaseSrc,
+        video_frame: &VideoFrame,
+    ) -> Result<gst_video::VideoInfo, gst::FlowError> {
         // YV12 and I420 are swapped in the NDI SDK compared to GStreamer
         let format = match video_frame.fourcc() {
             ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_UYVY => gst_video::VideoFormat::Uyvy,
@@ -593,7 +646,7 @@ impl BaseSrcImpl for NdiVideoSrc
                 * gst::Fraction::new(video_frame.yres(), video_frame.xres());
 
         #[cfg(feature = "interlaced-fields")]
-        let info = {
+        {
             let mut builder = gst_video::VideoInfo::new(
                 format,
                 video_frame.xres() as u32,
@@ -618,11 +671,12 @@ impl BaseSrcImpl for NdiVideoSrc
                 builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
             }
 
-            builder.build().unwrap()
-        };
+            Ok(builder.build().unwrap())
+        }
 
         #[cfg(not(feature = "interlaced-fields"))]
-        let info = if video_frame.frame_format_type()
+        {
+            if video_frame.frame_format_type()
                 != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive
                 && video_frame.frame_format_type()
                     != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
@@ -633,8 +687,13 @@ impl BaseSrcImpl for NdiVideoSrc
                     ["Separate field interlacing not supported"]
                 );
                 return Err(gst::FlowError::NotNegotiated);
-        } else {
-            gst_video::VideoInfo::new(format, video_frame.xres() as u32, video_frame.yres() as u32)
+            }
+
+            let builder = gst_video::VideoInfo::new(
+                format,
+                video_frame.xres() as u32,
+                video_frame.yres() as u32,
+            )
             .fps(gst::Fraction::from(video_frame.frame_rate()))
             .par(par)
             .interlace_mode(
@@ -645,29 +704,19 @@ impl BaseSrcImpl for NdiVideoSrc
                 } else {
                     gst_video::VideoInterlaceMode::Interleaved
                 },
-            )
-            .build()
-            .unwrap()
-        };
+            );
 
-        if state.info.as_ref() != Some(&info) {
-            let caps = info.to_caps().unwrap();
-            state.info = Some(info);
-            state.current_latency = gst::SECOND
-                .mul_div_ceil(
-                    video_frame.frame_rate().1 as u64,
-                    video_frame.frame_rate().0 as u64,
-                )
-                .unwrap_or(gst::CLOCK_TIME_NONE);
-            gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
-            element
-                .set_caps(&caps)
-                .map_err(|_| gst::FlowError::NotNegotiated)?;
-            let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
-        }
+            Ok(builder.build().unwrap())
+        }
+    }
 
-        let mut buffer = gst::Buffer::with_size(state.info.as_ref().unwrap().size()).unwrap();
+    fn create_buffer(
+        &self,
+        element: &gst_base::BaseSrc,
+        pts: gst::ClockTime,
+        info: &gst_video::VideoInfo,
+        video_frame: &VideoFrame,
+    ) -> Result<gst::Buffer, gst::FlowError> {
+        let mut buffer = gst::Buffer::with_size(info.size()).unwrap();
         {
             let duration = gst::SECOND
                 .mul_div_floor(
@@ -734,18 +783,26 @@ impl BaseSrcImpl for NdiVideoSrc
             }
         }
 
-        let buffer = {
-            let mut vframe =
-                gst_video::VideoFrame::from_buffer_writable(buffer, &state.info.as_ref().unwrap())
-                    .unwrap();
+        self.copy_frame(element, info, buffer, video_frame)
+    }
 
-            match format {
+    fn copy_frame(
+        &self,
+        _element: &gst_base::BaseSrc,
+        info: &gst_video::VideoInfo,
+        buffer: gst::Buffer,
+        video_frame: &VideoFrame,
+    ) -> Result<gst::Buffer, gst::FlowError> {
+        // FIXME: Error handling if frame dimensions don't match
+        let mut vframe = gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
+
+        match info.format() {
             gst_video::VideoFormat::Uyvy
             | gst_video::VideoFormat::Bgra
             | gst_video::VideoFormat::Bgrx
             | gst_video::VideoFormat::Rgba
            | gst_video::VideoFormat::Rgbx => {
-                let line_bytes = if format == gst_video::VideoFormat::Uyvy {
+                let line_bytes = if info.format() == gst_video::VideoFormat::Uyvy {
                     2 * vframe.width() as usize
                 } else {
                     4 * vframe.width() as usize
@@ -851,12 +908,7 @@ impl BaseSrcImpl for NdiVideoSrc
             _ => unreachable!(),
         }
 
-            vframe.into_buffer()
-        };
-
-        gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
-
-        Ok(buffer)
+        Ok(vframe.into_buffer())
     }
 }
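
One caveat the new copy_frame() flags itself: gst_video::VideoFrame::from_buffer_writable() validates the buffer against the passed VideoInfo and fails if they don't match, and the .unwrap() turns that failure into a panic rather than a gst::FlowError; hence the FIXME about frame dimensions.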