togglerecord: Add support for non-live inputs

Live input + is-live=false:
    While not recording, drop input
    When recording is started, offset to collapse the gap

Live input + is-live=true:
    While not recording, drop input
    Don't modify the offset

Non-live input + is-live=false:
    While not recording, block input
    Don't modify the offset

Non-live input + is-live=true:
    While not recording, block input
    When recording is started, offset to current running time

Co-authored-by: Jan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1206>
This commit is contained in:
Vivia Nikolaidou 2023-04-27 19:59:19 +03:00
parent 4683291c1f
commit 063871a1eb
5 changed files with 704 additions and 75 deletions

View file

@ -8486,7 +8486,7 @@
"elements": {
"togglerecord": {
"author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Valve that ensures multiple streams start/end at the same time",
"description": "Valve that ensures multiple streams start/end at the same time. If the input comes from a live stream, when not recording it will be dropped. If it comes from a non-live stream, when not recording it will be blocked.",
"hierarchy": [
"GstToggleRecord",
"GstElement",
@ -8520,7 +8520,7 @@
},
"properties": {
"is-live": {
"blurb": "Live mode: no \"gap eating\", forward incoming segment",
"blurb": "Live output mode: no \"gap eating\", forward incoming segment for live input, create a gap to fill the paused duration for non-live input",
"conditionally-available": false,
"construct": false,
"construct-only": false,

View file

@ -20,6 +20,7 @@ once_cell = "1.0"
[dev-dependencies]
either = "1.0"
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
[lib]
name = "gsttogglerecord"

View file

@ -0,0 +1,30 @@
# togglerecord
A multistream valve-like plugin that ensures multiple streams start/end at the
same time.
It supports both live and non-live input and toggles recording via the
`record` property. Live inputs will be dropped when not recording, while
non-live inputs will be blocked.
## Use cases
The `is-live` property refers to whether the output of the element will be
live. So, based on whether the input is live and on whether the output
`is-live`, we have these four behaviours:
- Live input + `is-live=false`:
- While not recording, drop input
- When recording is started, offset to collapse the gap
- Live input + `is-live=true`:
- While not recording, drop input
- Don't modify the offset
- Non-live input + `is-live=false`:
- While not recording, block input
- Don't modify the offset
- Non-live input + `is-live=true`:
- While not recording, block input
- When recording is started, offset to current running time

View file

@ -6,12 +6,18 @@
//
// SPDX-License-Identifier: MPL-2.0
/**
* element-togglerecord:
*
* {{ utils/togglerecord/README.md[2:30] }}
*
*/
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use parking_lot::{Condvar, Mutex};
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::cmp;
use std::collections::HashMap;
use std::f64;
@ -72,6 +78,7 @@ struct StreamState {
flushing: bool,
segment_pending: bool,
discont_pending: bool,
upstream_live: Option<bool>,
pending_events: Vec<gst::Event>,
audio_info: Option<gst_audio::AudioInfo>,
video_info: Option<gst_video::VideoInfo>,
@ -89,6 +96,7 @@ impl Default for StreamState {
flushing: false,
segment_pending: false,
discont_pending: true,
upstream_live: None,
pending_events: Vec::new(),
audio_info: None,
video_info: None,
@ -104,7 +112,7 @@ impl Default for StreamState {
// Recording: Passing through all data
// Stopping: Main stream remembering current last_recording_stop, waiting for all
// other streams to reach this position
// Stopped: Dropping all data
// Stopped: Dropping (live input) or blocking (non-live input) all data
// Starting: Main stream waiting until next keyframe and setting last_recording_start, waiting
// for all other streams to reach this position
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -120,11 +128,21 @@ struct State {
recording_state: RecordingState,
last_recording_start: Option<gst::ClockTime>,
last_recording_stop: Option<gst::ClockTime>,
// Accumulated duration of previous recording segments,
// updated whenever going to Stopped
recording_duration: gst::ClockTime,
// Accumulated duration of blocked segments
blocked_duration: gst::ClockTime,
// What time we started blocking
time_start_block: Option<gst::ClockTime>,
// Updated whenever going to Recording
running_time_offset: i64,
// Copied from settings
live: bool,
}
@ -135,6 +153,8 @@ impl Default for State {
last_recording_start: None,
last_recording_stop: None,
recording_duration: gst::ClockTime::ZERO,
blocked_duration: gst::ClockTime::ZERO,
time_start_block: gst::ClockTime::NONE,
running_time_offset: 0,
live: false,
}
@ -146,7 +166,6 @@ enum HandleResult<T> {
Pass(T),
Drop,
Eos(bool),
Flushing,
}
trait HandleData: Sized {
@ -329,11 +348,54 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl ToggleRecord {
fn block_if_upstream_not_live(
&self,
pad: &gst::Pad,
mut settings: Settings,
state: &mut MutexGuard<StreamState>,
upstream_live: bool,
) -> Result<bool, gst::FlowError> {
if !upstream_live {
while !settings.record && !state.flushing {
gst::debug!(CAT, obj: pad, "Waiting for record=true");
self.main_stream_cond.wait(state);
settings = *self.settings.lock();
}
if state.flushing {
gst::debug!(CAT, obj: pad, "Flushing");
return Err(gst::FlowError::Flushing);
}
state.segment_pending = true;
state.discont_pending = true;
for other_stream in &self.other_streams.lock().0 {
let mut other_state = other_stream.state.lock();
other_state.segment_pending = true;
other_state.discont_pending = true;
}
let mut rec_state = self.state.lock();
if let Some(time_start_block) = rec_state.time_start_block {
let clock = self.obj().clock().expect("Cannot find pipeline clock");
rec_state.blocked_duration += clock.time().unwrap() - time_start_block;
if settings.live {
rec_state.running_time_offset = rec_state.blocked_duration.nseconds() as i64;
}
rec_state.time_start_block = gst::ClockTime::NONE;
}
drop(rec_state);
gst::log!(CAT, obj: pad, "Done blocking main stream");
Ok(true)
} else {
gst::log!(CAT, obj: pad, "Dropping buffer (stopped)");
Ok(false)
}
}
fn handle_main_stream<T: HandleData>(
&self,
pad: &gst::Pad,
stream: &Stream,
data: T,
upstream_live: bool,
) -> Result<HandleResult<T>, gst::FlowError> {
let mut state = stream.state.lock();
@ -395,10 +457,14 @@ impl ToggleRecord {
let settings = *self.settings.lock();
// First check if we have to update our recording state
// First check if we need to block for non-live input
let mut rec_state = self.state.lock();
// Check if we have to update our recording state
let settings_changed = match rec_state.recording_state {
RecordingState::Recording if !settings.record => {
let clock = self.obj().clock().expect("Cannot find pipeline clock");
rec_state.time_start_block = Some(clock.time().unwrap());
gst::debug!(CAT, obj: pad, "Stopping recording");
rec_state.recording_state = RecordingState::Stopping;
true
@ -473,7 +539,7 @@ impl ToggleRecord {
if state.flushing {
gst::debug!(CAT, obj: pad, "Flushing");
return Ok(HandleResult::Flushing);
return Err(gst::FlowError::Flushing);
}
let mut rec_state = self.state.lock();
@ -493,17 +559,25 @@ impl ToggleRecord {
// Then become Stopped and drop this buffer. We always stop right before
// a keyframe
gst::log!(CAT, obj: pad, "Dropping buffer (stopped)");
drop(rec_state);
let ret =
self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)?;
drop(state);
self.obj().notify("recording");
Ok(HandleResult::Drop)
if ret {
Ok(HandleResult::Pass(data))
} else {
Ok(HandleResult::Drop)
}
}
RecordingState::Stopped => {
gst::log!(CAT, obj: pad, "Dropping buffer (stopped)");
Ok(HandleResult::Drop)
if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? {
Ok(HandleResult::Pass(data))
} else {
Ok(HandleResult::Drop)
}
}
RecordingState::Starting => {
// If this is no keyframe, we can directly go out again here and drop the frame
@ -519,23 +593,35 @@ impl ToggleRecord {
.push_event(gst_video::UpstreamForceKeyUnitEvent::builder().build());
}
if !upstream_live {
gst::log!(
CAT,
obj: pad,
"Always passing data when upstream is not live"
);
return Ok(HandleResult::Pass(data));
}
return Ok(HandleResult::Drop);
}
// Remember the time when we started: now!
rec_state.last_recording_start = current_running_time;
rec_state.running_time_offset =
current_running_time.map_or(0, |current_running_time| {
current_running_time
.saturating_sub(rec_state.recording_duration)
.nseconds()
}) as i64;
// We made sure a few lines above, but let's be sure again
if !settings.live || upstream_live {
rec_state.running_time_offset =
0 - current_running_time.map_or(0, |current_running_time| {
current_running_time
.saturating_sub(rec_state.recording_duration)
.nseconds()
}) as i64
};
gst::debug!(
CAT,
obj: pad,
"Starting at {}, previous accumulated recording duration {}",
"Starting at {}, previous accumulated recording duration {}, offset {}",
current_running_time.display(),
rec_state.recording_duration,
rec_state.running_time_offset,
);
state.segment_pending = true;
@ -566,7 +652,7 @@ impl ToggleRecord {
if state.flushing {
gst::debug!(CAT, obj: pad, "Flushing");
return Ok(HandleResult::Flushing);
return Err(gst::FlowError::Flushing);
}
let mut rec_state = self.state.lock();
@ -596,6 +682,7 @@ impl ToggleRecord {
pad: &gst::Pad,
stream: &Stream,
data: T,
upstream_live: bool,
) -> Result<HandleResult<T>, gst::FlowError> {
// Calculate end pts & current running time and make sure we stay in the segment
let mut state = stream.state.lock();
@ -722,7 +809,7 @@ impl ToggleRecord {
if state.flushing {
gst::debug!(CAT, obj: pad, "Flushing");
return Ok(HandleResult::Flushing);
return Err(gst::FlowError::Flushing);
}
// If the main stream is EOS, we are also EOS unless we are
@ -889,6 +976,10 @@ impl ToggleRecord {
}
}
if !upstream_live {
return Ok(HandleResult::Pass(data));
}
match rec_state.recording_state {
RecordingState::Recording => {
// The end of our buffer must be before/at the end of the previous buffer of the main
@ -1154,31 +1245,48 @@ impl ToggleRecord {
gst::FlowError::Error
})?;
gst::log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
let upstream_live;
{
let state = stream.state.lock();
let mut state = stream.state.lock();
if state.eos {
return Err(gst::FlowError::Eos);
}
if state.flushing {
return Err(gst::FlowError::Flushing);
}
match state.upstream_live {
None => {
// Not handling anything here, the pad's query function will catch it
let mut query = gst::query::Latency::new();
let success = pad.peer_query(&mut query);
if success {
(upstream_live, _, _) = query.result();
state.upstream_live = Some(upstream_live);
} else {
state.upstream_live = None;
upstream_live = false;
gst::warning!(
CAT,
obj: pad,
"Latency query failed, assuming non-live input, will retry"
);
}
}
Some(is_live) => upstream_live = is_live,
}
}
let handle_result = if stream != self.main_stream {
self.handle_secondary_stream(pad, &stream, buffer)
self.handle_secondary_stream(pad, &stream, buffer, upstream_live)
} else {
self.handle_main_stream(pad, &stream, buffer)
self.handle_main_stream(pad, &stream, buffer, upstream_live)
}?;
let mut buffer = match handle_result {
HandleResult::Drop => {
return Ok(gst::FlowSuccess::Ok);
}
HandleResult::Flushing => {
return Err(gst::FlowError::Flushing);
}
HandleResult::Eos(recording_state_updated) => {
stream.srcpad.push_event(
gst::event::Eos::builder()
@ -1224,10 +1332,13 @@ impl ToggleRecord {
state.out_segment = state.in_segment.clone();
if !rec_state.live {
// state.upstream_live should have a value from a few lines above
// segment offset is taken into account in case upstream is live and we are not
// (collapse gap)
if rec_state.live != upstream_live {
state
.out_segment
.offset_running_time(-rec_state.running_time_offset)
.offset_running_time(rec_state.running_time_offset)
.expect("Adjusting record duration");
}
events.push(
@ -1368,10 +1479,35 @@ impl ToggleRecord {
EventView::Gap(e) => {
gst::debug!(CAT, obj: pad, "Handling Gap event {:?}", event);
let (pts, duration) = e.get();
let upstream_live;
{
let mut state = stream.state.lock();
match state.upstream_live {
None => {
// Not handling anything here, the pad's query function will catch it
let mut query = gst::query::Latency::new();
let success = pad.peer_query(&mut query);
if success {
(upstream_live, _, _) = query.result();
state.upstream_live = Some(upstream_live);
} else {
state.upstream_live = None;
upstream_live = false;
gst::warning!(
CAT,
obj: pad,
"Latency query failed, assuming non-live input, will retry"
);
}
}
Some(is_live) => upstream_live = is_live,
}
}
let handle_result = if stream == self.main_stream {
self.handle_main_stream(pad, &stream, (pts, duration))
self.handle_main_stream(pad, &stream, (pts, duration), upstream_live)
} else {
self.handle_secondary_stream(pad, &stream, (pts, duration))
self.handle_secondary_stream(pad, &stream, (pts, duration), upstream_live)
};
forward = match handle_result {
@ -1508,7 +1644,19 @@ impl ToggleRecord {
gst::log!(CAT, obj: pad, "Handling query {:?}", query);
stream.srcpad.peer_query(query)
let success = stream.srcpad.peer_query(query);
if let gst::QueryView::Latency(latency) = query.view() {
let mut state = stream.state.lock();
if success {
let (is_live, _, _) = latency.result();
state.upstream_live = Some(is_live);
} else {
state.upstream_live = None;
}
}
success
}
// FIXME `matches!` was introduced in rustc 1.42.0, current MSRV is 1.41.0
@ -1537,7 +1685,7 @@ impl ToggleRecord {
let offset = event.running_time_offset();
event
.make_mut()
.set_running_time_offset(offset + rec_state.running_time_offset);
.set_running_time_offset(offset - rec_state.running_time_offset);
drop(rec_state);
if forward {
@ -1795,8 +1943,12 @@ impl ObjectImpl for ToggleRecord {
.read_only()
.build(),
glib::ParamSpecBoolean::builder("is-live")
.nick("Live mode")
.blurb("Live mode: no \"gap eating\", forward incoming segment")
.nick("Live output mode")
.blurb(
"Live output mode: no \"gap eating\", \
forward incoming segment for live input, \
create a gap to fill the paused duration for non-live input",
)
.default_value(DEFAULT_LIVE)
.mutable_ready()
.build(),
@ -1820,6 +1972,7 @@ impl ObjectImpl for ToggleRecord {
);
settings.record = record;
self.main_stream_cond.notify_all();
}
"is-live" => {
let mut settings = self.settings.lock();
@ -1873,7 +2026,9 @@ impl ElementImpl for ToggleRecord {
gst::subclass::ElementMetadata::new(
"Toggle Record",
"Generic",
"Valve that ensures multiple streams start/end at the same time",
"Valve that ensures multiple streams start/end at the same time. \
If the input comes from a live stream, when not recording it will be dropped. \
If it comes from a non-live stream, when not recording it will be blocked.",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
@ -1948,6 +2103,7 @@ impl ElementImpl for ToggleRecord {
let mut rec_state = self.state.lock();
*rec_state = State::default();
let settings = *self.settings.lock();
rec_state.live = settings.live;
}

View file

@ -11,7 +11,7 @@ use gst::prelude::*;
use either::*;
use std::sync::{mpsc, Mutex};
use std::thread;
use std::{thread, time::Duration};
fn init() {
use std::sync::Once;
@ -37,6 +37,7 @@ fn setup_sender_receiver(
togglerecord: &gst::Element,
pad: &str,
offset: gst::ClockTime,
live: bool,
) -> (
mpsc::Sender<SendData>,
mpsc::Receiver<()>,
@ -62,6 +63,25 @@ fn setup_sender_receiver(
(srcpad, sinkpad)
};
sinkpad.add_probe(
gst::PadProbeType::QUERY_UPSTREAM,
move |_pad, probe_info| {
let query = match &mut probe_info.data {
Some(gst::PadProbeData::Query(q)) => q,
_ => unreachable!(),
};
use gst::QueryViewMut::*;
match query.view_mut() {
Latency(q) => {
q.set(live, gst::ClockTime::ZERO, None);
gst::PadProbeReturn::Handled
}
_ => gst::PadProbeReturn::Ok,
}
},
);
let fakesink_sinkpad = fakesink.static_pad("sink").unwrap();
srcpad.link(&fakesink_sinkpad).unwrap();
@ -271,6 +291,76 @@ fn test_create_pads() {
assert!(srcpad.parent().is_none());
}
#[test]
fn test_one_stream_open_nonlivein_nonliveout() {
init();
let pipeline = gst::Pipeline::default();
let togglerecord = gst::ElementFactory::make("togglerecord")
.property("is-live", false)
.build()
.unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", true);
sender_input.send(SendData::Buffers(10)).unwrap();
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
thread.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_one_stream_open_nonlivein_liveout() {
init();
let pipeline = gst::Pipeline::default();
let togglerecord = gst::ElementFactory::make("togglerecord")
.property("is-live", true)
.build()
.unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", true);
sender_input.send(SendData::Buffers(10)).unwrap();
drop(sender_input);
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0);
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
thread.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_one_stream_open() {
init();
@ -280,7 +370,7 @@ fn test_one_stream_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@ -312,7 +402,7 @@ fn test_one_stream_gaps_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@ -345,7 +435,7 @@ fn test_one_stream_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@ -379,7 +469,7 @@ fn test_one_stream_open_close() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@ -414,7 +504,7 @@ fn test_one_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
pipeline.set_state(gst::State::Playing).unwrap();
@ -458,9 +548,15 @@ fn test_two_stream_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -511,9 +607,9 @@ fn test_two_stream_open_shift() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5.mseconds());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5.mseconds(), true);
pipeline.set_state(gst::State::Playing).unwrap();
@ -568,9 +664,15 @@ fn test_two_stream_open_shift_main() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 5.mseconds());
setup_sender_receiver(&pipeline, &togglerecord, "src", 5.mseconds(), true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -633,9 +735,15 @@ fn test_two_stream_open_close() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -702,9 +810,15 @@ fn test_two_stream_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -762,6 +876,256 @@ fn test_two_stream_close_open() {
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_close_open_nonlivein_liveout() {
init();
let testclock = gst_check::TestClock::new();
let pipeline = gst::Pipeline::default();
pipeline.use_clock(Some(&testclock));
let togglerecord = gst::ElementFactory::make("togglerecord")
.property("is-live", true)
.build()
.unwrap();
togglerecord.set_clock(Some(&testclock)).unwrap();
pipeline.add(&togglerecord).unwrap();
let testclock = testclock.downcast::<gst_check::TestClock>().unwrap();
testclock.set_time(gst::ClockTime::ZERO);
let main_buffers_before_gap = 10u64;
let secondary_buffers_before_gap = main_buffers_before_gap + 1;
let buffers_in_gap = 10u64;
let buffers_after_gap = 10u64;
let recv_timeout = Duration::from_secs(10);
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
false,
);
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", true);
sender_input_1
.send(SendData::Buffers(main_buffers_before_gap as usize))
.unwrap();
sender_input_2
.send(SendData::Buffers(
(secondary_buffers_before_gap - 1) as usize,
))
.unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
sender_input_2.send(SendData::Buffers(1)).unwrap();
assert_eq!(
receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
Err(mpsc::RecvTimeoutError::Timeout)
);
// Stop recording and push new buffers to sender 1, this will block
togglerecord.set_property("record", false);
sender_input_1
.send(SendData::Buffers(buffers_in_gap as usize))
.unwrap();
// Send another 10 buffers to sender 2, both are the same position at 9 buffers, the next one
// will block until record=true
sender_input_2
.send(SendData::Buffers(buffers_in_gap as usize))
.unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Advance the clock
let block_time = gst::ClockTime::from_mseconds(42);
testclock.advance_time(block_time.nseconds() as i64);
// Start recording again and send another set of buffers to both senders
togglerecord.set_property("record", true);
sender_input_1
.send(SendData::Buffers(buffers_after_gap as usize))
.unwrap();
sender_input_2
.send(SendData::Buffers(buffers_after_gap as usize))
.unwrap();
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
// The single buffer above for sender 1 should be handled now
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, _) = recv_buffers(
&receiver_output_1,
&mut segment_1,
main_buffers_before_gap as usize,
);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(
running_time.unwrap(),
block_time + (index + main_buffers_before_gap) * 20.mseconds()
);
assert_eq!(
pts.unwrap(),
(index + main_buffers_before_gap) * 20.mseconds()
);
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(
buffers_1.len(),
(buffers_in_gap + buffers_after_gap) as usize
);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, _) = recv_buffers(
&receiver_output_2,
&mut segment_2,
secondary_buffers_before_gap as usize,
);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(
running_time.unwrap(),
block_time + (index + secondary_buffers_before_gap) * 20.mseconds()
);
assert_eq!(
pts.unwrap(),
(index + secondary_buffers_before_gap) * 20.mseconds()
);
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(
buffers_2.len(),
(buffers_in_gap + buffers_after_gap - 1) as usize
);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_close_open_nonlivein_nonliveout() {
init();
let pipeline = gst::Pipeline::default();
let togglerecord = gst::ElementFactory::make("togglerecord")
.property("is-live", false)
.build()
.unwrap();
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
false,
);
let recv_timeout = Duration::from_secs(10);
pipeline.set_state(gst::State::Playing).unwrap();
togglerecord.set_property("record", true);
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(10)).unwrap();
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Sender 2 is waiting for sender 1 to continue, sender 1 is finished
sender_input_2.send(SendData::Buffers(1)).unwrap();
assert_eq!(
receiver_input_done_2.recv_timeout(Duration::from_millis(20)),
Err(mpsc::RecvTimeoutError::Timeout)
);
// Stop recording and push new buffers to sender 1, this will block
togglerecord.set_property("record", false);
sender_input_1.send(SendData::Buffers(10)).unwrap();
// Send another 9 buffers to sender 2, both are the same position now
sender_input_2.send(SendData::Buffers(9)).unwrap();
// Send another buffer to sender 2, this will block until record=true
sender_input_2.send(SendData::Buffers(1)).unwrap();
// Start recording again and send another set of buffers to both senders
togglerecord.set_property("record", true);
sender_input_1.send(SendData::Buffers(10)).unwrap();
sender_input_2.send(SendData::Buffers(10)).unwrap();
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
// The single buffer above for sender 1 should be handled now
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
// Send EOS and wait for it to be handled
sender_input_1.send(SendData::Eos).unwrap();
sender_input_2.send(SendData::Eos).unwrap();
receiver_input_done_1.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
receiver_input_done_2.recv_timeout(recv_timeout).unwrap();
let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(buffers_1.len(), 30);
// Last buffer should be dropped from second stream
let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time.unwrap(), index * 20.mseconds());
assert_eq!(pts.unwrap(), index * 20.mseconds());
assert_eq!(duration.unwrap(), 20.mseconds());
}
assert_eq!(buffers_2.len(), 30);
thread_1.join().unwrap();
thread_2.join().unwrap();
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
fn test_two_stream_open_close_open() {
init();
@ -771,9 +1135,15 @@ fn test_two_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -865,9 +1235,15 @@ fn test_two_stream_open_close_open_gaps() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -965,9 +1341,15 @@ fn test_two_stream_close_open_close_delta() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1054,11 +1436,23 @@ fn test_three_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1178,9 +1572,15 @@ fn test_two_stream_main_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1253,9 +1653,15 @@ fn test_two_stream_secondary_eos_first() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1321,11 +1727,23 @@ fn test_three_stream_main_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1422,11 +1840,23 @@ fn test_three_stream_main_and_second_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1523,11 +1953,23 @@ fn test_three_stream_secondary_eos_first() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
gst::ClockTime::ZERO,
true,
);
pipeline.set_state(gst::State::Playing).unwrap();