diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs index 6f13610f..7ca671ca 100644 --- a/utils/togglerecord/src/togglerecord/imp.rs +++ b/utils/togglerecord/src/togglerecord/imp.rs @@ -175,7 +175,7 @@ impl Default for State { enum HandleResult { Pass(T), Drop, - Eos, + Eos(bool), Flushing, } @@ -760,7 +760,12 @@ impl ToggleRecord { // If we have no start or stop position (we never recorded) then we're EOS too now if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() { gst_debug!(CAT, obj: pad, "Main stream EOS and recording never started",); - return Ok(HandleResult::Eos); + return Ok(HandleResult::Eos(self.check_and_update_eos( + pad, + stream, + &mut state, + &mut rec_state, + ))); } else if data.can_clip(&*state) && current_running_time < rec_state.last_recording_start && current_running_time_end > rec_state.last_recording_start @@ -849,7 +854,12 @@ impl ToggleRecord { return Ok(HandleResult::Pass(data)); } else { gst_warning!(CAT, obj: pad, "Complete buffer clipped!"); - return Ok(HandleResult::Eos); + return Ok(HandleResult::Eos(self.check_and_update_eos( + pad, + stream, + &mut state, + &mut rec_state, + ))); } } else if current_running_time_end > rec_state.last_recording_stop { // Otherwise if the end of the buffer is after the recording stop, we're EOS @@ -862,7 +872,12 @@ impl ToggleRecord { current_running_time_end, rec_state.last_recording_stop ); - return Ok(HandleResult::Eos); + return Ok(HandleResult::Eos(self.check_and_update_eos( + pad, + stream, + &mut state, + &mut rec_state, + ))); } else { // In all other cases the buffer is fully between recording start and end and // can be passed through as is @@ -1039,6 +1054,49 @@ impl ToggleRecord { } } + // should be called only if main stream is in eos state + fn check_and_update_eos( + &self, + pad: &gst::Pad, + stream: &Stream, + stream_state: &mut StreamState, + rec_state: &mut State, + ) -> bool { + stream_state.eos = true; + + // Check whether all secondary streams are in eos. If so, update recording + // state to Stopped + if rec_state.recording_state != RecordingState::Stopped { + let mut others_eos = true; + + // Check eos state of all secondary streams + self.other_streams.lock().0.iter().all(|s| { + if s == stream { + return true; + } + + let s = s.state.lock(); + if !s.eos { + others_eos = false; + } + others_eos + }); + + if others_eos { + gst_debug!( + CAT, + obj: pad, + "All streams are in EOS state, change state to Stopped" + ); + + rec_state.recording_state = RecordingState::Stopped; + return true; + } + } + + false + } + fn sink_chain( &self, pad: &gst::Pad, @@ -1079,12 +1137,17 @@ impl ToggleRecord { HandleResult::Flushing => { return Err(gst::FlowError::Flushing); } - HandleResult::Eos => { + HandleResult::Eos(recording_state_updated) => { stream.srcpad.push_event( gst::event::Eos::builder() .seqnum(stream.state.lock().segment_seqnum) .build(), ); + + if recording_state_updated { + element.notify("recording"); + } + return Err(gst::FlowError::Eos); } HandleResult::Pass(buffer) => { @@ -1178,6 +1241,7 @@ impl ToggleRecord { let mut forward = true; let mut send_pending = false; + let mut recording_state_changed = false; match event.view() { EventView::FlushStart(..) => { @@ -1279,14 +1343,26 @@ impl ToggleRecord { }; } EventView::Eos(..) => { - let _main_state = if stream != self.main_stream { + let main_state = if stream != self.main_stream { Some(self.main_stream.state.lock()) } else { None }; let mut state = stream.state.lock(); - state.eos = true; + + let main_is_eos = if let Some(main_state) = main_state { + main_state.eos + } else { + true + }; + + if main_is_eos { + let mut rec_state = self.state.lock(); + recording_state_changed = + self.check_and_update_eos(pad, &stream, &mut state, &mut rec_state); + } + self.main_stream_cond.notify_all(); gst_debug!( CAT, @@ -1299,6 +1375,10 @@ impl ToggleRecord { _ => (), }; + if recording_state_changed { + element.notify("recording"); + } + // If a serialized event and coming after Segment and a new Segment is pending, // queue up and send at a later time (buffer/gap) after we sent the Segment let type_ = event.get_type(); diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs index 99dc20b0..75e9e16b 100644 --- a/utils/togglerecord/tests/tests.rs +++ b/utils/togglerecord/tests/tests.rs @@ -39,6 +39,7 @@ enum SendData { BuffersDelta(usize), Gaps(usize), Eos, + Terminate, } #[allow(clippy::type_complexity)] @@ -185,6 +186,10 @@ fn setup_sender_receiver( i += 1; } } + SendData::Terminate => { + let _ = sender_input_done.send(()); + return; + } } let _ = sender_input_done.send(()); @@ -201,9 +206,10 @@ fn recv_buffers( receiver_output: &mpsc::Receiver>, segment: &mut gst::FormattedSegment, wait_buffers: usize, -) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> { +) -> (Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)>, bool) { let mut res = Vec::new(); let mut n_buffers = 0; + let mut saw_eos = false; while let Ok(val) = receiver_output.recv() { match val { Left(buffer) => { @@ -214,7 +220,7 @@ fn recv_buffers( )); n_buffers += 1; if wait_buffers > 0 && n_buffers == wait_buffers { - return res; + return (res, saw_eos); } } Right(event) => { @@ -227,11 +233,12 @@ fn recv_buffers( res.push((segment.to_running_time(ts), ts, duration)); n_buffers += 1; if wait_buffers > 0 && n_buffers == wait_buffers { - return res; + return (res, saw_eos); } } EventView::Eos(..) => { - return res; + saw_eos = true; + return (res, saw_eos); } EventView::Segment(ref e) => { *segment = e.get_segment().clone().downcast().unwrap(); @@ -242,7 +249,7 @@ fn recv_buffers( } } - res + (res, saw_eos) } #[test] @@ -285,7 +292,7 @@ fn test_one_stream_open() { drop(sender_input); let mut segment = gst::FormattedSegment::::new(); - let buffers = recv_buffers(&receiver_output, &mut segment, 0); + let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; @@ -318,7 +325,7 @@ fn test_one_stream_gaps_open() { drop(sender_input); let mut segment = gst::FormattedSegment::::new(); - let buffers = recv_buffers(&receiver_output, &mut segment, 0); + let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; @@ -352,7 +359,7 @@ fn test_one_stream_close_open() { drop(sender_input); let mut segment = gst::FormattedSegment::::new(); - let buffers = recv_buffers(&receiver_output, &mut segment, 0); + let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; @@ -387,7 +394,7 @@ fn test_one_stream_open_close() { drop(sender_input); let mut segment = gst::FormattedSegment::::new(); - let buffers = recv_buffers(&receiver_output, &mut segment, 0); + let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; @@ -425,7 +432,7 @@ fn test_one_stream_open_close_open() { drop(sender_input); let mut segment = gst::FormattedSegment::::new(); - let buffers = recv_buffers(&receiver_output, &mut segment, 0); + let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 20); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let pts_off = if index >= 10 { @@ -472,7 +479,7 @@ fn test_two_stream_open() { receiver_input_done_2.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -483,7 +490,7 @@ fn test_two_stream_open() { // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -525,7 +532,7 @@ fn test_two_stream_open_shift() { receiver_input_done_2.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -536,7 +543,7 @@ fn test_two_stream_open_shift() { // Second to last buffer should be clipped from second stream, last should be dropped let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND); @@ -583,7 +590,7 @@ fn test_two_stream_open_shift_main() { // PTS 5 maps to running time 0 now let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -595,7 +602,7 @@ fn test_two_stream_open_shift_main() { // First and second last buffer should be clipped from second stream, // last buffer should be dropped let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; if index == 0 { @@ -663,7 +670,7 @@ fn test_two_stream_open_close() { receiver_input_done_2.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -674,7 +681,7 @@ fn test_two_stream_open_close() { // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -732,7 +739,7 @@ fn test_two_stream_close_open() { receiver_input_done_2.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -743,7 +750,7 @@ fn test_two_stream_close_open() { // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -814,7 +821,7 @@ fn test_two_stream_open_close_open() { receiver_input_done_2.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND @@ -831,7 +838,7 @@ fn test_two_stream_open_close_open() { // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND @@ -914,7 +921,7 @@ fn test_two_stream_open_close_open_gaps() { receiver_input_done_2.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND @@ -931,7 +938,7 @@ fn test_two_stream_open_close_open_gaps() { // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND @@ -1015,7 +1022,7 @@ fn test_two_stream_close_open_close_delta() { receiver_input_done_2.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -1026,7 +1033,7 @@ fn test_two_stream_close_open_close_delta() { // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); @@ -1110,7 +1117,7 @@ fn test_three_stream_open_close_open() { receiver_input_done_3.recv().unwrap(); let mut segment_1 = gst::FormattedSegment::::new(); - let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND @@ -1127,7 +1134,7 @@ fn test_three_stream_open_close_open() { // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); - let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND @@ -1143,7 +1150,7 @@ fn test_three_stream_open_close_open() { assert_eq!(buffers_2.len(), 20); let mut segment_3 = gst::FormattedSegment::::new(); - let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0); + let (buffers_3, _) = recv_buffers(&receiver_output_3, &mut segment_3, 0); for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND @@ -1164,3 +1171,482 @@ fn test_three_stream_open_close_open() { pipeline.set_state(gst::State::Null).unwrap(); } + +#[test] +fn test_two_stream_main_eos() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + // Send 10 buffers to main stream first + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(9)).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // And send EOS, at this moment, recording state should be still recording + // since running time of main stream is advanced than secondary stream + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, true); + + // Send 2 buffers to secondary stream. At this moment, main stream got eos + // already (after 10 buffers) and secondary stream got 2 buffers. + // it will make running time of secondary stream to be advanced than main + // stream and results in all-eos state even if we don't send EOS event + // explicitly + sender_input_2.send(SendData::Buffers(2)).unwrap(); + receiver_input_done_2.recv().unwrap(); + sender_input_2.send(SendData::Terminate).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // At this moment, all streams should be in eos state. So togglerecord + // must be in stopped state + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, false); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + assert_eq!(saw_eos, true); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + assert_eq!(saw_eos, true); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_secondary_eos_first() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + // Send 10 buffers to main stream first + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(9)).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send EOS to the second stream + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Since main stream is not yet EOS state, we should be in recording state + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, true); + + // And send EOS to the main stream then it will update state to Stopped + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, false); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + assert_eq!(saw_eos, true); + + // We sent 9 buffers to the second stream, and there should be no dropped + // buffer + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 9); + assert_eq!(saw_eos, true); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_three_stream_main_eos() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(9)).unwrap(); + sender_input_3.send(SendData::Buffers(9)).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + // And send EOS, at this moment, recording state should be still recording + // since running time of the second stream is not in eos state + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, true); + + // Send 2 buffers to non-main streams. At this moment, main stream got EOS + // already (after 10 buffers) and the other streams got 9 buffers. + // So those 2 additional buffers to the non-main streams will make running + // times of those streams to be advanced than main stream. + // It will result in all-eos state even if we don't send EOS event + // to the non-main streams explicitly + sender_input_2.send(SendData::Buffers(2)).unwrap(); + receiver_input_done_2.recv().unwrap(); + sender_input_2.send(SendData::Terminate).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // The third stream is not in EOS state yet, so still recording == true + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, true); + + // And terminate the third thread without EOS + sender_input_3.send(SendData::Buffers(2)).unwrap(); + receiver_input_done_3.recv().unwrap(); + sender_input_3.send(SendData::Terminate).unwrap(); + receiver_input_done_3.recv().unwrap(); + + // At this moment, all streams should be in eos state. So togglerecord + // must be in stopped state + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, false); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + assert_eq!(saw_eos, true); + + // Last buffer should be dropped from non-main streams + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + assert_eq!(saw_eos, true); + + let mut segment_3 = gst::FormattedSegment::::new(); + let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0); + for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_3.len(), 10); + assert_eq!(saw_eos, true); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + thread_3.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_three_stream_main_and_second_eos() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(9)).unwrap(); + sender_input_3.send(SendData::Buffers(9)).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + // And send EOS, at this moment, recording state should be still recording + // since running time of the third stream is not in eos state + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, true); + + // And send EOS to the second stream, but state shouldn't be affected by + // this EOS. The third stream is still not in EOS state + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, true); + + // Send 2 buffers to the third stream. At this moment, main stream and + // the second stream got EOS already (after 10 buffers) and the third stream + // got 9 buffers. + // So those 2 additional buffers to the third streams will make running + // time of the stream to be advanced than main stream. + // It will result in all-eos state even if we don't send EOS event + // to the third stream explicitly + sender_input_3.send(SendData::Buffers(2)).unwrap(); + receiver_input_done_3.recv().unwrap(); + sender_input_3.send(SendData::Terminate).unwrap(); + receiver_input_done_3.recv().unwrap(); + + // At this moment, all streams should be in eos state. So togglerecord + // must be in stopped state + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, false); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + assert_eq!(saw_eos, true); + + // We sent 9 buffers to the second stream, and there must be no dropped one + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 9); + assert_eq!(saw_eos, true); + + // Last buffer should be dropped from the third stream + let mut segment_3 = gst::FormattedSegment::::new(); + let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0); + for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_3.len(), 10); + assert_eq!(saw_eos, true); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + thread_3.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_three_stream_secondary_eos_first() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(9)).unwrap(); + sender_input_3.send(SendData::Buffers(9)).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Send EOS to non-main streams + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + + sender_input_3.send(SendData::Eos).unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Since main stream is not yet EOS state, we should be in recording state + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, true); + + // And send EOS, Send EOS to the main stream then it will update state to + // Stopped + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + + let recording = togglerecord + .get_property("recording") + .unwrap() + .get_some::() + .unwrap(); + assert_eq!(recording, false); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + assert_eq!(saw_eos, true); + + // Last buffer should be dropped from non-main streams + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 9); + assert_eq!(saw_eos, true); + + let mut segment_3 = gst::FormattedSegment::::new(); + let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0); + for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_3.len(), 9); + assert_eq!(saw_eos, true); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + thread_3.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +}