dav1ddec: Keep state unlocked while calling into the decoder

This is a preparation for handling allocations ourselves from the
element as that would call back from the decoder into the element.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2190>
This commit is contained in:
Sebastian Dröge 2025-04-27 13:51:40 +03:00 committed by GStreamer Marge Bot
parent 52037c8aca
commit 6ae34b751a

View file

@ -54,7 +54,10 @@ const DEFAULT_APPLY_GRAIN: bool = false;
const DEFAULT_INLOOP_FILTERS: InloopFilterType = InloopFilterType::empty(); const DEFAULT_INLOOP_FILTERS: InloopFilterType = InloopFilterType::empty();
struct State { struct State {
decoder: dav1d::Decoder, // Decoder always exists when the state does but is temporarily removed
// while passing frames to it to allow unlocking the state as the decoder
// will call back into the element for allocations.
decoder: Option<dav1d::Decoder>,
input_state: gst_video::VideoCodecState<'static, gst_video::video_codec_state::Readable>, input_state: gst_video::VideoCodecState<'static, gst_video::video_codec_state::Readable>,
output_state: output_state:
Option<gst_video::VideoCodecState<'static, gst_video::video_codec_state::Readable>>, Option<gst_video::VideoCodecState<'static, gst_video::video_codec_state::Readable>>,
@ -96,6 +99,43 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
}); });
impl Dav1dDec { impl Dav1dDec {
fn with_decoder_unlocked<'a, T, F: FnOnce(&mut dav1d::Decoder) -> T>(
&'a self,
mut state_guard: MutexGuard<'a, Option<State>>,
func: F,
) -> (MutexGuard<'a, Option<State>>, Option<T>) {
let Some(state) = &mut *state_guard else {
return (state_guard, None);
};
let Some(mut decoder) = state.decoder.take() else {
return (state_guard, None);
};
drop(state_guard);
let res = func(&mut decoder);
state_guard = self.state.lock().unwrap();
if let Some(state) = &mut *state_guard {
state.decoder = Some(decoder);
}
(state_guard, Some(res))
}
fn with_decoder_mut<T, F: FnOnce(&mut dav1d::Decoder) -> T>(
&self,
state: &mut State,
func: F,
) -> Option<T> {
let decoder = state.decoder.as_mut()?;
let res = func(decoder);
Some(res)
}
// FIXME: drop this once we have API from dav1d to query this value // FIXME: drop this once we have API from dav1d to query this value
// https://code.videolan.org/videolan/dav1d/-/merge_requests/1407 // https://code.videolan.org/videolan/dav1d/-/merge_requests/1407
fn estimate_frame_delay(&self, max_frame_delay: u32, n_threads: u32) -> u32 { fn estimate_frame_delay(&self, max_frame_delay: u32, n_threads: u32) -> u32 {
@ -427,15 +467,17 @@ impl Dav1dDec {
fn flush_decoder(&self, state: &mut State) { fn flush_decoder(&self, state: &mut State) {
gst::info!(CAT, imp = self, "Flushing decoder"); gst::info!(CAT, imp = self, "Flushing decoder");
state.decoder.flush(); self.with_decoder_mut(state, |decoder| decoder.flush());
} }
fn send_data( #[allow(clippy::type_complexity)]
&self, fn send_data<'a>(
state_guard: &mut MutexGuard<Option<State>>, &'a self,
state_guard: MutexGuard<'a, Option<State>>,
input_buffer: gst::Buffer, input_buffer: gst::Buffer,
frame: gst_video::VideoCodecFrame, frame: gst_video::VideoCodecFrame,
) -> Result<std::ops::ControlFlow<(), ()>, gst::FlowError> { ) -> Result<(MutexGuard<'a, Option<State>>, std::ops::ControlFlow<(), ()>), gst::FlowError>
{
gst::trace!( gst::trace!(
CAT, CAT,
imp = self, imp = self,
@ -443,8 +485,6 @@ impl Dav1dDec {
frame.system_frame_number() frame.system_frame_number()
); );
let state = state_guard.as_mut().ok_or(gst::FlowError::Flushing)?;
let timestamp = frame.dts().map(|ts| *ts as i64); let timestamp = frame.dts().map(|ts| *ts as i64);
let duration = frame.duration().map(|d| *d as i64); let duration = frame.duration().map(|d| *d as i64);
@ -454,10 +494,13 @@ impl Dav1dDec {
.into_mapped_buffer_readable() .into_mapped_buffer_readable()
.map_err(|_| gst::FlowError::Error)?; .map_err(|_| gst::FlowError::Error)?;
match state let (state_guard, res) = self.with_decoder_unlocked(state_guard, |decoder| {
.decoder decoder.send_data(input_data, frame_number, timestamp, duration)
.send_data(input_data, frame_number, timestamp, duration) });
{
let res = res.ok_or(gst::FlowError::Flushing)?;
let res = match res {
Ok(()) => { Ok(()) => {
gst::trace!(CAT, imp = self, "Decoder returned OK"); gst::trace!(CAT, imp = self, "Decoder returned OK");
Ok(std::ops::ControlFlow::Break(())) Ok(std::ops::ControlFlow::Break(()))
@ -487,18 +530,27 @@ impl Dav1dDec {
) )
.map(|_| std::ops::ControlFlow::Break(())) .map(|_| std::ops::ControlFlow::Break(()))
} }
} };
let res = res?;
Ok((state_guard, res))
} }
fn send_pending_data( #[allow(clippy::type_complexity)]
&self, fn send_pending_data<'a>(
state_guard: &mut MutexGuard<Option<State>>, &'a self,
) -> Result<std::ops::ControlFlow<(), ()>, gst::FlowError> { state_guard: MutexGuard<'a, Option<State>>,
) -> Result<(MutexGuard<'a, Option<State>>, std::ops::ControlFlow<(), ()>), gst::FlowError>
{
gst::trace!(CAT, imp = self, "Sending pending data to decoder"); gst::trace!(CAT, imp = self, "Sending pending data to decoder");
let state = state_guard.as_mut().ok_or(gst::FlowError::Flushing)?; let (state_guard, res) =
self.with_decoder_unlocked(state_guard, |decoder| decoder.send_pending_data());
match state.decoder.send_pending_data() { let res = res.ok_or(gst::FlowError::Flushing)?;
let res = match res {
Ok(()) => { Ok(()) => {
gst::trace!(CAT, imp = self, "Decoder returned OK"); gst::trace!(CAT, imp = self, "Decoder returned OK");
Ok(std::ops::ControlFlow::Break(())) Ok(std::ops::ControlFlow::Break(()))
@ -683,15 +735,21 @@ impl Dav1dDec {
} }
} }
fn pending_picture( #[allow(clippy::type_complexity)]
&self, fn pending_picture<'a>(
state_guard: &mut MutexGuard<Option<State>>, &'a self,
) -> Result<Option<dav1d::Picture>, gst::FlowError> { mut state_guard: MutexGuard<'a, Option<State>>,
) -> Result<(MutexGuard<'a, Option<State>>, Option<dav1d::Picture>), gst::FlowError> {
gst::trace!(CAT, imp = self, "Retrieving pending picture"); gst::trace!(CAT, imp = self, "Retrieving pending picture");
let state = state_guard.as_mut().ok_or(gst::FlowError::Flushing)?; let _state = state_guard.as_mut().ok_or(gst::FlowError::Flushing)?;
match state.decoder.get_picture() { let res;
(state_guard, res) =
self.with_decoder_unlocked(state_guard, |decoder| decoder.get_picture());
let res = res.ok_or(gst::FlowError::Flushing)?;
let res = match res {
Ok(pic) => { Ok(pic) => {
gst::trace!(CAT, imp = self, "Retrieved picture {}", pic.offset()); gst::trace!(CAT, imp = self, "Retrieved picture {}", pic.offset());
Ok(Some(pic)) Ok(Some(pic))
@ -716,7 +774,11 @@ impl Dav1dDec {
) )
.map(|_| None) .map(|_| None)
} }
} };
let res = res?;
Ok((state_guard, res))
} }
fn forward_pending_pictures<'s>( fn forward_pending_pictures<'s>(
@ -729,7 +791,13 @@ impl Dav1dDec {
let mut call_twice = drain; let mut call_twice = drain;
loop { loop {
while let Some(pic) = self.pending_picture(&mut state_guard)? { loop {
let pic;
(state_guard, pic) = self.pending_picture(state_guard)?;
let Some(pic) = pic else {
break;
};
state_guard = self.handle_picture(state_guard, &pic)?; state_guard = self.handle_picture(state_guard, &pic)?;
call_twice = false; call_twice = false;
if !drain { if !drain {
@ -927,7 +995,7 @@ impl VideoDecoderImpl for Dav1dDec {
match *state_guard { match *state_guard {
Some(ref state) => match state.output_state.as_ref().map(|s| s.info()) { Some(ref state) => match state.output_state.as_ref().map(|s| s.info()) {
Some(ref info) => { Some(info) => {
let mut upstream_latency = gst::query::Latency::new(); let mut upstream_latency = gst::query::Latency::new();
if self.obj().sink_pad().peer_query(&mut upstream_latency) { if self.obj().sink_pad().peer_query(&mut upstream_latency) {
@ -1040,7 +1108,7 @@ impl VideoDecoderImpl for Dav1dDec {
})?; })?;
*state_guard = Some(State { *state_guard = Some(State {
decoder, decoder: Some(decoder),
input_state: input_state.clone(), input_state: input_state.clone(),
output_state: None, output_state: None,
video_meta_supported: false, video_meta_supported: false,
@ -1058,21 +1126,20 @@ impl VideoDecoderImpl for Dav1dDec {
.input_buffer_owned() .input_buffer_owned()
.expect("frame without input buffer"); .expect("frame without input buffer");
{ let mut state_guard = self.state.lock().unwrap();
let mut state_guard = self.state.lock().unwrap(); let mut res;
if self.send_data(&mut state_guard, input_buffer, frame)? (state_guard, res) = self.send_data(state_guard, input_buffer, frame)?;
== std::ops::ControlFlow::Continue(())
{ if res == std::ops::ControlFlow::Continue(()) {
loop { loop {
state_guard = self.forward_pending_pictures(state_guard, false)?; state_guard = self.forward_pending_pictures(state_guard, false)?;
if self.send_pending_data(&mut state_guard)? == std::ops::ControlFlow::Break(()) (state_guard, res) = self.send_pending_data(state_guard)?;
{ if res == std::ops::ControlFlow::Break(()) {
break; break;
}
} }
} }
let _state_guard = self.forward_pending_pictures(state_guard, false)?;
} }
let _state_guard = self.forward_pending_pictures(state_guard, false)?;
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} }