togglerecord: Update recording state on EOS

If all input streams are in EOS state, update recording state
to Stopped and notify the change as well
This commit is contained in:
Seungha Yang 2021-01-07 03:59:44 +09:00
parent 9d8fe31a5b
commit 2b7cebb02a
2 changed files with 602 additions and 36 deletions

View file

@ -175,7 +175,7 @@ impl Default for State {
enum HandleResult<T> {
Pass(T),
Drop,
Eos,
Eos(bool),
Flushing,
}
@ -760,7 +760,12 @@ impl ToggleRecord {
// If we have no start or stop position (we never recorded) then we're EOS too now
if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() {
gst_debug!(CAT, obj: pad, "Main stream EOS and recording never started",);
return Ok(HandleResult::Eos);
return Ok(HandleResult::Eos(self.check_and_update_eos(
pad,
stream,
&mut state,
&mut rec_state,
)));
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_start
&& current_running_time_end > rec_state.last_recording_start
@ -849,7 +854,12 @@ impl ToggleRecord {
return Ok(HandleResult::Pass(data));
} else {
gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
return Ok(HandleResult::Eos);
return Ok(HandleResult::Eos(self.check_and_update_eos(
pad,
stream,
&mut state,
&mut rec_state,
)));
}
} else if current_running_time_end > rec_state.last_recording_stop {
// Otherwise if the end of the buffer is after the recording stop, we're EOS
@ -862,7 +872,12 @@ impl ToggleRecord {
current_running_time_end,
rec_state.last_recording_stop
);
return Ok(HandleResult::Eos);
return Ok(HandleResult::Eos(self.check_and_update_eos(
pad,
stream,
&mut state,
&mut rec_state,
)));
} else {
// In all other cases the buffer is fully between recording start and end and
// can be passed through as is
@ -1039,6 +1054,49 @@ impl ToggleRecord {
}
}
// should be called only if main stream is in eos state
fn check_and_update_eos(
&self,
pad: &gst::Pad,
stream: &Stream,
stream_state: &mut StreamState,
rec_state: &mut State,
) -> bool {
stream_state.eos = true;
// Check whether all secondary streams are in eos. If so, update recording
// state to Stopped
if rec_state.recording_state != RecordingState::Stopped {
let mut others_eos = true;
// Check eos state of all secondary streams
self.other_streams.lock().0.iter().all(|s| {
if s == stream {
return true;
}
let s = s.state.lock();
if !s.eos {
others_eos = false;
}
others_eos
});
if others_eos {
gst_debug!(
CAT,
obj: pad,
"All streams are in EOS state, change state to Stopped"
);
rec_state.recording_state = RecordingState::Stopped;
return true;
}
}
false
}
fn sink_chain(
&self,
pad: &gst::Pad,
@ -1079,12 +1137,17 @@ impl ToggleRecord {
HandleResult::Flushing => {
return Err(gst::FlowError::Flushing);
}
HandleResult::Eos => {
HandleResult::Eos(recording_state_updated) => {
stream.srcpad.push_event(
gst::event::Eos::builder()
.seqnum(stream.state.lock().segment_seqnum)
.build(),
);
if recording_state_updated {
element.notify("recording");
}
return Err(gst::FlowError::Eos);
}
HandleResult::Pass(buffer) => {
@ -1178,6 +1241,7 @@ impl ToggleRecord {
let mut forward = true;
let mut send_pending = false;
let mut recording_state_changed = false;
match event.view() {
EventView::FlushStart(..) => {
@ -1279,14 +1343,26 @@ impl ToggleRecord {
};
}
EventView::Eos(..) => {
let _main_state = if stream != self.main_stream {
let main_state = if stream != self.main_stream {
Some(self.main_stream.state.lock())
} else {
None
};
let mut state = stream.state.lock();
state.eos = true;
let main_is_eos = if let Some(main_state) = main_state {
main_state.eos
} else {
true
};
if main_is_eos {
let mut rec_state = self.state.lock();
recording_state_changed =
self.check_and_update_eos(pad, &stream, &mut state, &mut rec_state);
}
self.main_stream_cond.notify_all();
gst_debug!(
CAT,
@ -1299,6 +1375,10 @@ impl ToggleRecord {
_ => (),
};
if recording_state_changed {
element.notify("recording");
}
// If a serialized event and coming after Segment and a new Segment is pending,
// queue up and send at a later time (buffer/gap) after we sent the Segment
let type_ = event.get_type();

View file

@ -39,6 +39,7 @@ enum SendData {
BuffersDelta(usize),
Gaps(usize),
Eos,
Terminate,
}
#[allow(clippy::type_complexity)]
@ -185,6 +186,10 @@ fn setup_sender_receiver(
i += 1;
}
}
SendData::Terminate => {
let _ = sender_input_done.send(());
return;
}
}
let _ = sender_input_done.send(());
@ -201,9 +206,10 @@ fn recv_buffers(
receiver_output: &mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
segment: &mut gst::FormattedSegment<gst::ClockTime>,
wait_buffers: usize,
) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> {
) -> (Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)>, bool) {
let mut res = Vec::new();
let mut n_buffers = 0;
let mut saw_eos = false;
while let Ok(val) = receiver_output.recv() {
match val {
Left(buffer) => {
@ -214,7 +220,7 @@ fn recv_buffers(
));
n_buffers += 1;
if wait_buffers > 0 && n_buffers == wait_buffers {
return res;
return (res, saw_eos);
}
}
Right(event) => {
@ -227,11 +233,12 @@ fn recv_buffers(
res.push((segment.to_running_time(ts), ts, duration));
n_buffers += 1;
if wait_buffers > 0 && n_buffers == wait_buffers {
return res;
return (res, saw_eos);
}
}
EventView::Eos(..) => {
return res;
saw_eos = true;
return (res, saw_eos);
}
EventView::Segment(ref e) => {
*segment = e.get_segment().clone().downcast().unwrap();
@ -242,7 +249,7 @@ fn recv_buffers(
}
}
res
(res, saw_eos)
}
#[test]
@ -285,7 +292,7 @@ fn test_one_stream_open() {
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
@ -318,7 +325,7 @@ fn test_one_stream_gaps_open() {
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
@ -352,7 +359,7 @@ fn test_one_stream_close_open() {
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
@ -387,7 +394,7 @@ fn test_one_stream_open_close() {
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
@ -425,7 +432,7 @@ fn test_one_stream_open_close_open() {
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 20);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let pts_off = if index >= 10 {
@ -472,7 +479,7 @@ fn test_two_stream_open() {
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -483,7 +490,7 @@ fn test_two_stream_open() {
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -525,7 +532,7 @@ fn test_two_stream_open_shift() {
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -536,7 +543,7 @@ fn test_two_stream_open_shift() {
// Second to last buffer should be clipped from second stream, last should be dropped
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
@ -583,7 +590,7 @@ fn test_two_stream_open_shift_main() {
// PTS 5 maps to running time 0 now
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -595,7 +602,7 @@ fn test_two_stream_open_shift_main() {
// First and second last buffer should be clipped from second stream,
// last buffer should be dropped
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
if index == 0 {
@ -663,7 +670,7 @@ fn test_two_stream_open_close() {
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -674,7 +681,7 @@ fn test_two_stream_open_close() {
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -732,7 +739,7 @@ fn test_two_stream_close_open() {
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -743,7 +750,7 @@ fn test_two_stream_close_open() {
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -814,7 +821,7 @@ fn test_two_stream_open_close_open() {
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
@ -831,7 +838,7 @@ fn test_two_stream_open_close_open() {
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
@ -914,7 +921,7 @@ fn test_two_stream_open_close_open_gaps() {
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
@ -931,7 +938,7 @@ fn test_two_stream_open_close_open_gaps() {
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
@ -1015,7 +1022,7 @@ fn test_two_stream_close_open_close_delta() {
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -1026,7 +1033,7 @@ fn test_two_stream_close_open_close_delta() {
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
@ -1110,7 +1117,7 @@ fn test_three_stream_open_close_open() {
receiver_input_done_3.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
@ -1127,7 +1134,7 @@ fn test_three_stream_open_close_open() {
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
@ -1143,7 +1150,7 @@ fn test_three_stream_open_close_open() {
assert_eq!(buffers_2.len(), 20);
let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0);
let (buffers_3, _) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
@ -1164,3 +1171,482 @@ fn test_three_stream_open_close_open() {
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_main_eos() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
// Send 10 buffers to main stream first
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(9)).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// And send EOS, at this moment, recording state should be still recording
// since running time of main stream is advanced than secondary stream
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, true);
// Send 2 buffers to secondary stream. At this moment, main stream got eos
// already (after 10 buffers) and secondary stream got 2 buffers.
// it will make running time of secondary stream to be advanced than main
// stream and results in all-eos state even if we don't send EOS event
// explicitly
sender_input_2.send(SendData::Buffers(2)).unwrap();
receiver_input_done_2.recv().unwrap();
sender_input_2.send(SendData::Terminate).unwrap();
receiver_input_done_2.recv().unwrap();
// At this moment, all streams should be in eos state. So togglerecord
// must be in stopped state
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, false);
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
assert_eq!(saw_eos, true);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_secondary_eos_first() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
// Send 10 buffers to main stream first
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(9)).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// Send EOS to the second stream
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_2.recv().unwrap();
// Since main stream is not yet EOS state, we should be in recording state
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, true);
// And send EOS to the main stream then it will update state to Stopped
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, false);
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
// We sent 9 buffers to the second stream, and there should be no dropped
// buffer
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(saw_eos, true);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_three_stream_main_eos() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(9)).unwrap();
sender_input_3.send(SendData::Buffers(9)).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
// And send EOS, at this moment, recording state should be still recording
// since running time of the second stream is not in eos state
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, true);
// Send 2 buffers to non-main streams. At this moment, main stream got EOS
// already (after 10 buffers) and the other streams got 9 buffers.
// So those 2 additional buffers to the non-main streams will make running
// times of those streams to be advanced than main stream.
// It will result in all-eos state even if we don't send EOS event
// to the non-main streams explicitly
sender_input_2.send(SendData::Buffers(2)).unwrap();
receiver_input_done_2.recv().unwrap();
sender_input_2.send(SendData::Terminate).unwrap();
receiver_input_done_2.recv().unwrap();
// The third stream is not in EOS state yet, so still recording == true
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, true);
// And terminate the third thread without EOS
sender_input_3.send(SendData::Buffers(2)).unwrap();
receiver_input_done_3.recv().unwrap();
sender_input_3.send(SendData::Terminate).unwrap();
receiver_input_done_3.recv().unwrap();
// At this moment, all streams should be in eos state. So togglerecord
// must be in stopped state
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, false);
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
// Last buffer should be dropped from non-main streams
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
assert_eq!(saw_eos, true);
let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_3.len(), 10);
assert_eq!(saw_eos, true);
thread_1.join().unwrap();
thread_2.join().unwrap();
thread_3.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_three_stream_main_and_second_eos() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(9)).unwrap();
sender_input_3.send(SendData::Buffers(9)).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
// And send EOS, at this moment, recording state should be still recording
// since running time of the third stream is not in eos state
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, true);
// And send EOS to the second stream, but state shouldn't be affected by
// this EOS. The third stream is still not in EOS state
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_2.recv().unwrap();
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, true);
// Send 2 buffers to the third stream. At this moment, main stream and
// the second stream got EOS already (after 10 buffers) and the third stream
// got 9 buffers.
// So those 2 additional buffers to the third streams will make running
// time of the stream to be advanced than main stream.
// It will result in all-eos state even if we don't send EOS event
// to the third stream explicitly
sender_input_3.send(SendData::Buffers(2)).unwrap();
receiver_input_done_3.recv().unwrap();
sender_input_3.send(SendData::Terminate).unwrap();
receiver_input_done_3.recv().unwrap();
// At this moment, all streams should be in eos state. So togglerecord
// must be in stopped state
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, false);
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
// We sent 9 buffers to the second stream, and there must be no dropped one
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(saw_eos, true);
// Last buffer should be dropped from the third stream
let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_3.len(), 10);
assert_eq!(saw_eos, true);
thread_1.join().unwrap();
thread_2.join().unwrap();
thread_3.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_three_stream_secondary_eos_first() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(9)).unwrap();
sender_input_3.send(SendData::Buffers(9)).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
// Send EOS to non-main streams
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_2.recv().unwrap();
sender_input_3.send(SendData::Eos).unwrap();
receiver_input_done_3.recv().unwrap();
// Since main stream is not yet EOS state, we should be in recording state
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, true);
// And send EOS, Send EOS to the main stream then it will update state to
// Stopped
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
let recording = togglerecord
.get_property("recording")
.unwrap()
.get_some::<bool>()
.unwrap();
assert_eq!(recording, false);
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
// Last buffer should be dropped from non-main streams
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(saw_eos, true);
let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_3.len(), 9);
assert_eq!(saw_eos, true);
thread_1.join().unwrap();
thread_2.join().unwrap();
thread_3.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}