diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs index 7311c1b9..64378c29 100644 --- a/utils/togglerecord/src/togglerecord/imp.rs +++ b/utils/togglerecord/src/togglerecord/imp.rs @@ -16,7 +16,7 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; -use parking_lot::{Condvar, Mutex, MutexGuard}; +use parking_lot::{Condvar, Mutex}; use std::cmp; use std::collections::HashMap; use std::iter; @@ -125,6 +125,9 @@ enum RecordingState { #[derive(Debug)] struct State { + other_streams: (Vec, u32), + pads: HashMap, + recording_state: RecordingState, last_recording_start: Option, last_recording_stop: Option, @@ -146,9 +149,11 @@ struct State { live: bool, } -impl Default for State { - fn default() -> Self { +impl State { + fn new(pads: HashMap) -> Self { Self { + other_streams: (Vec::new(), 0), + pads, recording_state: RecordingState::Stopped, last_recording_start: None, last_recording_stop: None, @@ -159,6 +164,17 @@ impl Default for State { live: false, } } + + fn reset(&mut self) { + self.recording_state = RecordingState::Stopped; + self.last_recording_start = None; + self.last_recording_stop = None; + self.recording_duration = gst::ClockTime::ZERO; + self.blocked_duration = gst::ClockTime::ZERO; + self.time_start_block = gst::ClockTime::NONE; + self.running_time_offset = 0; + self.live = false; + } } #[derive(Debug, PartialEq, Eq)] @@ -335,8 +351,6 @@ pub struct ToggleRecord { // If multiple stream states have to be locked, the // main_stream always comes first main_stream_cond: Condvar, - other_streams: Mutex<(Vec, u32)>, - pads: Mutex>, } static CAT: LazyLock = LazyLock::new(|| { @@ -348,26 +362,29 @@ static CAT: LazyLock = LazyLock::new(|| { }); impl ToggleRecord { - // called while holding stream.state + // called without lock fn block_if_upstream_not_live( &self, pad: &gst::Pad, mut settings: Settings, - state: &mut MutexGuard, + stream: &Stream, upstream_live: bool, ) -> Result { if !upstream_live { let clock = self.obj().clock(); let mut rec_state = self.state.lock(); + let mut state = stream.state.lock(); + if rec_state.time_start_block.is_none() { rec_state.time_start_block = clock .as_ref() .map_or(state.current_running_time, |c| c.time()); } - drop(rec_state); while !settings.record && !state.flushing { gst::debug!(CAT, obj = pad, "Waiting for record=true"); - self.main_stream_cond.wait(state); + drop(state); + self.main_stream_cond.wait(&mut rec_state); + state = stream.state.lock(); settings = *self.settings.lock(); } if state.flushing { @@ -376,12 +393,11 @@ impl ToggleRecord { } state.segment_pending = true; state.discont_pending = true; - for other_stream in &self.other_streams.lock().0 { + for other_stream in &rec_state.other_streams.0 { let mut other_state = other_stream.state.lock(); other_state.segment_pending = true; other_state.discont_pending = true; } - let mut rec_state = self.state.lock(); if let Some(time_start_block) = rec_state.time_start_block { // If we have a time_start_block it means the clock is there let clock = clock.expect("Cannot find pipeline clock"); @@ -415,6 +431,7 @@ impl ToggleRecord { data: T, upstream_live: bool, ) -> Result, gst::FlowError> { + let mut rec_state = self.state.lock(); let mut state = stream.state.lock(); let data = match data.clip(&state, &state.in_segment) { @@ -473,7 +490,6 @@ impl ToggleRecord { let settings = *self.settings.lock(); // First check if we need to block for non-live input - let mut rec_state = self.state.lock(); // Check if we have to update our recording state let settings_changed = match rec_state.recording_state { @@ -537,10 +553,9 @@ impl ToggleRecord { // Then unlock and wait for all other streams to reach a buffer that is completely // after/at the recording stop position (i.e. can be dropped completely) or go EOS // instead. - drop(rec_state); while !state.flushing - && !self.other_streams.lock().0.iter().all(|s| { + && !rec_state.other_streams.0.iter().all(|s| { let s = s.state.lock(); s.eos || s.current_running_time @@ -549,7 +564,9 @@ impl ToggleRecord { }) { gst::log!(CAT, obj = pad, "Waiting for other streams to stop"); - self.main_stream_cond.wait(&mut state); + drop(state); + self.main_stream_cond.wait(&mut rec_state); + state = stream.state.lock(); } if state.flushing { @@ -557,7 +574,6 @@ impl ToggleRecord { return Err(gst::FlowError::Flushing); } - let mut rec_state = self.state.lock(); rec_state.recording_state = RecordingState::Stopped; rec_state.recording_duration += last_recording_duration.unwrap_or(gst::ClockTime::ZERO); @@ -574,11 +590,10 @@ impl ToggleRecord { // Then become Stopped and drop this buffer. We always stop right before // a keyframe + drop(state); drop(rec_state); - let ret = - self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)?; - drop(state); + let ret = self.block_if_upstream_not_live(pad, settings, stream, upstream_live)?; self.obj().notify("recording"); if ret { @@ -592,7 +607,8 @@ impl ToggleRecord { rec_state.recording_state = RecordingState::Starting; } drop(rec_state); - if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? { + drop(state); + if self.block_if_upstream_not_live(pad, settings, stream, upstream_live)? { Ok(HandleResult::Pass(data)) } else { Ok(HandleResult::Drop) @@ -645,7 +661,7 @@ impl ToggleRecord { state.segment_pending = true; state.discont_pending = true; - for other_stream in &self.other_streams.lock().0 { + for other_stream in &rec_state.other_streams.0 { let mut other_state = other_stream.state.lock(); other_state.segment_pending = true; other_state.discont_pending = true; @@ -654,10 +670,9 @@ impl ToggleRecord { // Then unlock and wait for all other streams to reach a buffer that is completely // after/at the recording start position (i.e. can be passed through completely) or // go EOS instead. - drop(rec_state); while !state.flushing - && !self.other_streams.lock().0.iter().all(|s| { + && !rec_state.other_streams.0.iter().all(|s| { let s = s.state.lock(); s.eos || s.current_running_time @@ -666,7 +681,9 @@ impl ToggleRecord { }) { gst::log!(CAT, obj = pad, "Waiting for other streams to start"); - self.main_stream_cond.wait(&mut state); + drop(state); + self.main_stream_cond.wait(&mut rec_state); + state = stream.state.lock(); } if state.flushing { @@ -674,7 +691,6 @@ impl ToggleRecord { return Err(gst::FlowError::Flushing); } - let mut rec_state = self.state.lock(); rec_state.recording_state = RecordingState::Recording; gst::debug!( CAT, @@ -769,6 +785,7 @@ impl ToggleRecord { drop(state); + let mut rec_state = self.state.lock(); let mut main_state = self.main_stream.state.lock(); // Wake up, in case the main stream is waiting for us to progress up to here. We progressed @@ -777,8 +794,6 @@ impl ToggleRecord { state = stream.state.lock(); - let mut rec_state = self.state.lock(); - // Wait until the main stream advanced completely past our current running time in // Recording/Stopped modes to make sure we're not already outputting/dropping data that // should actually be dropped/output if recording is started/stopped now. @@ -820,11 +835,11 @@ impl ToggleRecord { main_state.current_running_time_end.display(), ); - drop(rec_state); + drop(main_state); drop(state); - self.main_stream_cond.wait(&mut main_state); + self.main_stream_cond.wait(&mut rec_state); + main_state = self.main_stream.state.lock(); state = stream.state.lock(); - rec_state = self.state.lock(); } if state.flushing { @@ -1202,7 +1217,7 @@ impl ToggleRecord { let mut all_others_eos = true; // Check eos state of all secondary streams - self.other_streams.lock().0.iter().all(|s| { + rec_state.other_streams.0.iter().all(|s| { if s == stream { return true; } @@ -1247,7 +1262,7 @@ impl ToggleRecord { let mut all_others_not_eos = false; // Check eos state of all secondary streams - self.other_streams.lock().0.iter().any(|s| { + rec_state.other_streams.0.iter().any(|s| { if s == stream { return false; } @@ -1277,7 +1292,8 @@ impl ToggleRecord { pad: &gst::Pad, buffer: gst::Buffer, ) -> Result { - let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| { + let rec_state = self.state.lock(); + let stream = rec_state.pads.get(pad).cloned().ok_or_else(|| { gst::element_imp_error!(self, gst::CoreError::Pad, ["Unknown pad {:?}", pad.name()]); gst::FlowError::Error })?; @@ -1314,12 +1330,15 @@ impl ToggleRecord { } } + drop(rec_state); let handle_result = if stream != self.main_stream { self.handle_secondary_stream(pad, &stream, buffer, upstream_live) } else { self.handle_main_stream(pad, &stream, buffer, upstream_live) }?; + let rec_state = self.state.lock(); + let mut buffer = match handle_result { HandleResult::Drop => { return Ok(gst::FlowSuccess::Ok); @@ -1362,8 +1381,6 @@ impl ToggleRecord { let mut events = Vec::with_capacity(state.pending_events.len() + 1); if state.segment_pending { - let rec_state = self.state.lock(); - // Adjust so that last_recording_start has running time of // recording_duration @@ -1418,9 +1435,10 @@ impl ToggleRecord { // called without lock fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { + let mut rec_state = self.state.lock(); use gst::EventView; - let stream = match self.pads.lock().get(pad) { + let stream = match rec_state.pads.get(pad) { None => { gst::element_imp_error!( self, @@ -1542,6 +1560,8 @@ impl ToggleRecord { Some(is_live) => upstream_live = is_live, } } + + drop(rec_state); let handle_result = if stream == self.main_stream { self.handle_main_stream(pad, &stream, (pts, duration), upstream_live) } else { @@ -1572,7 +1592,6 @@ impl ToggleRecord { let main_is_eos = main_state.as_ref().is_some_and(|main_state| main_state.eos); if !main_is_eos { - let mut rec_state = self.state.lock(); recording_state_changed = self.check_and_update_stream_start( pad, &stream, @@ -1599,7 +1618,6 @@ impl ToggleRecord { drop(main_state); if main_is_eos { - let mut rec_state = self.state.lock(); recording_state_changed = self.check_and_update_eos(pad, &stream, &mut state, &mut rec_state); } @@ -1668,7 +1686,8 @@ impl ToggleRecord { // called without lock fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { - let stream = match self.pads.lock().get(pad) { + let rec_state = self.state.lock(); + let stream = match rec_state.pads.get(pad) { None => { gst::element_imp_error!( self, @@ -1699,9 +1718,10 @@ impl ToggleRecord { // called without lock fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool { + let rec_state = self.state.lock(); use gst::EventView; - let stream = match self.pads.lock().get(pad) { + let stream = match rec_state.pads.get(pad) { None => { gst::element_imp_error!( self, @@ -1717,7 +1737,6 @@ impl ToggleRecord { let forward = !matches!(event.view(), EventView::Seek(..)); - let rec_state = self.state.lock(); let offset = event.running_time_offset(); event .make_mut() @@ -1735,9 +1754,10 @@ impl ToggleRecord { // called without lock fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + let rec_state = self.state.lock(); use gst::QueryViewMut; - let stream = match self.pads.lock().get(pad) { + let stream = match rec_state.pads.get(pad) { None => { gst::element_imp_error!( self, @@ -1789,7 +1809,6 @@ impl ToggleRecord { QueryViewMut::Position(q) => { if q.format() == gst::Format::Time { let state = stream.state.lock(); - let rec_state = self.state.lock(); let mut recording_duration = rec_state.recording_duration; if rec_state.recording_state == RecordingState::Recording || rec_state.recording_state == RecordingState::Stopping @@ -1823,7 +1842,6 @@ impl ToggleRecord { QueryViewMut::Duration(q) => { if q.format() == gst::Format::Time { let state = stream.state.lock(); - let rec_state = self.state.lock(); let mut recording_duration = rec_state.recording_duration; if rec_state.recording_state == RecordingState::Recording || rec_state.recording_state == RecordingState::Stopping @@ -1863,7 +1881,8 @@ impl ToggleRecord { // called without lock fn iterate_internal_links(&self, pad: &gst::Pad) -> gst::Iterator { - let stream = match self.pads.lock().get(pad) { + let rec_state = self.state.lock(); + let stream = match rec_state.pads.get(pad) { None => { gst::element_imp_error!( self, @@ -1955,11 +1974,9 @@ impl ObjectSubclass for ToggleRecord { Self { settings: Mutex::new(Settings::default()), - state: Mutex::new(State::default()), + state: Mutex::new(State::new(pads)), main_stream, main_stream_cond: Condvar::new(), - other_streams: Mutex::new((Vec::new(), 0)), - pads: Mutex::new(pads), } } } @@ -2128,9 +2145,11 @@ impl ElementImpl for ToggleRecord { match transition { gst::StateChange::ReadyToPaused => { - for s in self + let mut rec_state = self.state.lock(); + rec_state.reset(); + + for s in rec_state .other_streams - .lock() .0 .iter() .chain(iter::once(&self.main_stream)) @@ -2139,14 +2158,13 @@ impl ElementImpl for ToggleRecord { *state = StreamState::default(); } - let mut rec_state = self.state.lock(); - *rec_state = State::default(); - let settings = *self.settings.lock(); rec_state.live = settings.live; } gst::StateChange::PausedToReady => { - for s in &self.other_streams.lock().0 { + let rec_state = self.state.lock(); + + for s in &rec_state.other_streams.0 { let mut state = s.state.lock(); state.flushing = true; } @@ -2161,9 +2179,10 @@ impl ElementImpl for ToggleRecord { let success = self.parent_change_state(transition)?; if transition == gst::StateChange::PausedToReady { - for s in self + let mut rec_state = self.state.lock(); + + for s in rec_state .other_streams - .lock() .0 .iter() .chain(iter::once(&self.main_stream)) @@ -2173,8 +2192,7 @@ impl ElementImpl for ToggleRecord { state.pending_events.clear(); } - let mut rec_state = self.state.lock(); - *rec_state = State::default(); + rec_state.reset(); drop(rec_state); self.obj().notify("recording"); } @@ -2188,12 +2206,9 @@ impl ElementImpl for ToggleRecord { _name: Option<&str>, _caps: Option<&gst::Caps>, ) -> Option { - let mut other_streams_guard = self.other_streams.lock(); - let (ref mut other_streams, ref mut pad_count) = *other_streams_guard; - let mut pads = self.pads.lock(); - - let id = *pad_count; - *pad_count += 1; + let mut rec_state = self.state.lock(); + let id = rec_state.other_streams.1; + rec_state.other_streams.1 += 1; let templ = self.obj().pad_template("sink_%u").unwrap(); let sinkpad = gst::Pad::builder_from_template(&templ) @@ -2259,13 +2274,14 @@ impl ElementImpl for ToggleRecord { let stream = Stream::new(sinkpad.clone(), srcpad.clone()); - pads.insert(stream.sinkpad.clone(), stream.clone()); - pads.insert(stream.srcpad.clone(), stream.clone()); + rec_state + .pads + .insert(stream.sinkpad.clone(), stream.clone()); + rec_state.pads.insert(stream.srcpad.clone(), stream.clone()); - other_streams.push(stream); + rec_state.other_streams.0.push(stream); - drop(pads); - drop(other_streams_guard); + drop(rec_state); self.obj().add_pad(&sinkpad).unwrap(); self.obj().add_pad(&srcpad).unwrap(); @@ -2274,24 +2290,21 @@ impl ElementImpl for ToggleRecord { } fn release_pad(&self, pad: &gst::Pad) { - let mut other_streams_guard = self.other_streams.lock(); - let (ref mut other_streams, _) = *other_streams_guard; - let mut pads = self.pads.lock(); + let mut rec_state = self.state.lock(); - let stream = match pads.get(pad) { + let stream = match rec_state.pads.get(pad) { None => return, Some(stream) => stream.clone(), }; - pads.remove(&stream.sinkpad).unwrap(); - pads.remove(&stream.srcpad).unwrap(); + rec_state.pads.remove(&stream.sinkpad).unwrap(); + rec_state.pads.remove(&stream.srcpad).unwrap(); // TODO: Replace with Vec::remove_item() once stable - let pos = other_streams.iter().position(|x| *x == stream); - pos.map(|pos| other_streams.swap_remove(pos)); + let pos = rec_state.other_streams.0.iter().position(|x| *x == stream); + pos.map(|pos| rec_state.other_streams.0.swap_remove(pos)); - drop(pads); - drop(other_streams_guard); + drop(rec_state); let main_state = self.main_stream.state.lock(); self.main_stream_cond.notify_all();