From f27c2507c5f1983092463c6778f1c9eaedb2d9db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 17 Jul 2019 11:44:24 +0300 Subject: [PATCH] Refactor huge create() functions into multiple smaller functions And don't hold any mutexes when unnecessary, especially not while trying to receive a frame. --- src/ndiaudiosrc.rs | 103 +++++++++---- src/ndivideosrc.rs | 370 ++++++++++++++++++++++++++------------------- 2 files changed, 283 insertions(+), 190 deletions(-) diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index bfadbcdd..8e01dead 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -438,15 +438,20 @@ impl BaseSrcImpl for NdiAudioSrc { _offset: u64, _length: u32, ) -> Result { - // FIXME: Make sure to not have any mutexes locked while wait + self.capture(element) + } +} + +impl NdiAudioSrc { + fn capture(&self, element: &gst_base::BaseSrc) -> Result { let settings = self.settings.lock().unwrap().clone(); - let mut state = self.state.lock().unwrap(); - let receivers = HASHMAP_RECEIVERS.lock().unwrap(); - let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap(); - let recv = &receiver.ndi_instance; - - let clock = element.get_clock().unwrap(); + let recv = { + let state = self.state.lock().unwrap(); + let receivers = HASHMAP_RECEIVERS.lock().unwrap(); + let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap(); + receiver.ndi_instance.clone() + }; let timeout = time::Instant::now(); let audio_frame = loop { @@ -478,6 +483,46 @@ impl BaseSrcImpl for NdiAudioSrc { break audio_frame; }; + let pts = self.calculate_timestamp(element, &settings, &audio_frame); + let info = self.create_audio_info(element, &audio_frame)?; + + { + let mut state = self.state.lock().unwrap(); + if state.info.as_ref() != Some(&info) { + let caps = info.to_caps().unwrap(); + state.info = Some(info.clone()); + state.current_latency = gst::SECOND + .mul_div_ceil( + audio_frame.no_samples() as u64, + audio_frame.sample_rate() as u64, + ) + .unwrap_or(gst::CLOCK_TIME_NONE); + drop(state); + gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); + element + .set_caps(&caps) + .map_err(|_| gst::FlowError::NotNegotiated)?; + + let _ = + element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + } + } + + let buffer = self.create_buffer(element, pts, &info, &audio_frame)?; + + gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); + + Ok(buffer) + } + + fn calculate_timestamp( + &self, + element: &gst_base::BaseSrc, + settings: &Settings, + audio_frame: &AudioFrame, + ) -> gst::ClockTime { + let clock = element.get_clock().unwrap(); + // For now take the current running time as PTS. At a later time we // will want to work with the timestamp given by the NDI SDK if available let now = clock.get_time(); @@ -530,34 +575,32 @@ impl BaseSrcImpl for NdiAudioSrc { pts ); - let info = gst_audio::AudioInfo::new( + pts + } + + fn create_audio_info( + &self, + _element: &gst_base::BaseSrc, + audio_frame: &AudioFrame, + ) -> Result { + let builder = gst_audio::AudioInfo::new( gst_audio::AUDIO_FORMAT_S16, audio_frame.sample_rate() as u32, audio_frame.no_channels() as u32, - ) - .build() - .unwrap(); + ); - if state.info.as_ref() != Some(&info) { - let caps = info.to_caps().unwrap(); - state.info = Some(info); - state.current_latency = gst::SECOND - .mul_div_ceil( - audio_frame.no_samples() as u64, - audio_frame.sample_rate() as u64, - ) - .unwrap_or(gst::CLOCK_TIME_NONE); - - gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); - element - .set_caps(&caps) - .map_err(|_| gst::FlowError::NotNegotiated)?; - - let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); - } + Ok(builder.build().unwrap()) + } + fn create_buffer( + &self, + _element: &gst_base::BaseSrc, + pts: gst::ClockTime, + info: &gst_audio::AudioInfo, + audio_frame: &AudioFrame, + ) -> Result { // We multiply by 2 because is the size in bytes of an i16 variable - let buff_size = (audio_frame.no_samples() * 2 * audio_frame.no_channels()) as usize; + let buff_size = (audio_frame.no_samples() as u32 * info.bpf()) as usize; let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); { let duration = gst::SECOND @@ -598,8 +641,6 @@ impl BaseSrcImpl for NdiAudioSrc { ); } - gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); - Ok(buffer) } } diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index 2fda6da5..97c4b816 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -484,15 +484,20 @@ impl BaseSrcImpl for NdiVideoSrc { _offset: u64, _length: u32, ) -> Result { - // FIXME: Make sure to not have any mutexes locked while wait + self.capture(element) + } +} + +impl NdiVideoSrc { + fn capture(&self, element: &gst_base::BaseSrc) -> Result { let settings = self.settings.lock().unwrap().clone(); - let mut state = self.state.lock().unwrap(); - let receivers = HASHMAP_RECEIVERS.lock().unwrap(); - let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap(); - let recv = &receiver.ndi_instance; - - let clock = element.get_clock().unwrap(); + let recv = { + let state = self.state.lock().unwrap(); + let receivers = HASHMAP_RECEIVERS.lock().unwrap(); + let receiver = &receivers.get(&state.id_receiver.unwrap()).unwrap(); + receiver.ndi_instance.clone() + }; let timeout = time::Instant::now(); let video_frame = loop { @@ -524,6 +529,46 @@ impl BaseSrcImpl for NdiVideoSrc { break video_frame; }; + let pts = self.calculate_timestamp(element, &settings, &video_frame); + let info = self.create_video_info(element, &video_frame)?; + + { + let mut state = self.state.lock().unwrap(); + if state.info.as_ref() != Some(&info) { + let caps = info.to_caps().unwrap(); + state.info = Some(info.clone()); + state.current_latency = gst::SECOND + .mul_div_ceil( + video_frame.frame_rate().1 as u64, + video_frame.frame_rate().0 as u64, + ) + .unwrap_or(gst::CLOCK_TIME_NONE); + drop(state); + gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); + element + .set_caps(&caps) + .map_err(|_| gst::FlowError::NotNegotiated)?; + + let _ = + element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + } + } + + let buffer = self.create_buffer(element, pts, &info, &video_frame)?; + + gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); + + Ok(buffer) + } + + fn calculate_timestamp( + &self, + element: &gst_base::BaseSrc, + settings: &Settings, + video_frame: &VideoFrame, + ) -> gst::ClockTime { + let clock = element.get_clock().unwrap(); + // For now take the current running time as PTS. At a later time we // will want to work with the timestamp given by the NDI SDK if available let now = clock.get_time(); @@ -576,6 +621,14 @@ impl BaseSrcImpl for NdiVideoSrc { pts ); + pts + } + + fn create_video_info( + &self, + _element: &gst_base::BaseSrc, + video_frame: &VideoFrame, + ) -> Result { // YV12 and I420 are swapped in the NDI SDK compared to GStreamer let format = match video_frame.fourcc() { ndisys::NDIlib_FourCC_type_e::NDIlib_FourCC_type_UYVY => gst_video::VideoFormat::Uyvy, @@ -593,7 +646,7 @@ impl BaseSrcImpl for NdiVideoSrc { * gst::Fraction::new(video_frame.yres(), video_frame.xres()); #[cfg(feature = "interlaced-fields")] - let info = { + { let mut builder = gst_video::VideoInfo::new( format, video_frame.xres() as u32, @@ -618,56 +671,52 @@ impl BaseSrcImpl for NdiVideoSrc { builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst); } - builder.build().unwrap() - }; - - #[cfg(not(feature = "interlaced-fields"))] - let info = if video_frame.frame_format_type() - != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive - && video_frame.frame_format_type() - != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved - { - gst_element_error!( - element, - gst::StreamError::Format, - ["Separate field interlacing not supported"] - ); - return Err(gst::FlowError::NotNegotiated); - } else { - gst_video::VideoInfo::new(format, video_frame.xres() as u32, video_frame.yres() as u32) - .fps(gst::Fraction::from(video_frame.frame_rate())) - .par(par) - .interlace_mode( - if video_frame.frame_format_type() - == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive - { - gst_video::VideoInterlaceMode::Progressive - } else { - gst_video::VideoInterlaceMode::Interleaved - }, - ) - .build() - .unwrap() - }; - - if state.info.as_ref() != Some(&info) { - let caps = info.to_caps().unwrap(); - state.info = Some(info); - state.current_latency = gst::SECOND - .mul_div_ceil( - video_frame.frame_rate().1 as u64, - video_frame.frame_rate().0 as u64, - ) - .unwrap_or(gst::CLOCK_TIME_NONE); - gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); - element - .set_caps(&caps) - .map_err(|_| gst::FlowError::NotNegotiated)?; - - let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + Ok(builder.build().unwrap()) } - let mut buffer = gst::Buffer::with_size(state.info.as_ref().unwrap().size()).unwrap(); + #[cfg(not(feature = "interlaced-fields"))] + { + if video_frame.frame_format_type() + != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive + && video_frame.frame_format_type() + != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved + { + gst_element_error!( + element, + gst::StreamError::Format, + ["Separate field interlacing not supported"] + ); + return Err(gst::FlowError::NotNegotiated); + } + + let builder = gst_video::VideoInfo::new( + format, + video_frame.xres() as u32, + video_frame.yres() as u32, + ) + .fps(gst::Fraction::from(video_frame.frame_rate())) + .par(par) + .interlace_mode( + if video_frame.frame_format_type() + == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive + { + gst_video::VideoInterlaceMode::Progressive + } else { + gst_video::VideoInterlaceMode::Interleaved + }, + ); + Ok(builder.build().unwrap()); + } + } + + fn create_buffer( + &self, + element: &gst_base::BaseSrc, + pts: gst::ClockTime, + info: &gst_video::VideoInfo, + video_frame: &VideoFrame, + ) -> Result { + let mut buffer = gst::Buffer::with_size(info.size()).unwrap(); { let duration = gst::SECOND .mul_div_floor( @@ -734,22 +783,47 @@ impl BaseSrcImpl for NdiVideoSrc { } } - let buffer = { - let mut vframe = - gst_video::VideoFrame::from_buffer_writable(buffer, &state.info.as_ref().unwrap()) - .unwrap(); + self.copy_frame(element, info, buffer, video_frame) + } - match format { - gst_video::VideoFormat::Uyvy - | gst_video::VideoFormat::Bgra - | gst_video::VideoFormat::Bgrx - | gst_video::VideoFormat::Rgba - | gst_video::VideoFormat::Rgbx => { - let line_bytes = if format == gst_video::VideoFormat::Uyvy { - 2 * vframe.width() as usize - } else { - 4 * vframe.width() as usize - }; + fn copy_frame( + &self, + _element: &gst_base::BaseSrc, + info: &gst_video::VideoInfo, + buffer: gst::Buffer, + video_frame: &VideoFrame, + ) -> Result { + // FIXME: Error handling if frame dimensions don't match + let mut vframe = gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap(); + + match info.format() { + gst_video::VideoFormat::Uyvy + | gst_video::VideoFormat::Bgra + | gst_video::VideoFormat::Bgrx + | gst_video::VideoFormat::Rgba + | gst_video::VideoFormat::Rgbx => { + let line_bytes = if info.format() == gst_video::VideoFormat::Uyvy { + 2 * vframe.width() as usize + } else { + 4 * vframe.width() as usize + }; + let dest_stride = vframe.plane_stride()[0] as usize; + let dest = vframe.plane_data_mut(0).unwrap(); + let src_stride = video_frame.line_stride_in_bytes() as usize; + let src = video_frame.data(); + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride)) + { + dest.copy_from_slice(src); + dest.copy_from_slice(&src[..line_bytes]); + } + } + gst_video::VideoFormat::Nv12 => { + // First plane + { + let line_bytes = vframe.width() as usize; let dest_stride = vframe.plane_stride()[0] as usize; let dest = vframe.plane_data_mut(0).unwrap(); let src_stride = video_frame.line_stride_in_bytes() as usize; @@ -759,104 +833,82 @@ impl BaseSrcImpl for NdiVideoSrc { .chunks_exact_mut(dest_stride) .zip(src.chunks_exact(src_stride)) { - dest.copy_from_slice(src); dest.copy_from_slice(&src[..line_bytes]); } } - gst_video::VideoFormat::Nv12 => { - // First plane + + // Second plane + { + let line_bytes = vframe.width() as usize; + let dest_stride = vframe.plane_stride()[1] as usize; + let dest = vframe.plane_data_mut(1).unwrap(); + let src_stride = video_frame.line_stride_in_bytes() as usize; + let src = &video_frame.data()[(video_frame.yres() as usize * src_stride)..]; + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride)) { - let line_bytes = vframe.width() as usize; - let dest_stride = vframe.plane_stride()[0] as usize; - let dest = vframe.plane_data_mut(0).unwrap(); - let src_stride = video_frame.line_stride_in_bytes() as usize; - let src = video_frame.data(); - - for (dest, src) in dest - .chunks_exact_mut(dest_stride) - .zip(src.chunks_exact(src_stride)) - { - dest.copy_from_slice(&src[..line_bytes]); - } - } - - // Second plane - { - let line_bytes = vframe.width() as usize; - let dest_stride = vframe.plane_stride()[1] as usize; - let dest = vframe.plane_data_mut(1).unwrap(); - let src_stride = video_frame.line_stride_in_bytes() as usize; - let src = &video_frame.data()[(video_frame.yres() as usize * src_stride)..]; - - for (dest, src) in dest - .chunks_exact_mut(dest_stride) - .zip(src.chunks_exact(src_stride)) - { - dest.copy_from_slice(&src[..line_bytes]); - } + dest.copy_from_slice(&src[..line_bytes]); } } - gst_video::VideoFormat::Yv12 | gst_video::VideoFormat::I420 => { - // First plane - { - let line_bytes = vframe.width() as usize; - let dest_stride = vframe.plane_stride()[0] as usize; - let dest = vframe.plane_data_mut(0).unwrap(); - let src_stride = video_frame.line_stride_in_bytes() as usize; - let src = video_frame.data(); - - for (dest, src) in dest - .chunks_exact_mut(dest_stride) - .zip(src.chunks_exact(src_stride)) - { - dest.copy_from_slice(&src[..line_bytes]); - } - } - - // Second plane - { - let line_bytes = (vframe.width() as usize + 1) / 2; - let dest_stride = vframe.plane_stride()[1] as usize; - let dest = vframe.plane_data_mut(1).unwrap(); - let src_stride = video_frame.line_stride_in_bytes() as usize; - let src_stride1 = video_frame.line_stride_in_bytes() as usize / 2; - let src = &video_frame.data()[(video_frame.yres() as usize * src_stride)..]; - - for (dest, src) in dest - .chunks_exact_mut(dest_stride) - .zip(src.chunks_exact(src_stride1)) - { - dest.copy_from_slice(&src[..line_bytes]); - } - } - - // Third plane - { - let line_bytes = (vframe.width() as usize + 1) / 2; - let dest_stride = vframe.plane_stride()[2] as usize; - let dest = vframe.plane_data_mut(2).unwrap(); - let src_stride = video_frame.line_stride_in_bytes() as usize; - let src_stride1 = video_frame.line_stride_in_bytes() as usize / 2; - let src = &video_frame.data()[(video_frame.yres() as usize * src_stride - + (video_frame.yres() as usize + 1) / 2 * src_stride1)..]; - - for (dest, src) in dest - .chunks_exact_mut(dest_stride) - .zip(src.chunks_exact(src_stride1)) - { - dest.copy_from_slice(&src[..line_bytes]); - } - } - } - _ => unreachable!(), } + gst_video::VideoFormat::Yv12 | gst_video::VideoFormat::I420 => { + // First plane + { + let line_bytes = vframe.width() as usize; + let dest_stride = vframe.plane_stride()[0] as usize; + let dest = vframe.plane_data_mut(0).unwrap(); + let src_stride = video_frame.line_stride_in_bytes() as usize; + let src = video_frame.data(); - vframe.into_buffer() - }; + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride)) + { + dest.copy_from_slice(&src[..line_bytes]); + } + } - gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); + // Second plane + { + let line_bytes = (vframe.width() as usize + 1) / 2; + let dest_stride = vframe.plane_stride()[1] as usize; + let dest = vframe.plane_data_mut(1).unwrap(); + let src_stride = video_frame.line_stride_in_bytes() as usize; + let src_stride1 = video_frame.line_stride_in_bytes() as usize / 2; + let src = &video_frame.data()[(video_frame.yres() as usize * src_stride)..]; - Ok(buffer) + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride1)) + { + dest.copy_from_slice(&src[..line_bytes]); + } + } + + // Third plane + { + let line_bytes = (vframe.width() as usize + 1) / 2; + let dest_stride = vframe.plane_stride()[2] as usize; + let dest = vframe.plane_data_mut(2).unwrap(); + let src_stride = video_frame.line_stride_in_bytes() as usize; + let src_stride1 = video_frame.line_stride_in_bytes() as usize / 2; + let src = &video_frame.data()[(video_frame.yres() as usize * src_stride + + (video_frame.yres() as usize + 1) / 2 * src_stride1)..]; + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride1)) + { + dest.copy_from_slice(&src[..line_bytes]); + } + } + } + _ => unreachable!(), + } + + Ok(vframe.into_buffer()) } }