togglerecord: move pads and other_streams to State

Allow us to remove two mutexes, making dead locks easier to debug.

Also now use the State lock with `main_stream_cond` as we want to use
the higher order mutex in order to prevent dead locks.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1935>
This commit is contained in:
Guillaume Desmottes 2024-11-28 12:18:18 +01:00 committed by GStreamer Marge Bot
parent fb54cfa425
commit 20fb4f82d8

View file

@ -16,7 +16,7 @@ use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use parking_lot::{Condvar, Mutex, MutexGuard};
use parking_lot::{Condvar, Mutex};
use std::cmp;
use std::collections::HashMap;
use std::iter;
@ -125,6 +125,9 @@ enum RecordingState {
#[derive(Debug)]
struct State {
other_streams: (Vec<Stream>, u32),
pads: HashMap<gst::Pad, Stream>,
recording_state: RecordingState,
last_recording_start: Option<gst::ClockTime>,
last_recording_stop: Option<gst::ClockTime>,
@ -146,9 +149,11 @@ struct State {
live: bool,
}
impl Default for State {
fn default() -> Self {
impl State {
fn new(pads: HashMap<gst::Pad, Stream>) -> Self {
Self {
other_streams: (Vec::new(), 0),
pads,
recording_state: RecordingState::Stopped,
last_recording_start: None,
last_recording_stop: None,
@ -159,6 +164,17 @@ impl Default for State {
live: false,
}
}
fn reset(&mut self) {
self.recording_state = RecordingState::Stopped;
self.last_recording_start = None;
self.last_recording_stop = None;
self.recording_duration = gst::ClockTime::ZERO;
self.blocked_duration = gst::ClockTime::ZERO;
self.time_start_block = gst::ClockTime::NONE;
self.running_time_offset = 0;
self.live = false;
}
}
#[derive(Debug, PartialEq, Eq)]
@ -335,8 +351,6 @@ pub struct ToggleRecord {
// If multiple stream states have to be locked, the
// main_stream always comes first
main_stream_cond: Condvar,
other_streams: Mutex<(Vec<Stream>, u32)>,
pads: Mutex<HashMap<gst::Pad, Stream>>,
}
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
@ -348,26 +362,29 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
});
impl ToggleRecord {
// called while holding stream.state
// called without lock
fn block_if_upstream_not_live(
&self,
pad: &gst::Pad,
mut settings: Settings,
state: &mut MutexGuard<StreamState>,
stream: &Stream,
upstream_live: bool,
) -> Result<bool, gst::FlowError> {
if !upstream_live {
let clock = self.obj().clock();
let mut rec_state = self.state.lock();
let mut state = stream.state.lock();
if rec_state.time_start_block.is_none() {
rec_state.time_start_block = clock
.as_ref()
.map_or(state.current_running_time, |c| c.time());
}
drop(rec_state);
while !settings.record && !state.flushing {
gst::debug!(CAT, obj = pad, "Waiting for record=true");
self.main_stream_cond.wait(state);
drop(state);
self.main_stream_cond.wait(&mut rec_state);
state = stream.state.lock();
settings = *self.settings.lock();
}
if state.flushing {
@ -376,12 +393,11 @@ impl ToggleRecord {
}
state.segment_pending = true;
state.discont_pending = true;
for other_stream in &self.other_streams.lock().0 {
for other_stream in &rec_state.other_streams.0 {
let mut other_state = other_stream.state.lock();
other_state.segment_pending = true;
other_state.discont_pending = true;
}
let mut rec_state = self.state.lock();
if let Some(time_start_block) = rec_state.time_start_block {
// If we have a time_start_block it means the clock is there
let clock = clock.expect("Cannot find pipeline clock");
@ -415,6 +431,7 @@ impl ToggleRecord {
data: T,
upstream_live: bool,
) -> Result<HandleResult<T>, gst::FlowError> {
let mut rec_state = self.state.lock();
let mut state = stream.state.lock();
let data = match data.clip(&state, &state.in_segment) {
@ -473,7 +490,6 @@ impl ToggleRecord {
let settings = *self.settings.lock();
// First check if we need to block for non-live input
let mut rec_state = self.state.lock();
// Check if we have to update our recording state
let settings_changed = match rec_state.recording_state {
@ -537,10 +553,9 @@ impl ToggleRecord {
// Then unlock and wait for all other streams to reach a buffer that is completely
// after/at the recording stop position (i.e. can be dropped completely) or go EOS
// instead.
drop(rec_state);
while !state.flushing
&& !self.other_streams.lock().0.iter().all(|s| {
&& !rec_state.other_streams.0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| s.current_running_time
@ -549,7 +564,9 @@ impl ToggleRecord {
})
{
gst::log!(CAT, obj = pad, "Waiting for other streams to stop");
self.main_stream_cond.wait(&mut state);
drop(state);
self.main_stream_cond.wait(&mut rec_state);
state = stream.state.lock();
}
if state.flushing {
@ -557,7 +574,6 @@ impl ToggleRecord {
return Err(gst::FlowError::Flushing);
}
let mut rec_state = self.state.lock();
rec_state.recording_state = RecordingState::Stopped;
rec_state.recording_duration +=
last_recording_duration.unwrap_or(gst::ClockTime::ZERO);
@ -574,11 +590,10 @@ impl ToggleRecord {
// Then become Stopped and drop this buffer. We always stop right before
// a keyframe
drop(state);
drop(rec_state);
let ret =
self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)?;
drop(state);
let ret = self.block_if_upstream_not_live(pad, settings, stream, upstream_live)?;
self.obj().notify("recording");
if ret {
@ -592,7 +607,8 @@ impl ToggleRecord {
rec_state.recording_state = RecordingState::Starting;
}
drop(rec_state);
if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? {
drop(state);
if self.block_if_upstream_not_live(pad, settings, stream, upstream_live)? {
Ok(HandleResult::Pass(data))
} else {
Ok(HandleResult::Drop)
@ -645,7 +661,7 @@ impl ToggleRecord {
state.segment_pending = true;
state.discont_pending = true;
for other_stream in &self.other_streams.lock().0 {
for other_stream in &rec_state.other_streams.0 {
let mut other_state = other_stream.state.lock();
other_state.segment_pending = true;
other_state.discont_pending = true;
@ -654,10 +670,9 @@ impl ToggleRecord {
// Then unlock and wait for all other streams to reach a buffer that is completely
// after/at the recording start position (i.e. can be passed through completely) or
// go EOS instead.
drop(rec_state);
while !state.flushing
&& !self.other_streams.lock().0.iter().all(|s| {
&& !rec_state.other_streams.0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| s.current_running_time
@ -666,7 +681,9 @@ impl ToggleRecord {
})
{
gst::log!(CAT, obj = pad, "Waiting for other streams to start");
self.main_stream_cond.wait(&mut state);
drop(state);
self.main_stream_cond.wait(&mut rec_state);
state = stream.state.lock();
}
if state.flushing {
@ -674,7 +691,6 @@ impl ToggleRecord {
return Err(gst::FlowError::Flushing);
}
let mut rec_state = self.state.lock();
rec_state.recording_state = RecordingState::Recording;
gst::debug!(
CAT,
@ -769,6 +785,7 @@ impl ToggleRecord {
drop(state);
let mut rec_state = self.state.lock();
let mut main_state = self.main_stream.state.lock();
// Wake up, in case the main stream is waiting for us to progress up to here. We progressed
@ -777,8 +794,6 @@ impl ToggleRecord {
state = stream.state.lock();
let mut rec_state = self.state.lock();
// Wait until the main stream advanced completely past our current running time in
// Recording/Stopped modes to make sure we're not already outputting/dropping data that
// should actually be dropped/output if recording is started/stopped now.
@ -820,11 +835,11 @@ impl ToggleRecord {
main_state.current_running_time_end.display(),
);
drop(rec_state);
drop(main_state);
drop(state);
self.main_stream_cond.wait(&mut main_state);
self.main_stream_cond.wait(&mut rec_state);
main_state = self.main_stream.state.lock();
state = stream.state.lock();
rec_state = self.state.lock();
}
if state.flushing {
@ -1202,7 +1217,7 @@ impl ToggleRecord {
let mut all_others_eos = true;
// Check eos state of all secondary streams
self.other_streams.lock().0.iter().all(|s| {
rec_state.other_streams.0.iter().all(|s| {
if s == stream {
return true;
}
@ -1247,7 +1262,7 @@ impl ToggleRecord {
let mut all_others_not_eos = false;
// Check eos state of all secondary streams
self.other_streams.lock().0.iter().any(|s| {
rec_state.other_streams.0.iter().any(|s| {
if s == stream {
return false;
}
@ -1277,7 +1292,8 @@ impl ToggleRecord {
pad: &gst::Pad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| {
let rec_state = self.state.lock();
let stream = rec_state.pads.get(pad).cloned().ok_or_else(|| {
gst::element_imp_error!(self, gst::CoreError::Pad, ["Unknown pad {:?}", pad.name()]);
gst::FlowError::Error
})?;
@ -1314,12 +1330,15 @@ impl ToggleRecord {
}
}
drop(rec_state);
let handle_result = if stream != self.main_stream {
self.handle_secondary_stream(pad, &stream, buffer, upstream_live)
} else {
self.handle_main_stream(pad, &stream, buffer, upstream_live)
}?;
let rec_state = self.state.lock();
let mut buffer = match handle_result {
HandleResult::Drop => {
return Ok(gst::FlowSuccess::Ok);
@ -1362,8 +1381,6 @@ impl ToggleRecord {
let mut events = Vec::with_capacity(state.pending_events.len() + 1);
if state.segment_pending {
let rec_state = self.state.lock();
// Adjust so that last_recording_start has running time of
// recording_duration
@ -1418,9 +1435,10 @@ impl ToggleRecord {
// called without lock
fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
let mut rec_state = self.state.lock();
use gst::EventView;
let stream = match self.pads.lock().get(pad) {
let stream = match rec_state.pads.get(pad) {
None => {
gst::element_imp_error!(
self,
@ -1542,6 +1560,8 @@ impl ToggleRecord {
Some(is_live) => upstream_live = is_live,
}
}
drop(rec_state);
let handle_result = if stream == self.main_stream {
self.handle_main_stream(pad, &stream, (pts, duration), upstream_live)
} else {
@ -1572,7 +1592,6 @@ impl ToggleRecord {
let main_is_eos = main_state.as_ref().is_some_and(|main_state| main_state.eos);
if !main_is_eos {
let mut rec_state = self.state.lock();
recording_state_changed = self.check_and_update_stream_start(
pad,
&stream,
@ -1599,7 +1618,6 @@ impl ToggleRecord {
drop(main_state);
if main_is_eos {
let mut rec_state = self.state.lock();
recording_state_changed =
self.check_and_update_eos(pad, &stream, &mut state, &mut rec_state);
}
@ -1668,7 +1686,8 @@ impl ToggleRecord {
// called without lock
fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
let stream = match self.pads.lock().get(pad) {
let rec_state = self.state.lock();
let stream = match rec_state.pads.get(pad) {
None => {
gst::element_imp_error!(
self,
@ -1699,9 +1718,10 @@ impl ToggleRecord {
// called without lock
fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
let rec_state = self.state.lock();
use gst::EventView;
let stream = match self.pads.lock().get(pad) {
let stream = match rec_state.pads.get(pad) {
None => {
gst::element_imp_error!(
self,
@ -1717,7 +1737,6 @@ impl ToggleRecord {
let forward = !matches!(event.view(), EventView::Seek(..));
let rec_state = self.state.lock();
let offset = event.running_time_offset();
event
.make_mut()
@ -1735,9 +1754,10 @@ impl ToggleRecord {
// called without lock
fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
let rec_state = self.state.lock();
use gst::QueryViewMut;
let stream = match self.pads.lock().get(pad) {
let stream = match rec_state.pads.get(pad) {
None => {
gst::element_imp_error!(
self,
@ -1789,7 +1809,6 @@ impl ToggleRecord {
QueryViewMut::Position(q) => {
if q.format() == gst::Format::Time {
let state = stream.state.lock();
let rec_state = self.state.lock();
let mut recording_duration = rec_state.recording_duration;
if rec_state.recording_state == RecordingState::Recording
|| rec_state.recording_state == RecordingState::Stopping
@ -1823,7 +1842,6 @@ impl ToggleRecord {
QueryViewMut::Duration(q) => {
if q.format() == gst::Format::Time {
let state = stream.state.lock();
let rec_state = self.state.lock();
let mut recording_duration = rec_state.recording_duration;
if rec_state.recording_state == RecordingState::Recording
|| rec_state.recording_state == RecordingState::Stopping
@ -1863,7 +1881,8 @@ impl ToggleRecord {
// called without lock
fn iterate_internal_links(&self, pad: &gst::Pad) -> gst::Iterator<gst::Pad> {
let stream = match self.pads.lock().get(pad) {
let rec_state = self.state.lock();
let stream = match rec_state.pads.get(pad) {
None => {
gst::element_imp_error!(
self,
@ -1955,11 +1974,9 @@ impl ObjectSubclass for ToggleRecord {
Self {
settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()),
state: Mutex::new(State::new(pads)),
main_stream,
main_stream_cond: Condvar::new(),
other_streams: Mutex::new((Vec::new(), 0)),
pads: Mutex::new(pads),
}
}
}
@ -2128,9 +2145,11 @@ impl ElementImpl for ToggleRecord {
match transition {
gst::StateChange::ReadyToPaused => {
for s in self
let mut rec_state = self.state.lock();
rec_state.reset();
for s in rec_state
.other_streams
.lock()
.0
.iter()
.chain(iter::once(&self.main_stream))
@ -2139,14 +2158,13 @@ impl ElementImpl for ToggleRecord {
*state = StreamState::default();
}
let mut rec_state = self.state.lock();
*rec_state = State::default();
let settings = *self.settings.lock();
rec_state.live = settings.live;
}
gst::StateChange::PausedToReady => {
for s in &self.other_streams.lock().0 {
let rec_state = self.state.lock();
for s in &rec_state.other_streams.0 {
let mut state = s.state.lock();
state.flushing = true;
}
@ -2161,9 +2179,10 @@ impl ElementImpl for ToggleRecord {
let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::PausedToReady {
for s in self
let mut rec_state = self.state.lock();
for s in rec_state
.other_streams
.lock()
.0
.iter()
.chain(iter::once(&self.main_stream))
@ -2173,8 +2192,7 @@ impl ElementImpl for ToggleRecord {
state.pending_events.clear();
}
let mut rec_state = self.state.lock();
*rec_state = State::default();
rec_state.reset();
drop(rec_state);
self.obj().notify("recording");
}
@ -2188,12 +2206,9 @@ impl ElementImpl for ToggleRecord {
_name: Option<&str>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let mut other_streams_guard = self.other_streams.lock();
let (ref mut other_streams, ref mut pad_count) = *other_streams_guard;
let mut pads = self.pads.lock();
let id = *pad_count;
*pad_count += 1;
let mut rec_state = self.state.lock();
let id = rec_state.other_streams.1;
rec_state.other_streams.1 += 1;
let templ = self.obj().pad_template("sink_%u").unwrap();
let sinkpad = gst::Pad::builder_from_template(&templ)
@ -2259,13 +2274,14 @@ impl ElementImpl for ToggleRecord {
let stream = Stream::new(sinkpad.clone(), srcpad.clone());
pads.insert(stream.sinkpad.clone(), stream.clone());
pads.insert(stream.srcpad.clone(), stream.clone());
rec_state
.pads
.insert(stream.sinkpad.clone(), stream.clone());
rec_state.pads.insert(stream.srcpad.clone(), stream.clone());
other_streams.push(stream);
rec_state.other_streams.0.push(stream);
drop(pads);
drop(other_streams_guard);
drop(rec_state);
self.obj().add_pad(&sinkpad).unwrap();
self.obj().add_pad(&srcpad).unwrap();
@ -2274,24 +2290,21 @@ impl ElementImpl for ToggleRecord {
}
fn release_pad(&self, pad: &gst::Pad) {
let mut other_streams_guard = self.other_streams.lock();
let (ref mut other_streams, _) = *other_streams_guard;
let mut pads = self.pads.lock();
let mut rec_state = self.state.lock();
let stream = match pads.get(pad) {
let stream = match rec_state.pads.get(pad) {
None => return,
Some(stream) => stream.clone(),
};
pads.remove(&stream.sinkpad).unwrap();
pads.remove(&stream.srcpad).unwrap();
rec_state.pads.remove(&stream.sinkpad).unwrap();
rec_state.pads.remove(&stream.srcpad).unwrap();
// TODO: Replace with Vec::remove_item() once stable
let pos = other_streams.iter().position(|x| *x == stream);
pos.map(|pos| other_streams.swap_remove(pos));
let pos = rec_state.other_streams.0.iter().position(|x| *x == stream);
pos.map(|pos| rec_state.other_streams.0.swap_remove(pos));
drop(pads);
drop(other_streams_guard);
drop(rec_state);
let main_state = self.main_stream.state.lock();
self.main_stream_cond.notify_all();