togglerecord: Fix nonlive inputs when element is started not recording

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1252>
This commit is contained in:
Vivia Nikolaidou 2023-06-20 15:05:29 +03:00
parent 1119ed6620
commit 2be14b95b3
2 changed files with 238 additions and 1 deletions

View file

@ -356,6 +356,14 @@ impl ToggleRecord {
upstream_live: bool, upstream_live: bool,
) -> Result<bool, gst::FlowError> { ) -> Result<bool, gst::FlowError> {
if !upstream_live { if !upstream_live {
let clock = self.obj().clock();
let mut rec_state = self.state.lock();
if rec_state.time_start_block.is_none() {
rec_state.time_start_block = clock
.as_ref()
.map_or(state.current_running_time, |c| c.time());
}
drop(rec_state);
while !settings.record && !state.flushing { while !settings.record && !state.flushing {
gst::debug!(CAT, obj: pad, "Waiting for record=true"); gst::debug!(CAT, obj: pad, "Waiting for record=true");
self.main_stream_cond.wait(state); self.main_stream_cond.wait(state);
@ -374,12 +382,20 @@ impl ToggleRecord {
} }
let mut rec_state = self.state.lock(); let mut rec_state = self.state.lock();
if let Some(time_start_block) = rec_state.time_start_block { if let Some(time_start_block) = rec_state.time_start_block {
let clock = self.obj().clock().expect("Cannot find pipeline clock"); // If we have a time_start_block it means the clock is there
let clock = clock.expect("Cannot find pipeline clock");
rec_state.blocked_duration += clock.time().unwrap() - time_start_block; rec_state.blocked_duration += clock.time().unwrap() - time_start_block;
if settings.live { if settings.live {
rec_state.running_time_offset = rec_state.blocked_duration.nseconds() as i64; rec_state.running_time_offset = rec_state.blocked_duration.nseconds() as i64;
} }
rec_state.time_start_block = gst::ClockTime::NONE; rec_state.time_start_block = gst::ClockTime::NONE;
} else {
// How did we even get here?
gst::warning!(
CAT,
obj: pad,
"Have no clock and no current running time. Will not offset buffers"
);
} }
drop(rec_state); drop(rec_state);
gst::log!(CAT, obj: pad, "Done blocking main stream"); gst::log!(CAT, obj: pad, "Done blocking main stream");
@ -573,6 +589,10 @@ impl ToggleRecord {
} }
} }
RecordingState::Stopped => { RecordingState::Stopped => {
if !upstream_live {
rec_state.recording_state = RecordingState::Starting;
}
drop(rec_state);
if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? { if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? {
Ok(HandleResult::Pass(data)) Ok(HandleResult::Pass(data))
} else { } else {

View file

@ -876,6 +876,223 @@ fn test_two_stream_close_open() {
pipeline.set_state(gst::State::Null).unwrap(); pipeline.set_state(gst::State::Null).unwrap();
} }
#[test]
fn test_two_stream_close_open_nonlivein_nonliveout() {
init();
let pipeline = gst::Pipeline::default();
let togglerecord = gst::ElementFactory::make("togglerecord")
.property("is-live", false)
.property("record", false)
.build()
.unwrap();
pipeline.add(&togglerecord).unwrap();
let main_buffers_in_gap = 10u64;
let secondary_buffers_in_gap = main_buffers_in_gap + 1;
let main_buffers_after_gap = 10u64;
let secondary_buffers_after_gap = 9u64;
let recv_timeout = Duration::from_secs(10);
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
false,
);
pipeline.set_state(gst::State::Playing).unwrap();
sender_input_1
.send(SendData::Buffers(main_buffers_in_gap as usize))
.unwrap();
assert_eq!(
receiver_input_done_1.recv_timeout(Duration::from_millis(20)),
Err(mpsc::RecvTimeoutError::Timeout)
);
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
sender_input_2
.send(SendData::Buffers((secondary_buffers_in_gap) as usize))
.unwrap();
assert_eq!(
receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
Err(mpsc::RecvTimeoutError::Timeout)
);
// Start recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2 above
togglerecord.set_property("record", true);
sender_input_1
.send(SendData::Buffers(main_buffers_after_gap as usize))
.unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Send another 9 buffers to sender 2, both are the same position now
sender_input_2
.send(SendData::Buffers(secondary_buffers_after_gap as usize))
.unwrap();
// Wait until all 20 buffers of both senders are done
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(
buffers_1.len() as u64,
main_buffers_in_gap + main_buffers_in_gap
);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(
buffers_2.len() as u64,
secondary_buffers_in_gap + secondary_buffers_after_gap
);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_close_open_nonlivein_liveout() {
init();
let testclock = gst_check::TestClock::new();
let pipeline = gst::Pipeline::default();
pipeline.use_clock(Some(&testclock));
let togglerecord = gst::ElementFactory::make("togglerecord")
.property("is-live", true)
.property("record", false)
.build()
.unwrap();
togglerecord.set_clock(Some(&testclock)).unwrap();
pipeline.add(&togglerecord).unwrap();
let testclock = testclock.downcast::<gst_check::TestClock>().unwrap();
testclock.set_time(gst::ClockTime::ZERO);
let main_buffers_in_gap = 10u64;
let secondary_buffers_in_gap = main_buffers_in_gap + 1;
let main_buffers_after_gap = 10u64;
let secondary_buffers_after_gap = 9u64;
let recv_timeout = Duration::from_secs(10);
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
false,
);
pipeline.set_state(gst::State::Playing).unwrap();
sender_input_1
.send(SendData::Buffers(main_buffers_in_gap as usize))
.unwrap();
assert_eq!(
receiver_input_done_1.recv_timeout(Duration::from_millis(20)),
Err(mpsc::RecvTimeoutError::Timeout)
);
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
sender_input_2
.send(SendData::Buffers((secondary_buffers_in_gap) as usize))
.unwrap();
assert_eq!(
receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
Err(mpsc::RecvTimeoutError::Timeout)
);
// Advance the clock
let block_time = gst::ClockTime::from_mseconds(42);
testclock.advance_time(block_time.nseconds() as i64);
// Start recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2 above
togglerecord.set_property("record", true);
sender_input_1
.send(SendData::Buffers(main_buffers_after_gap as usize))
.unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Send another 9 buffers to sender 2, both are the same position now
sender_input_2
.send(SendData::Buffers(secondary_buffers_after_gap as usize))
.unwrap();
// Wait until all 20 buffers of both senders are done
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), block_time + index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(
buffers_1.len() as u64,
main_buffers_in_gap + main_buffers_in_gap
);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), block_time + index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(
buffers_2.len() as u64,
secondary_buffers_in_gap + secondary_buffers_after_gap
);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test] #[test]
fn test_two_stream_open_close_open_nonlivein_liveout() { fn test_two_stream_open_close_open_nonlivein_liveout() {
init(); init();