From 2be14b95b306cf23bd52a192106c17108d573a03 Mon Sep 17 00:00:00 2001 From: Vivia Nikolaidou Date: Tue, 20 Jun 2023 15:05:29 +0300 Subject: [PATCH] togglerecord: Fix nonlive inputs when element is started not recording Part-of: --- utils/togglerecord/src/togglerecord/imp.rs | 22 ++- utils/togglerecord/tests/tests.rs | 217 +++++++++++++++++++++ 2 files changed, 238 insertions(+), 1 deletion(-) diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs index d6643722..74fdaba0 100644 --- a/utils/togglerecord/src/togglerecord/imp.rs +++ b/utils/togglerecord/src/togglerecord/imp.rs @@ -356,6 +356,14 @@ impl ToggleRecord { upstream_live: bool, ) -> Result { if !upstream_live { + let clock = self.obj().clock(); + let mut rec_state = self.state.lock(); + if rec_state.time_start_block.is_none() { + rec_state.time_start_block = clock + .as_ref() + .map_or(state.current_running_time, |c| c.time()); + } + drop(rec_state); while !settings.record && !state.flushing { gst::debug!(CAT, obj: pad, "Waiting for record=true"); self.main_stream_cond.wait(state); @@ -374,12 +382,20 @@ impl ToggleRecord { } let mut rec_state = self.state.lock(); if let Some(time_start_block) = rec_state.time_start_block { - let clock = self.obj().clock().expect("Cannot find pipeline clock"); + // If we have a time_start_block it means the clock is there + let clock = clock.expect("Cannot find pipeline clock"); rec_state.blocked_duration += clock.time().unwrap() - time_start_block; if settings.live { rec_state.running_time_offset = rec_state.blocked_duration.nseconds() as i64; } rec_state.time_start_block = gst::ClockTime::NONE; + } else { + // How did we even get here? + gst::warning!( + CAT, + obj: pad, + "Have no clock and no current running time. Will not offset buffers" + ); } drop(rec_state); gst::log!(CAT, obj: pad, "Done blocking main stream"); @@ -573,6 +589,10 @@ impl ToggleRecord { } } RecordingState::Stopped => { + if !upstream_live { + rec_state.recording_state = RecordingState::Starting; + } + drop(rec_state); if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? { Ok(HandleResult::Pass(data)) } else { diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs index 988a9b14..1a72881f 100644 --- a/utils/togglerecord/tests/tests.rs +++ b/utils/togglerecord/tests/tests.rs @@ -876,6 +876,223 @@ fn test_two_stream_close_open() { pipeline.set_state(gst::State::Null).unwrap(); } +#[test] +fn test_two_stream_close_open_nonlivein_nonliveout() { + init(); + + let pipeline = gst::Pipeline::default(); + let togglerecord = gst::ElementFactory::make("togglerecord") + .property("is-live", false) + .property("record", false) + .build() + .unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let main_buffers_in_gap = 10u64; + let secondary_buffers_in_gap = main_buffers_in_gap + 1; + let main_buffers_after_gap = 10u64; + let secondary_buffers_after_gap = 9u64; + let recv_timeout = Duration::from_secs(10); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + false, + ); + + pipeline.set_state(gst::State::Playing).unwrap(); + + sender_input_1 + .send(SendData::Buffers(main_buffers_in_gap as usize)) + .unwrap(); + assert_eq!( + receiver_input_done_1.recv_timeout(Duration::from_millis(20)), + Err(mpsc::RecvTimeoutError::Timeout) + ); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + sender_input_2 + .send(SendData::Buffers((secondary_buffers_in_gap) as usize)) + .unwrap(); + assert_eq!( + receiver_input_done_2.recv_timeout(Duration::from_millis(20)), + Err(mpsc::RecvTimeoutError::Timeout) + ); + + // Start recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", true); + sender_input_1 + .send(SendData::Buffers(main_buffers_after_gap as usize)) + .unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2 + .send(SendData::Buffers(secondary_buffers_after_gap as usize)) + .unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!( + buffers_1.len() as u64, + main_buffers_in_gap + main_buffers_in_gap + ); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!( + buffers_2.len() as u64, + secondary_buffers_in_gap + secondary_buffers_after_gap + ); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_close_open_nonlivein_liveout() { + init(); + + let testclock = gst_check::TestClock::new(); + let pipeline = gst::Pipeline::default(); + pipeline.use_clock(Some(&testclock)); + let togglerecord = gst::ElementFactory::make("togglerecord") + .property("is-live", true) + .property("record", false) + .build() + .unwrap(); + togglerecord.set_clock(Some(&testclock)).unwrap(); + pipeline.add(&togglerecord).unwrap(); + let testclock = testclock.downcast::().unwrap(); + testclock.set_time(gst::ClockTime::ZERO); + + let main_buffers_in_gap = 10u64; + let secondary_buffers_in_gap = main_buffers_in_gap + 1; + let main_buffers_after_gap = 10u64; + let secondary_buffers_after_gap = 9u64; + let recv_timeout = Duration::from_secs(10); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + false, + ); + + pipeline.set_state(gst::State::Playing).unwrap(); + + sender_input_1 + .send(SendData::Buffers(main_buffers_in_gap as usize)) + .unwrap(); + assert_eq!( + receiver_input_done_1.recv_timeout(Duration::from_millis(20)), + Err(mpsc::RecvTimeoutError::Timeout) + ); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + sender_input_2 + .send(SendData::Buffers((secondary_buffers_in_gap) as usize)) + .unwrap(); + assert_eq!( + receiver_input_done_2.recv_timeout(Duration::from_millis(20)), + Err(mpsc::RecvTimeoutError::Timeout) + ); + + // Advance the clock + let block_time = gst::ClockTime::from_mseconds(42); + testclock.advance_time(block_time.nseconds() as i64); + + // Start recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", true); + sender_input_1 + .send(SendData::Buffers(main_buffers_after_gap as usize)) + .unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2 + .send(SendData::Buffers(secondary_buffers_after_gap as usize)) + .unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), block_time + index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!( + buffers_1.len() as u64, + main_buffers_in_gap + main_buffers_in_gap + ); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), block_time + index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!( + buffers_2.len() as u64, + secondary_buffers_in_gap + secondary_buffers_after_gap + ); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + #[test] fn test_two_stream_open_close_open_nonlivein_liveout() { init();