From 6ce3029e07591160ade1e0c94f41716f454f5ea0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 2 Feb 2022 16:26:31 +0200 Subject: [PATCH] dav1ddec: Update for fixed dav1d-rs API As a side effect this allows us also to handle errors more gracefully and to reduce memory load by outputting decoded frames immediately. Also the code was changed a bit to reduce the number of redundant mutex lock/unlocks. --- video/dav1d/Cargo.toml | 3 +- video/dav1d/src/dav1ddec/imp.rs | 273 ++++++++++++++++++++++---------- 2 files changed, 187 insertions(+), 89 deletions(-) diff --git a/video/dav1d/Cargo.toml b/video/dav1d/Cargo.toml index 85a9b112..f3d98b2c 100644 --- a/video/dav1d/Cargo.toml +++ b/video/dav1d/Cargo.toml @@ -9,7 +9,8 @@ license = "MIT/Apache-2.0" description = "Dav1d Plugin" [dependencies] -dav1d = "0.6" +# dav1d = "0.7" +dav1d = { git = "https://github.com/rust-av/dav1d-rs" } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_12"] } diff --git a/video/dav1d/src/dav1ddec/imp.rs b/video/dav1d/src/dav1ddec/imp.rs index dc3dcc46..741ab69a 100644 --- a/video/dav1d/src/dav1ddec/imp.rs +++ b/video/dav1d/src/dav1ddec/imp.rs @@ -18,7 +18,7 @@ use gst_video::subclass::prelude::*; use once_cell::sync::Lazy; use std::i32; -use std::sync::Mutex; +use std::sync::{Mutex, MutexGuard}; #[derive(Default)] struct State { @@ -43,7 +43,7 @@ static CAT: Lazy = Lazy::new(|| { }); impl Dav1dDec { - pub fn gst_video_format_from_dav1d_picture( + fn gst_video_format_from_dav1d_picture( &self, element: &super::Dav1dDec, pic: &dav1d::Picture, @@ -87,16 +87,20 @@ impl Dav1dDec { }) } - pub fn handle_resolution_change( - &self, + fn handle_resolution_change<'s>( + &'s self, element: &super::Dav1dDec, + mut state_guard: MutexGuard<'s, Option>, pic: &dav1d::Picture, - format: gst_video::VideoFormat, - ) -> Result<(), gst::FlowError> { - let mut state_guard = self.state.lock().unwrap(); + ) -> Result>, gst::FlowError> { let state = state_guard.as_mut().unwrap(); - let negotiate = { + let format = self.gst_video_format_from_dav1d_picture(element, pic); + if format == gst_video::VideoFormat::Unknown { + return Err(gst::FlowError::NotNegotiated); + } + + let need_negotiate = { match state.output_info { Some(ref i) => { (i.width() != pic.width()) @@ -105,13 +109,15 @@ impl Dav1dDec { None => true, } }; - if !negotiate { - return Ok(()); + if !need_negotiate { + return Ok(state_guard); } + gst_info!( CAT, obj: element, - "Negotiating format picture dimensions {}x{}", + "Negotiating format {:?} picture dimensions {}x{}", + format, pic.width(), pic.height() ); @@ -124,68 +130,105 @@ impl Dav1dDec { element.negotiate(output_state)?; let out_state = element.output_state().unwrap(); - let mut state_guard = self.state.lock().unwrap(); + state_guard = self.state.lock().unwrap(); let state = state_guard.as_mut().unwrap(); state.output_info = Some(out_state.info()); - Ok(()) + Ok(state_guard) } - fn flush_decoder(&self, _element: &super::Dav1dDec) { - let mut state_guard = self.state.lock().unwrap(); + fn flush_decoder(&self, element: &super::Dav1dDec, state_guard: &mut Option) { + gst_info!(CAT, obj: element, "Flushing decoder"); + let state = state_guard.as_mut().unwrap(); state.decoder.flush(); } - fn decode( + fn send_data( &self, element: &super::Dav1dDec, - input_buffer: &gst::BufferRef, + state_guard: &mut MutexGuard>, + input_buffer: gst::Buffer, frame: &gst_video::VideoCodecFrame, - ) -> Result, gst::FlowError> { - let mut state_guard = self.state.lock().unwrap(); + ) -> Result, gst::FlowError> { + gst_trace!( + CAT, + obj: element, + "Sending data to decoder for frame {}", + frame.system_frame_number() + ); + let state = state_guard.as_mut().unwrap(); let timestamp = frame.dts().map(|ts| *ts as i64); let duration = frame.duration().map(|d| *d as i64); let frame_number = Some(frame.system_frame_number() as i64); - let input_data = input_buffer - .map_readable() - .map_err(|_| gst::FlowError::Error)?; - let pictures = - match state - .decoder - .decode(input_data, frame_number, timestamp, duration, || {}) - { - Ok(pictures) => pictures, - Err(err) => { - gst_error!(CAT, "Decoding failed (error code: {})", err); - return gst_video::video_decoder_error!( - element, - 1, - gst::StreamError::Decode, - ["Decoding failed (error code {})", err] - ) - .map(|_| vec![]); - } - }; - let mut decoded_pictures = vec![]; - for pic in pictures { - let format = self.gst_video_format_from_dav1d_picture(element, &pic); - if format != gst_video::VideoFormat::Unknown { - decoded_pictures.push((pic, format)); - } else { - return Err(gst::FlowError::NotNegotiated); + let input_data = input_buffer + .into_mapped_buffer_readable() + .map_err(|_| gst::FlowError::Error)?; + + match state + .decoder + .send_data(input_data, frame_number, timestamp, duration) + { + Ok(()) => { + gst_trace!(CAT, obj: element, "Decoder returned OK"); + Ok(std::ops::ControlFlow::Break(())) + } + Err(err) if err.is_again() => { + gst_trace!(CAT, obj: element, "Decoder returned EAGAIN"); + Ok(std::ops::ControlFlow::Continue(())) + } + Err(err) => { + gst_error!(CAT, "Sending data failed (error code: {})", err); + return gst_video::video_decoder_error!( + element, + 1, + gst::StreamError::Decode, + ["Sending data failed (error code {})", err] + ) + .map(|_| std::ops::ControlFlow::Break(())); } } - Ok(decoded_pictures) } - pub fn decoded_picture_as_buffer( + fn send_pending_data( &self, element: &super::Dav1dDec, + state_guard: &mut MutexGuard>, + ) -> Result, gst::FlowError> { + gst_trace!(CAT, obj: element, "Sending pending data to decoder"); + + let state = state_guard.as_mut().unwrap(); + + match state.decoder.send_pending_data() { + Ok(()) => { + gst_trace!(CAT, obj: element, "Decoder returned OK"); + Ok(std::ops::ControlFlow::Break(())) + } + Err(err) if err.is_again() => { + gst_trace!(CAT, obj: element, "Decoder returned EAGAIN"); + Ok(std::ops::ControlFlow::Continue(())) + } + Err(err) => { + gst_error!(CAT, "Sending data failed (error code: {})", err); + return gst_video::video_decoder_error!( + element, + 1, + gst::StreamError::Decode, + ["Sending data failed (error code {})", err] + ) + .map(|_| std::ops::ControlFlow::Break(())); + } + } + } + + fn decoded_picture_as_buffer( + &self, + element: &super::Dav1dDec, + state_guard: &mut MutexGuard>, pic: &dav1d::Picture, output_state: gst_video::VideoCodecState, ) -> Result { @@ -193,10 +236,8 @@ impl Dav1dDec { let mut strides = vec![]; let mut acc_offset: usize = 0; - let mut state_guard = self.state.lock().unwrap(); let state = state_guard.as_mut().unwrap(); let video_meta_supported = state.video_meta_supported; - drop(state_guard); let info = output_state.info(); let mut out_buffer = gst::Buffer::new(); @@ -268,38 +309,41 @@ impl Dav1dDec { .unwrap() .set_duration(gst::ClockTime::from_nseconds(duration)); } + Ok(out_buffer) } - fn handle_picture( - &self, + fn handle_picture<'s>( + &'s self, element: &super::Dav1dDec, + mut state_guard: MutexGuard<'s, Option>, pic: &dav1d::Picture, - format: gst_video::VideoFormat, - ) -> Result { - self.handle_resolution_change(element, pic, format)?; + ) -> Result>, gst::FlowError> { + gst_trace!(CAT, obj: element, "Handling picture {}", pic.offset()); + + state_guard = self.handle_resolution_change(element, state_guard, pic)?; let output_state = element .output_state() .expect("Output state not set. Shouldn't happen!"); let offset = pic.offset() as i32; + if let Some(mut frame) = element.frame(offset) { - let output_buffer = self.decoded_picture_as_buffer(element, pic, output_state)?; + let output_buffer = + self.decoded_picture_as_buffer(element, &mut state_guard, pic, output_state)?; frame.set_output_buffer(output_buffer); + drop(state_guard); element.finish_frame(frame)?; + Ok(self.state.lock().unwrap()) } else { gst_warning!(CAT, obj: element, "No frame found for offset {}", offset); + Ok(state_guard) } - - self.forward_pending_pictures(element) } - fn drop_decoded_pictures(&self, element: &super::Dav1dDec) { - let mut state_guard = self.state.lock().unwrap(); - let state = state_guard.as_mut().unwrap(); - - while let Ok(pic) = state.decoder.get_picture() { - gst_debug!(CAT, obj: element, "Dropping picture"); + fn drop_decoded_pictures(&self, element: &super::Dav1dDec, state_guard: &mut Option) { + while let Ok(Some(pic)) = self.pending_pictures(element, state_guard) { + gst_debug!(CAT, obj: element, "Dropping picture {}", pic.offset()); drop(pic); } } @@ -307,29 +351,50 @@ impl Dav1dDec { fn pending_pictures( &self, element: &super::Dav1dDec, - ) -> Result, gst::FlowError> { - let mut state_guard = self.state.lock().unwrap(); + state_guard: &mut Option, + ) -> Result, gst::FlowError> { + gst_trace!(CAT, obj: element, "Retrieving pending picture"); + let state = state_guard.as_mut().unwrap(); - let mut pictures = vec![]; - while let Ok(pic) = state.decoder.get_picture() { - let format = self.gst_video_format_from_dav1d_picture(element, &pic); - if format == gst_video::VideoFormat::Unknown { - return Err(gst::FlowError::NotNegotiated); + match state.decoder.get_picture() { + Ok(pic) => { + gst_trace!(CAT, obj: element, "Retrieved picture {}", pic.offset()); + Ok(Some(pic)) + } + Err(err) if err.is_again() => { + gst_trace!(CAT, obj: element, "Decoder needs more data"); + Ok(None) + } + Err(err) => { + gst_error!( + CAT, + obj: element, + "Retrieving decoded picture failed (error code {})", + err + ); + + gst_video::video_decoder_error!( + element, + 1, + gst::StreamError::Decode, + ["Retrieving decoded picture failed (error code {})", err] + ) + .map(|_| None) } - pictures.push((pic, format)); } - Ok(pictures) } - fn forward_pending_pictures( - &self, + fn forward_pending_pictures<'s>( + &'s self, element: &super::Dav1dDec, - ) -> Result { - for (pic, format) in self.pending_pictures(element)? { - self.handle_picture(element, &pic, format)?; + mut state_guard: MutexGuard<'s, Option>, + ) -> Result>, gst::FlowError> { + while let Some(pic) = self.pending_pictures(element, &mut state_guard)? { + state_guard = self.handle_picture(element, state_guard, &pic)?; } - Ok(gst::FlowSuccess::Ok) + + Ok(state_guard) } } @@ -475,9 +540,26 @@ impl VideoDecoderImpl for Dav1dDec { element: &Self::Type, frame: gst_video::VideoCodecFrame, ) -> Result { - let input_buffer = frame.input_buffer().expect("frame without input buffer"); - for (pic, format) in self.decode(element, input_buffer, &frame)? { - self.handle_picture(element, &pic, format)?; + let input_buffer = frame + .input_buffer_owned() + .expect("frame without input buffer"); + + { + let mut state_guard = self.state.lock().unwrap(); + state_guard = self.forward_pending_pictures(element, state_guard)?; + if self.send_data(element, &mut state_guard, input_buffer, &frame)? + == std::ops::ControlFlow::Continue(()) + { + loop { + state_guard = self.forward_pending_pictures(element, state_guard)?; + if self.send_pending_data(element, &mut state_guard)? + == std::ops::ControlFlow::Break(()) + { + break; + } + } + } + let _state_guard = self.forward_pending_pictures(element, state_guard)?; } Ok(gst::FlowSuccess::Ok) @@ -485,22 +567,37 @@ impl VideoDecoderImpl for Dav1dDec { fn flush(&self, element: &Self::Type) -> bool { gst_info!(CAT, obj: element, "Flushing"); - self.flush_decoder(element); - self.drop_decoded_pictures(element); + + { + let mut state_guard = self.state.lock().unwrap(); + self.flush_decoder(element, &mut state_guard); + self.drop_decoded_pictures(element, &mut state_guard); + } + true } fn drain(&self, element: &Self::Type) -> Result { gst_info!(CAT, obj: element, "Draining"); - self.flush_decoder(element); - self.forward_pending_pictures(element)?; + + { + let mut state_guard = self.state.lock().unwrap(); + self.flush_decoder(element, &mut state_guard); + let _state_guard = self.forward_pending_pictures(element, state_guard)?; + } + self.parent_drain(element) } fn finish(&self, element: &Self::Type) -> Result { gst_info!(CAT, obj: element, "Finishing"); - self.flush_decoder(element); - self.forward_pending_pictures(element)?; + + { + let mut state_guard = self.state.lock().unwrap(); + self.flush_decoder(element, &mut state_guard); + let _state_guard = self.forward_pending_pictures(element, state_guard)?; + } + self.parent_finish(element) }