utils/togglerecord: Fix timestamp tracking logic for partially overlapping timestamps

And various other cases. Also adjust one of the tests accordingly and
improve assertions to print more information about internal
inconsistencies.
This commit is contained in:
Sebastian Dröge 2020-08-05 18:24:10 +03:00 committed by Sebastian Dröge
parent a91e8aadb2
commit 98b618cc9d
4 changed files with 85 additions and 32 deletions

View file

@ -15,6 +15,7 @@ gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
gtk = { git = "https://github.com/gtk-rs/gtk", optional = true } gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true } gio = { git = "https://github.com/gtk-rs/gio", optional = true }
parking_lot = "0.11" parking_lot = "0.11"
more-asserts = "0.2"
lazy_static = "1.0" lazy_static = "1.0"
[dev-dependencies] [dev-dependencies]

View file

@ -25,6 +25,9 @@ extern crate gstreamer_video as gst_video;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
#[macro_use]
extern crate more_asserts;
extern crate parking_lot; extern crate parking_lot;
mod togglerecord; mod togglerecord;

View file

@ -93,7 +93,9 @@ struct StreamState {
in_segment: gst::FormattedSegment<gst::ClockTime>, in_segment: gst::FormattedSegment<gst::ClockTime>,
out_segment: gst::FormattedSegment<gst::ClockTime>, out_segment: gst::FormattedSegment<gst::ClockTime>,
segment_seqnum: gst::Seqnum, segment_seqnum: gst::Seqnum,
// Start/end running time of the current/last buffer
current_running_time: gst::ClockTime, current_running_time: gst::ClockTime,
current_running_time_end: gst::ClockTime,
eos: bool, eos: bool,
flushing: bool, flushing: bool,
segment_pending: bool, segment_pending: bool,
@ -110,6 +112,7 @@ impl Default for StreamState {
out_segment: gst::FormattedSegment::new(), out_segment: gst::FormattedSegment::new(),
segment_seqnum: gst::Seqnum::next(), segment_seqnum: gst::Seqnum::next(),
current_running_time: gst::CLOCK_TIME_NONE, current_running_time: gst::CLOCK_TIME_NONE,
current_running_time_end: gst::CLOCK_TIME_NONE,
eos: false, eos: false,
flushing: false, flushing: false,
segment_pending: false, segment_pending: false,
@ -407,7 +410,9 @@ impl ToggleRecord {
let current_running_time = state.in_segment.to_running_time(dts_or_pts); let current_running_time = state.in_segment.to_running_time(dts_or_pts);
let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); state.current_running_time = cmp::max(current_running_time, state.current_running_time);
state.current_running_time_end =
cmp::max(current_running_time_end, state.current_running_time_end);
// Wake up everybody, we advanced a bit // Wake up everybody, we advanced a bit
// Important: They will only be able to advance once we're done with this // Important: They will only be able to advance once we're done with this
@ -480,14 +485,16 @@ impl ToggleRecord {
rec_state.recording_duration rec_state.recording_duration
); );
// Then unlock and wait for all other streams to reach it or go EOS instead. // Then unlock and wait for all other streams to reach a buffer that is completely
// after/at the recording stop position (i.e. can be dropped completely) or go EOS
// instead.
drop(rec_state); drop(rec_state);
while !self.other_streams.lock().0.iter().all(|s| { while !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock(); let s = s.state.lock();
s.eos s.eos
|| (s.current_running_time.is_some() || (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time_end) && s.current_running_time >= current_running_time)
}) { }) {
gst_log!(CAT, obj: pad, "Waiting for other streams to stop"); gst_log!(CAT, obj: pad, "Waiting for other streams to stop");
self.main_stream_cond.wait(&mut state); self.main_stream_cond.wait(&mut state);
@ -563,15 +570,16 @@ impl ToggleRecord {
other_state.discont_pending = true; other_state.discont_pending = true;
} }
// Then unlock and wait for all other streams to reach // Then unlock and wait for all other streams to reach a buffer that is completely
// it or go EOS instead // after/at the recording start position (i.e. can be passed through completely) or
// go EOS instead.
drop(rec_state); drop(rec_state);
while !self.other_streams.lock().0.iter().all(|s| { while !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock(); let s = s.state.lock();
s.eos s.eos
|| (s.current_running_time.is_some() || (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time_end) && s.current_running_time >= current_running_time)
}) { }) {
gst_log!(CAT, obj: pad, "Waiting for other streams to start"); gst_log!(CAT, obj: pad, "Waiting for other streams to start");
self.main_stream_cond.wait(&mut state); self.main_stream_cond.wait(&mut state);
@ -664,7 +672,9 @@ impl ToggleRecord {
let current_running_time = state.in_segment.to_running_time(pts); let current_running_time = state.in_segment.to_running_time(pts);
let current_running_time_end = state.in_segment.to_running_time(pts_end); let current_running_time_end = state.in_segment.to_running_time(pts_end);
state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); state.current_running_time = cmp::max(current_running_time, state.current_running_time);
state.current_running_time_end =
cmp::max(current_running_time_end, state.current_running_time_end);
gst_log!( gst_log!(
CAT, CAT,
@ -684,20 +694,41 @@ impl ToggleRecord {
// above but all notifying must happen while the main_stream state is locked as per above. // above but all notifying must happen while the main_stream state is locked as per above.
self.main_stream_cond.notify_all(); self.main_stream_cond.notify_all();
let mut rec_state = self.state.lock();
// Wait until the main stream advanced completely past our current running time in
// Recording/Stopped modes to make sure we're not already outputting/dropping data that
// should actually be dropped/output if recording is started/stopped now.
//
// In Starting/Stopping mode we wait if we the start of this buffer is after last recording
// start/stop as in that case we should be in Recording/Stopped mode already. The main
// stream is waiting for us to reach that position to switch to Recording/Stopped mode so
// that in those modes we only have to pass through/drop the whole buffers.
while (main_state.current_running_time == gst::CLOCK_TIME_NONE while (main_state.current_running_time == gst::CLOCK_TIME_NONE
|| main_state.current_running_time < current_running_time_end) || rec_state.recording_state != RecordingState::Starting
&& rec_state.recording_state != RecordingState::Stopping
&& main_state.current_running_time_end < current_running_time_end
|| rec_state.recording_state == RecordingState::Starting
&& rec_state.last_recording_start <= current_running_time
|| rec_state.recording_state == RecordingState::Stopping
&& rec_state.last_recording_stop <= current_running_time)
&& !main_state.eos && !main_state.eos
&& !stream.state.lock().flushing && !stream.state.lock().flushing
{ {
gst_log!( gst_log!(
CAT, CAT,
obj: pad, obj: pad,
"Waiting for reaching {} / EOS / flushing, main stream at {}", "Waiting at {}-{} in {:?} state, main stream at {}-{}",
current_running_time, current_running_time,
main_state.current_running_time current_running_time_end,
rec_state.recording_state,
main_state.current_running_time,
main_state.current_running_time_end
); );
drop(rec_state);
self.main_stream_cond.wait(&mut main_state); self.main_stream_cond.wait(&mut main_state);
rec_state = self.state.lock();
} }
state = stream.state.lock(); state = stream.state.lock();
@ -707,8 +738,6 @@ impl ToggleRecord {
return Ok(HandleResult::Flushing); return Ok(HandleResult::Flushing);
} }
let rec_state = self.state.lock();
// If the main stream is EOS, we are also EOS unless we are // If the main stream is EOS, we are also EOS unless we are
// before the final last recording stop running time // before the final last recording stop running time
if main_state.eos { if main_state.eos {
@ -821,8 +850,8 @@ impl ToggleRecord {
} else { } else {
// In all other cases the buffer is fully between recording start and end and // In all other cases the buffer is fully between recording start and end and
// can be passed through as is // can be passed through as is
assert!(current_running_time >= rec_state.last_recording_start); assert_ge!(current_running_time, rec_state.last_recording_start);
assert!(current_running_time_end <= rec_state.last_recording_stop); assert_le!(current_running_time_end, rec_state.last_recording_stop);
gst_debug!( gst_debug!(
CAT, CAT,
@ -836,19 +865,27 @@ impl ToggleRecord {
} }
} }
// The end of our buffer is before the end of the previous buffer of the main stream
assert!(main_state.current_running_time >= current_running_time_end);
match rec_state.recording_state { match rec_state.recording_state {
RecordingState::Recording => { RecordingState::Recording => {
// The end of our buffer must be before/at the end of the previous buffer of the main
// stream
assert_le!(
current_running_time_end,
main_state.current_running_time_end
);
// We're properly started, must have a start position and // We're properly started, must have a start position and
// be actually after that start position // be actually after that start position
assert!(rec_state.last_recording_start.is_some()); assert!(rec_state.last_recording_start.is_some());
assert!(current_running_time >= rec_state.last_recording_start); assert_ge!(current_running_time, rec_state.last_recording_start);
gst_log!(CAT, obj: pad, "Passing buffer (recording)"); gst_log!(CAT, obj: pad, "Passing buffer (recording)");
Ok(HandleResult::Pass(data)) Ok(HandleResult::Pass(data))
} }
RecordingState::Stopping => { RecordingState::Stopping => {
// The start of our buffer must be before the last recording stop as
// otherwise we would be in Stopped state already
assert_lt!(current_running_time, rec_state.last_recording_stop);
// If we have no start position yet, the main stream is waiting for a key-frame // If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_stop.is_none() { if rec_state.last_recording_stop.is_none() {
gst_log!( gst_log!(
@ -908,11 +945,22 @@ impl ToggleRecord {
} }
} }
RecordingState::Stopped => { RecordingState::Stopped => {
// The end of our buffer must be before/at the end of the previous buffer of the main
// stream
assert_le!(
current_running_time_end,
main_state.current_running_time_end
);
// We're properly stopped // We're properly stopped
gst_log!(CAT, obj: pad, "Dropping buffer (stopped)"); gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
Ok(HandleResult::Drop) Ok(HandleResult::Drop)
} }
RecordingState::Starting => { RecordingState::Starting => {
// The start of our buffer must be before the last recording start as
// otherwise we would be in Recording state already
assert_lt!(current_running_time, rec_state.last_recording_start);
// If we have no start position yet, the main stream is waiting for a key-frame // If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_start.is_none() { if rec_state.last_recording_start.is_none() {
gst_log!( gst_log!(
@ -1126,6 +1174,7 @@ impl ToggleRecord {
state.segment_pending = true; state.segment_pending = true;
state.discont_pending = true; state.discont_pending = true;
state.current_running_time = gst::CLOCK_TIME_NONE; state.current_running_time = gst::CLOCK_TIME_NONE;
state.current_running_time_end = gst::CLOCK_TIME_NONE;
} }
EventView::Caps(c) => { EventView::Caps(c) => {
let mut state = stream.state.lock(); let mut state = stream.state.lock();
@ -1178,6 +1227,7 @@ impl ToggleRecord {
state.segment_seqnum = event.get_seqnum(); state.segment_seqnum = event.get_seqnum();
state.segment_pending = true; state.segment_pending = true;
state.current_running_time = gst::CLOCK_TIME_NONE; state.current_running_time = gst::CLOCK_TIME_NONE;
state.current_running_time_end = gst::CLOCK_TIME_NONE;
gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment); gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment);
@ -1397,13 +1447,13 @@ impl ToggleRecord {
obj: pad, obj: pad,
"Returning position {} = {} - ({} + {})", "Returning position {} = {} - ({} + {})",
recording_duration recording_duration
+ (state.current_running_time - rec_state.last_recording_start), + (state.current_running_time_end - rec_state.last_recording_start),
recording_duration, recording_duration,
state.current_running_time, state.current_running_time_end,
rec_state.last_recording_start rec_state.last_recording_start
); );
recording_duration += recording_duration +=
state.current_running_time - rec_state.last_recording_start; state.current_running_time_end - rec_state.last_recording_start;
} else { } else {
gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration,); gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration,);
} }
@ -1426,13 +1476,13 @@ impl ToggleRecord {
obj: pad, obj: pad,
"Returning duration {} = {} - ({} + {})", "Returning duration {} = {} - ({} + {})",
recording_duration recording_duration
+ (state.current_running_time - rec_state.last_recording_start), + (state.current_running_time_end - rec_state.last_recording_start),
recording_duration, recording_duration,
state.current_running_time, state.current_running_time_end,
rec_state.last_recording_start rec_state.last_recording_start
); );
recording_duration += recording_duration +=
state.current_running_time - rec_state.last_recording_start; state.current_running_time_end - rec_state.last_recording_start;
} else { } else {
gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration,); gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration,);
} }

View file

@ -1067,23 +1067,22 @@ fn test_three_stream_open_close_open() {
sender_input_1.send(SendData::Buffers(10)).unwrap(); sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap(); sender_input_2.send(SendData::Buffers(11)).unwrap();
sender_input_3.send(SendData::Buffers(10)).unwrap(); sender_input_3.send(SendData::Buffers(11)).unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1/3 are finished // Sender 2/3 is waiting for sender 1 to continue
receiver_input_done_1.recv().unwrap(); receiver_input_done_1.recv().unwrap();
receiver_input_done_3.recv().unwrap();
// Stop recording and push new buffers to sender 1, which will advance // Stop recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2 above // it and release the 11th buffer of sender 2/3 above
togglerecord.set_property("record", &false).unwrap(); togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap(); sender_input_1.send(SendData::Buffers(10)).unwrap();
receiver_input_done_2.recv().unwrap(); receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
// Send another 9 buffers to sender 2, 1/2 are at the same position now // Send another 9 buffers to sender 2/3, all streams are at the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap(); sender_input_2.send(SendData::Buffers(9)).unwrap();
sender_input_3.send(SendData::Buffers(9)).unwrap();
// Send the remaining 10 buffers to sender 3, all are at the same position now
sender_input_3.send(SendData::Buffers(10)).unwrap();
// Wait until all 20 buffers of all senders are done // Wait until all 20 buffers of all senders are done
receiver_input_done_1.recv().unwrap(); receiver_input_done_1.recv().unwrap();