Do not hold mutex while performing negotiation

This commit is contained in:
Rafael Caricio 2023-05-19 21:31:07 +02:00
parent 2513721921
commit 670123373b
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
2 changed files with 45 additions and 16 deletions

View file

@ -13,7 +13,6 @@ gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/g
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
qoaudio = "0.5.0"
byte-slice-cast = "1.0"
atomic_refcell = "0.1"
once_cell = "1.0"
[dev-dependencies]

View file

@ -6,7 +6,6 @@
//
// SPDX-License-Identifier: MPL-2.0
use atomic_refcell::AtomicRefCell;
use byte_slice_cast::*;
use gst::glib;
use gst::subclass::prelude::*;
@ -14,6 +13,7 @@ use gst_audio::prelude::*;
use gst_audio::subclass::prelude::*;
use once_cell::sync::Lazy;
use qoaudio::{DecodedAudio, QoaDecoder};
use std::sync::{Arc, Mutex};
#[derive(Default)]
struct State {
@ -23,7 +23,12 @@ struct State {
#[derive(Default)]
pub struct QoaDec {
state: AtomicRefCell<Option<State>>,
state: Arc<Mutex<Option<State>>>,
}
enum HandledBuffer {
FormatChanged(gst::Buffer),
SameFormat(gst::Buffer),
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -101,14 +106,17 @@ impl AudioDecoderImpl for QoaDec {
fn start(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp: self, "Starting...");
*self.state.borrow_mut() = Some(State::default());
let mut state = self.state.lock().unwrap();
*state = Some(State::default());
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp: self, "Stopping...");
*self.state.borrow_mut() = None;
let mut state = self.state.lock().unwrap();
*state = None;
Ok(())
}
@ -119,19 +127,31 @@ impl AudioDecoderImpl for QoaDec {
gst::debug!(CAT, imp: self, "Handling buffer {:?}", inbuf);
let inbuf = inbuf.expect("Non-drainable should never receive empty buffer");
let inmap = inbuf.map_readable().map_err(|_| {
gst::error!(CAT, imp: self, "Failed to buffer readable");
gst::FlowError::Error
})?;
let mut state_guard = self.state.borrow_mut();
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().ok_or_else(|| {
gst::error!(CAT, imp: self, "Failed to get state");
gst::FlowError::NotNegotiated
})?;
self.handle_buffer(state, &inmap)
let outbuf = match self.handle_buffer(state, &inmap)? {
HandledBuffer::FormatChanged(buffer) => {
self.obj()
.set_output_format(state.audio_info.as_ref().unwrap())?;
drop(state_guard); // Do not hold a lock while calling for negotiation
self.obj().negotiate()?;
buffer
}
HandledBuffer::SameFormat(buffer) => buffer,
};
self.obj().finish_frame(Some(outbuf), 1)
}
}
@ -140,7 +160,7 @@ impl QoaDec {
&self,
state: &mut State,
indata: &[u8],
) -> Result<gst::FlowSuccess, gst::FlowError> {
) -> Result<HandledBuffer, gst::FlowError> {
let decoder = match state.decoder {
Some(ref mut decoder) => decoder,
None => {
@ -177,11 +197,16 @@ impl QoaDec {
gst::FlowError::Error
})?;
gst::trace!(CAT, imp: self, "Decoded audio: {:?}", audio.duration());
gst::trace!(
CAT,
imp: self,
"Decoded audio with a duration of {:?}",
audio.duration()
);
// On new buffers the audio configuration might change, if so we need to request renegotiation
// and reconfigure the audio info
if state.audio_info.is_none()
let format_changed = if state.audio_info.is_none()
|| state.audio_info.as_ref().unwrap().channels() != audio.channels()
|| state.audio_info.as_ref().unwrap().rate() != audio.sample_rate()
{
@ -201,11 +226,12 @@ impl QoaDec {
audio_info
);
self.obj().set_output_format(&audio_info)?;
self.obj().negotiate()?;
state.audio_info = Some(audio_info);
}
true
} else {
false
};
let samples = audio.collect::<Vec<i16>>();
@ -222,7 +248,11 @@ impl QoaDec {
}
let outbuf = gst::Buffer::from_mut_slice(CastVec(samples));
self.obj().finish_frame(Some(outbuf), 1)
if format_changed {
Ok(HandledBuffer::FormatChanged(outbuf))
} else {
Ok(HandledBuffer::SameFormat(outbuf))
}
}
}