gst-plugins-rs/utils/togglerecord/tests/tests.rs
Sebastian Dröge 98b618cc9d utils/togglerecord: Fix timestamp tracking logic for partially overlapping timestamps
And various other cases. Also adjust one of the tests accordingly and
improve assertions to print more information about internal
inconsistencies.
2020-08-06 11:56:59 +00:00

1171 lines
44 KiB
Rust

// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
extern crate glib;
use glib::prelude::*;
extern crate gstreamer as gst;
use gst::prelude::*;
extern crate either;
use either::*;
use std::sync::{mpsc, Mutex};
use std::thread;
extern crate gsttogglerecord;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gsttogglerecord::plugin_register_static().expect("gsttogglerecord tests");
});
}
enum SendData {
Buffers(usize),
BuffersDelta(usize),
Gaps(usize),
Eos,
}
#[allow(clippy::type_complexity)]
fn setup_sender_receiver(
pipeline: &gst::Pipeline,
togglerecord: &gst::Element,
pad: &str,
offset: gst::ClockTime,
) -> (
mpsc::Sender<SendData>,
mpsc::Receiver<()>,
mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
thread::JoinHandle<()>,
) {
let fakesink = gst::ElementFactory::make("fakesink", None).unwrap();
fakesink.set_property("async", &false).unwrap();
pipeline.add(&fakesink).unwrap();
let main_stream = pad == "src";
let (srcpad, sinkpad) = if main_stream {
(
togglerecord.get_static_pad("src").unwrap(),
togglerecord.get_static_pad("sink").unwrap(),
)
} else {
let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap();
let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap();
(srcpad, sinkpad)
};
let fakesink_sinkpad = fakesink.get_static_pad("sink").unwrap();
srcpad.link(&fakesink_sinkpad).unwrap();
let (sender_output, receiver_output) = mpsc::channel::<Either<gst::Buffer, gst::Event>>();
let sender_output = Mutex::new(sender_output);
srcpad.add_probe(
gst::PadProbeType::BUFFER | gst::PadProbeType::EVENT_DOWNSTREAM,
move |_, ref probe_info| {
match probe_info.data {
Some(gst::PadProbeData::Buffer(ref buffer)) => {
sender_output
.lock()
.unwrap()
.send(Left(buffer.clone()))
.unwrap();
}
Some(gst::PadProbeData::Event(ref event)) => {
sender_output
.lock()
.unwrap()
.send(Right(event.clone()))
.unwrap();
}
_ => {
unreachable!();
}
}
gst::PadProbeReturn::Ok
},
);
let (sender_input, receiver_input) = mpsc::channel::<SendData>();
let (sender_input_done, receiver_input_done) = mpsc::channel::<()>();
let thread = thread::spawn(move || {
let mut i = 0;
let mut first = true;
while let Ok(send_data) = receiver_input.recv() {
if first {
assert!(sinkpad.send_event(gst::event::StreamStart::new("test")));
let caps = if main_stream {
gst::Caps::builder("video/x-raw")
.field("format", &"ARGB")
.field("width", &320i32)
.field("height", &240i32)
.field("framerate", &gst::Fraction::new(50, 1))
.build()
} else {
gst::Caps::builder("audio/x-raw")
.field("format", &"U8")
.field("layout", &"interleaved")
.field("rate", &8000i32)
.field("channels", &1i32)
.build()
};
assert!(sinkpad.send_event(gst::event::Caps::new(&caps)));
let segment = gst::FormattedSegment::<gst::ClockTime>::new();
assert!(sinkpad.send_event(gst::event::Segment::new(&segment)));
let mut tags = gst::TagList::new();
tags.get_mut()
.unwrap()
.add::<gst::tags::Title>(&"some title", gst::TagMergeMode::Append);
assert!(sinkpad.send_event(gst::event::Tag::new(tags)));
first = false;
}
let buffer = if main_stream {
gst::Buffer::with_size(320 * 240 * 4).unwrap()
} else {
gst::Buffer::with_size(160).unwrap()
};
match send_data {
SendData::Eos => {
break;
}
SendData::Buffers(n) => {
for _ in 0..n {
let mut buffer = buffer.clone();
{
let buffer = buffer.make_mut();
buffer.set_pts(offset + i * 20 * gst::MSECOND);
buffer.set_duration(20 * gst::MSECOND);
}
let _ = sinkpad.chain(buffer);
i += 1;
}
}
SendData::BuffersDelta(n) => {
for _ in 0..n {
let mut buffer = gst::Buffer::new();
buffer
.get_mut()
.unwrap()
.set_pts(offset + i * 20 * gst::MSECOND);
buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND);
buffer
.get_mut()
.unwrap()
.set_flags(gst::BufferFlags::DELTA_UNIT);
let _ = sinkpad.chain(buffer);
i += 1;
}
}
SendData::Gaps(n) => {
for _ in 0..n {
let event =
gst::event::Gap::new(offset + i * 20 * gst::MSECOND, 20 * gst::MSECOND);
let _ = sinkpad.send_event(event);
i += 1;
}
}
}
let _ = sender_input_done.send(());
}
let _ = sinkpad.send_event(gst::event::Eos::new());
let _ = sender_input_done.send(());
});
(sender_input, receiver_input_done, receiver_output, thread)
}
fn recv_buffers(
receiver_output: &mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
segment: &mut gst::FormattedSegment<gst::ClockTime>,
wait_buffers: usize,
) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> {
let mut res = Vec::new();
let mut n_buffers = 0;
while let Ok(val) = receiver_output.recv() {
match val {
Left(buffer) => {
res.push((
segment.to_running_time(buffer.get_pts()),
buffer.get_pts(),
buffer.get_duration(),
));
n_buffers += 1;
if wait_buffers > 0 && n_buffers == wait_buffers {
return res;
}
}
Right(event) => {
use gst::EventView;
match event.view() {
EventView::Gap(ref e) => {
let (ts, duration) = e.get();
res.push((segment.to_running_time(ts), ts, duration));
n_buffers += 1;
if wait_buffers > 0 && n_buffers == wait_buffers {
return res;
}
}
EventView::Eos(..) => {
return res;
}
EventView::Segment(ref e) => {
*segment = e.get_segment().clone().downcast().unwrap();
}
_ => (),
}
}
}
}
res
}
#[test]
fn test_create() {
init();
assert!(gst::ElementFactory::make("togglerecord", None).is_ok());
}
#[test]
fn test_create_pads() {
init();
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap();
let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap();
assert_eq!(sinkpad.get_name(), "sink_0");
assert_eq!(srcpad.get_name(), "src_0");
togglerecord.release_request_pad(&sinkpad);
assert!(sinkpad.get_parent().is_none());
assert!(srcpad.get_parent().is_none());
}
#[test]
fn test_one_stream_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_one_stream_gaps_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input.send(SendData::Buffers(5)).unwrap();
sender_input.send(SendData::Gaps(5)).unwrap();
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_one_stream_close_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
receiver_input_done.recv().unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_one_stream_open_close() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
receiver_input_done.recv().unwrap();
togglerecord.set_property("record", &false).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_one_stream_open_close_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
receiver_input_done.recv().unwrap();
togglerecord.set_property("record", &false).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
receiver_input_done.recv().unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input.send(SendData::Buffers(10)).unwrap();
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 20);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
thread.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_shift() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND);
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Second to last buffer should be clipped from second stream, last should be dropped
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
if index == 9 {
assert_eq!(duration, 15 * gst::MSECOND);
} else {
assert_eq!(duration, 20 * gst::MSECOND);
}
}
assert_eq!(buffers_2.len(), 10);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_shift_main() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::MSECOND);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(12)).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_1.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// PTS 5 maps to running time 0 now
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// First and second last buffer should be clipped from second stream,
// last buffer should be dropped
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
if index == 0 {
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(duration, 15 * gst::MSECOND);
} else if index == 10 {
assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 5 * gst::MSECOND);
} else {
assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
}
assert_eq!(buffers_2.len(), 11);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_close() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
receiver_input_done_1.recv().unwrap();
// Stop recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2 above
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
receiver_input_done_2.recv().unwrap();
// Send another 9 buffers to sender 2, both are the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap();
// Wait until all 20 buffers of both senders are done
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_close_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
receiver_input_done_1.recv().unwrap();
// Start recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2 above
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
receiver_input_done_2.recv().unwrap();
// Send another 9 buffers to sender 2, both are the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap();
// Wait until all 20 buffers of both senders are done
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_close_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
receiver_input_done_1.recv().unwrap();
// Stop recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2 above
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
receiver_input_done_2.recv().unwrap();
// Send another 9 buffers to sender 2, both are the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap();
// Wait until all 20 buffers of both senders are done
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// Send another buffer to sender 2, this will block until sender 1 advances
// but must not be dropped, although we're not recording (yet)
sender_input_2.send(SendData::Buffers(1)).unwrap();
// Start recording again and send another set of buffers to both senders
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(10)).unwrap();
receiver_input_done_1.recv().unwrap();
// The single buffer above for sender 1 should be handled now
receiver_input_done_2.recv().unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_close_open_gaps() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(3)).unwrap();
sender_input_1.send(SendData::Gaps(3)).unwrap();
sender_input_1.send(SendData::Buffers(4)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
receiver_input_done_1.recv().unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_1.recv().unwrap();
// Stop recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2 above
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
receiver_input_done_2.recv().unwrap();
// Send another 4 gaps and 5 buffers to sender 2, both are the same position now
sender_input_2.send(SendData::Gaps(4)).unwrap();
sender_input_2.send(SendData::Buffers(5)).unwrap();
// Wait until all 20 buffers of both senders are done
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// Send another gap to sender 2, this will block until sender 1 advances
// but must not be dropped, although we're not recording (yet)
sender_input_2.send(SendData::Gaps(1)).unwrap();
// Start recording again and send another set of buffers to both senders
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(10)).unwrap();
receiver_input_done_1.recv().unwrap();
// The single buffer above for sender 1 should be handled now
receiver_input_done_2.recv().unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_close_open_close_delta() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
receiver_input_done_1.recv().unwrap();
// Start recording and push new buffers to sender 1. The first one is a delta frame,
// so will be dropped, and as such the next frame of sender 2 will also be dropped
// Sender 2 is empty now
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::BuffersDelta(1)).unwrap();
sender_input_1.send(SendData::Buffers(9)).unwrap();
receiver_input_done_2.recv().unwrap();
// Send another 9 buffers to sender 2, both are the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap();
// Wait until all 20 buffers of both senders are done
receiver_input_done_1.recv().unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
// Send another buffer to sender 2, this will block until sender 1 advances
// but must not be dropped, and we're still recording
sender_input_2.send(SendData::Buffers(1)).unwrap();
// Stop recording again and send another set of buffers to both senders
// The first one is a delta frame, so we only actually stop recording
// after recording another frame
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::BuffersDelta(1)).unwrap();
sender_input_1.send(SendData::Buffers(9)).unwrap();
sender_input_2.send(SendData::Buffers(10)).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_1.recv().unwrap();
// The single buffer above for sender 1 should be handled now
receiver_input_done_2.recv().unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_2.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_three_stream_open_close_open() {
init();
let pipeline = gst::Pipeline::new(None);
let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(11)).unwrap();
sender_input_3.send(SendData::Buffers(11)).unwrap();
// Sender 2/3 is waiting for sender 1 to continue
receiver_input_done_1.recv().unwrap();
// Stop recording and push new buffers to sender 1, which will advance
// it and release the 11th buffer of sender 2/3 above
togglerecord.set_property("record", &false).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
// Send another 9 buffers to sender 2/3, all streams are at the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap();
sender_input_3.send(SendData::Buffers(9)).unwrap();
// Wait until all 20 buffers of all senders are done
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
// Send another buffer to sender 2, this will block until sender 1 advances
// but must not be dropped, although we're not recording (yet)
sender_input_2.send(SendData::Buffers(1)).unwrap();
// Start recording again and send another set of buffers to both senders
togglerecord.set_property("record", &true).unwrap();
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(10)).unwrap();
sender_input_3.send(SendData::Buffers(5)).unwrap();
receiver_input_done_1.recv().unwrap();
// The single buffer above for sender 1 should be handled now
receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
sender_input_3.send(SendData::Buffers(5)).unwrap();
receiver_input_done_3.recv().unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
sender_input_3.send(SendData::Eos).unwrap();
receiver_input_done_1.recv().unwrap();
receiver_input_done_2.recv().unwrap();
receiver_input_done_3.recv().unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
} else {
0.into()
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
}
assert_eq!(buffers_3.len(), 20);
thread_1.join().unwrap();
thread_2.join().unwrap();
thread_3.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}