From 063871a1ebbab48d1a13b209b39bd0ec9daa3f14 Mon Sep 17 00:00:00 2001 From: Vivia Nikolaidou Date: Thu, 27 Apr 2023 19:59:19 +0300 Subject: [PATCH] togglerecord: Add support for non-live inputs Live input + is-live=false: While not recording, drop input When recording is started, offset to collapse the gap Live input + is-live=true: While not recording, drop input Don't modify the offset Non-live input + is-live=false: While not recording, block input Don't modify the offset Non-live input + is-live=true: While not recording, block input When recording is started, offset to current running time Co-authored-by: Jan Alexander Steffens (heftig) Part-of: --- docs/plugins/gst_plugins_cache.json | 4 +- utils/togglerecord/Cargo.toml | 1 + utils/togglerecord/README.md | 30 ++ utils/togglerecord/src/togglerecord/imp.rs | 226 +++++++-- utils/togglerecord/tests/tests.rs | 518 +++++++++++++++++++-- 5 files changed, 704 insertions(+), 75 deletions(-) create mode 100644 utils/togglerecord/README.md diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 0e3cdc8c3..3f382c40e 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -8486,7 +8486,7 @@ "elements": { "togglerecord": { "author": "Sebastian Dröge ", - "description": "Valve that ensures multiple streams start/end at the same time", + "description": "Valve that ensures multiple streams start/end at the same time. If the input comes from a live stream, when not recording it will be dropped. If it comes from a non-live stream, when not recording it will be blocked.", "hierarchy": [ "GstToggleRecord", "GstElement", @@ -8520,7 +8520,7 @@ }, "properties": { "is-live": { - "blurb": "Live mode: no \"gap eating\", forward incoming segment", + "blurb": "Live output mode: no \"gap eating\", forward incoming segment for live input, create a gap to fill the paused duration for non-live input", "conditionally-available": false, "construct": false, "construct-only": false, diff --git a/utils/togglerecord/Cargo.toml b/utils/togglerecord/Cargo.toml index 1e5368c8e..36edeb2c1 100644 --- a/utils/togglerecord/Cargo.toml +++ b/utils/togglerecord/Cargo.toml @@ -20,6 +20,7 @@ once_cell = "1.0" [dev-dependencies] either = "1.0" +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } [lib] name = "gsttogglerecord" diff --git a/utils/togglerecord/README.md b/utils/togglerecord/README.md new file mode 100644 index 000000000..b42e2a4ee --- /dev/null +++ b/utils/togglerecord/README.md @@ -0,0 +1,30 @@ +# togglerecord + +A multistream valve-like plugin that ensures multiple streams start/end at the +same time. + +It supports both live and non-live input and toggles recording via the +`record` property. Live inputs will be dropped when not recording, while +non-live inputs will be blocked. + +## Use cases + +The `is-live` property refers to whether the output of the element will be +live. So, based on whether the input is live and on whether the output +`is-live`, we have these four behaviours: + +- Live input + `is-live=false`: + - While not recording, drop input + - When recording is started, offset to collapse the gap + +- Live input + `is-live=true`: + - While not recording, drop input + - Don't modify the offset + +- Non-live input + `is-live=false`: + - While not recording, block input + - Don't modify the offset + +- Non-live input + `is-live=true`: + - While not recording, block input + - When recording is started, offset to current running time diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs index d32ba3508..d66437229 100644 --- a/utils/togglerecord/src/togglerecord/imp.rs +++ b/utils/togglerecord/src/togglerecord/imp.rs @@ -6,12 +6,18 @@ // // SPDX-License-Identifier: MPL-2.0 +/** + * element-togglerecord: + * + * {{ utils/togglerecord/README.md[2:30] }} + * + */ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use once_cell::sync::Lazy; -use parking_lot::{Condvar, Mutex}; +use parking_lot::{Condvar, Mutex, MutexGuard}; use std::cmp; use std::collections::HashMap; use std::f64; @@ -72,6 +78,7 @@ struct StreamState { flushing: bool, segment_pending: bool, discont_pending: bool, + upstream_live: Option, pending_events: Vec, audio_info: Option, video_info: Option, @@ -89,6 +96,7 @@ impl Default for StreamState { flushing: false, segment_pending: false, discont_pending: true, + upstream_live: None, pending_events: Vec::new(), audio_info: None, video_info: None, @@ -104,7 +112,7 @@ impl Default for StreamState { // Recording: Passing through all data // Stopping: Main stream remembering current last_recording_stop, waiting for all // other streams to reach this position -// Stopped: Dropping all data +// Stopped: Dropping (live input) or blocking (non-live input) all data // Starting: Main stream waiting until next keyframe and setting last_recording_start, waiting // for all other streams to reach this position #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -120,11 +128,21 @@ struct State { recording_state: RecordingState, last_recording_start: Option, last_recording_stop: Option, + // Accumulated duration of previous recording segments, // updated whenever going to Stopped recording_duration: gst::ClockTime, + + // Accumulated duration of blocked segments + blocked_duration: gst::ClockTime, + + // What time we started blocking + time_start_block: Option, + // Updated whenever going to Recording running_time_offset: i64, + + // Copied from settings live: bool, } @@ -135,6 +153,8 @@ impl Default for State { last_recording_start: None, last_recording_stop: None, recording_duration: gst::ClockTime::ZERO, + blocked_duration: gst::ClockTime::ZERO, + time_start_block: gst::ClockTime::NONE, running_time_offset: 0, live: false, } @@ -146,7 +166,6 @@ enum HandleResult { Pass(T), Drop, Eos(bool), - Flushing, } trait HandleData: Sized { @@ -329,11 +348,54 @@ static CAT: Lazy = Lazy::new(|| { }); impl ToggleRecord { + fn block_if_upstream_not_live( + &self, + pad: &gst::Pad, + mut settings: Settings, + state: &mut MutexGuard, + upstream_live: bool, + ) -> Result { + if !upstream_live { + while !settings.record && !state.flushing { + gst::debug!(CAT, obj: pad, "Waiting for record=true"); + self.main_stream_cond.wait(state); + settings = *self.settings.lock(); + } + if state.flushing { + gst::debug!(CAT, obj: pad, "Flushing"); + return Err(gst::FlowError::Flushing); + } + state.segment_pending = true; + state.discont_pending = true; + for other_stream in &self.other_streams.lock().0 { + let mut other_state = other_stream.state.lock(); + other_state.segment_pending = true; + other_state.discont_pending = true; + } + let mut rec_state = self.state.lock(); + if let Some(time_start_block) = rec_state.time_start_block { + let clock = self.obj().clock().expect("Cannot find pipeline clock"); + rec_state.blocked_duration += clock.time().unwrap() - time_start_block; + if settings.live { + rec_state.running_time_offset = rec_state.blocked_duration.nseconds() as i64; + } + rec_state.time_start_block = gst::ClockTime::NONE; + } + drop(rec_state); + gst::log!(CAT, obj: pad, "Done blocking main stream"); + Ok(true) + } else { + gst::log!(CAT, obj: pad, "Dropping buffer (stopped)"); + Ok(false) + } + } + fn handle_main_stream( &self, pad: &gst::Pad, stream: &Stream, data: T, + upstream_live: bool, ) -> Result, gst::FlowError> { let mut state = stream.state.lock(); @@ -395,10 +457,14 @@ impl ToggleRecord { let settings = *self.settings.lock(); - // First check if we have to update our recording state + // First check if we need to block for non-live input let mut rec_state = self.state.lock(); + + // Check if we have to update our recording state let settings_changed = match rec_state.recording_state { RecordingState::Recording if !settings.record => { + let clock = self.obj().clock().expect("Cannot find pipeline clock"); + rec_state.time_start_block = Some(clock.time().unwrap()); gst::debug!(CAT, obj: pad, "Stopping recording"); rec_state.recording_state = RecordingState::Stopping; true @@ -473,7 +539,7 @@ impl ToggleRecord { if state.flushing { gst::debug!(CAT, obj: pad, "Flushing"); - return Ok(HandleResult::Flushing); + return Err(gst::FlowError::Flushing); } let mut rec_state = self.state.lock(); @@ -493,17 +559,25 @@ impl ToggleRecord { // Then become Stopped and drop this buffer. We always stop right before // a keyframe - gst::log!(CAT, obj: pad, "Dropping buffer (stopped)"); - drop(rec_state); + + let ret = + self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)?; drop(state); self.obj().notify("recording"); - Ok(HandleResult::Drop) + if ret { + Ok(HandleResult::Pass(data)) + } else { + Ok(HandleResult::Drop) + } } RecordingState::Stopped => { - gst::log!(CAT, obj: pad, "Dropping buffer (stopped)"); - Ok(HandleResult::Drop) + if self.block_if_upstream_not_live(pad, settings, &mut state, upstream_live)? { + Ok(HandleResult::Pass(data)) + } else { + Ok(HandleResult::Drop) + } } RecordingState::Starting => { // If this is no keyframe, we can directly go out again here and drop the frame @@ -519,23 +593,35 @@ impl ToggleRecord { .push_event(gst_video::UpstreamForceKeyUnitEvent::builder().build()); } + if !upstream_live { + gst::log!( + CAT, + obj: pad, + "Always passing data when upstream is not live" + ); + return Ok(HandleResult::Pass(data)); + } return Ok(HandleResult::Drop); } // Remember the time when we started: now! rec_state.last_recording_start = current_running_time; - rec_state.running_time_offset = - current_running_time.map_or(0, |current_running_time| { - current_running_time - .saturating_sub(rec_state.recording_duration) - .nseconds() - }) as i64; + // We made sure a few lines above, but let's be sure again + if !settings.live || upstream_live { + rec_state.running_time_offset = + 0 - current_running_time.map_or(0, |current_running_time| { + current_running_time + .saturating_sub(rec_state.recording_duration) + .nseconds() + }) as i64 + }; gst::debug!( CAT, obj: pad, - "Starting at {}, previous accumulated recording duration {}", + "Starting at {}, previous accumulated recording duration {}, offset {}", current_running_time.display(), rec_state.recording_duration, + rec_state.running_time_offset, ); state.segment_pending = true; @@ -566,7 +652,7 @@ impl ToggleRecord { if state.flushing { gst::debug!(CAT, obj: pad, "Flushing"); - return Ok(HandleResult::Flushing); + return Err(gst::FlowError::Flushing); } let mut rec_state = self.state.lock(); @@ -596,6 +682,7 @@ impl ToggleRecord { pad: &gst::Pad, stream: &Stream, data: T, + upstream_live: bool, ) -> Result, gst::FlowError> { // Calculate end pts & current running time and make sure we stay in the segment let mut state = stream.state.lock(); @@ -722,7 +809,7 @@ impl ToggleRecord { if state.flushing { gst::debug!(CAT, obj: pad, "Flushing"); - return Ok(HandleResult::Flushing); + return Err(gst::FlowError::Flushing); } // If the main stream is EOS, we are also EOS unless we are @@ -889,6 +976,10 @@ impl ToggleRecord { } } + if !upstream_live { + return Ok(HandleResult::Pass(data)); + } + match rec_state.recording_state { RecordingState::Recording => { // The end of our buffer must be before/at the end of the previous buffer of the main @@ -1154,31 +1245,48 @@ impl ToggleRecord { gst::FlowError::Error })?; - gst::log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + let upstream_live; { - let state = stream.state.lock(); + let mut state = stream.state.lock(); if state.eos { return Err(gst::FlowError::Eos); } if state.flushing { return Err(gst::FlowError::Flushing); } + match state.upstream_live { + None => { + // Not handling anything here, the pad's query function will catch it + let mut query = gst::query::Latency::new(); + let success = pad.peer_query(&mut query); + if success { + (upstream_live, _, _) = query.result(); + state.upstream_live = Some(upstream_live); + } else { + state.upstream_live = None; + upstream_live = false; + gst::warning!( + CAT, + obj: pad, + "Latency query failed, assuming non-live input, will retry" + ); + } + } + Some(is_live) => upstream_live = is_live, + } } let handle_result = if stream != self.main_stream { - self.handle_secondary_stream(pad, &stream, buffer) + self.handle_secondary_stream(pad, &stream, buffer, upstream_live) } else { - self.handle_main_stream(pad, &stream, buffer) + self.handle_main_stream(pad, &stream, buffer, upstream_live) }?; let mut buffer = match handle_result { HandleResult::Drop => { return Ok(gst::FlowSuccess::Ok); } - HandleResult::Flushing => { - return Err(gst::FlowError::Flushing); - } HandleResult::Eos(recording_state_updated) => { stream.srcpad.push_event( gst::event::Eos::builder() @@ -1224,10 +1332,13 @@ impl ToggleRecord { state.out_segment = state.in_segment.clone(); - if !rec_state.live { + // state.upstream_live should have a value from a few lines above + // segment offset is taken into account in case upstream is live and we are not + // (collapse gap) + if rec_state.live != upstream_live { state .out_segment - .offset_running_time(-rec_state.running_time_offset) + .offset_running_time(rec_state.running_time_offset) .expect("Adjusting record duration"); } events.push( @@ -1368,10 +1479,35 @@ impl ToggleRecord { EventView::Gap(e) => { gst::debug!(CAT, obj: pad, "Handling Gap event {:?}", event); let (pts, duration) = e.get(); + let upstream_live; + + { + let mut state = stream.state.lock(); + match state.upstream_live { + None => { + // Not handling anything here, the pad's query function will catch it + let mut query = gst::query::Latency::new(); + let success = pad.peer_query(&mut query); + if success { + (upstream_live, _, _) = query.result(); + state.upstream_live = Some(upstream_live); + } else { + state.upstream_live = None; + upstream_live = false; + gst::warning!( + CAT, + obj: pad, + "Latency query failed, assuming non-live input, will retry" + ); + } + } + Some(is_live) => upstream_live = is_live, + } + } let handle_result = if stream == self.main_stream { - self.handle_main_stream(pad, &stream, (pts, duration)) + self.handle_main_stream(pad, &stream, (pts, duration), upstream_live) } else { - self.handle_secondary_stream(pad, &stream, (pts, duration)) + self.handle_secondary_stream(pad, &stream, (pts, duration), upstream_live) }; forward = match handle_result { @@ -1508,7 +1644,19 @@ impl ToggleRecord { gst::log!(CAT, obj: pad, "Handling query {:?}", query); - stream.srcpad.peer_query(query) + let success = stream.srcpad.peer_query(query); + + if let gst::QueryView::Latency(latency) = query.view() { + let mut state = stream.state.lock(); + if success { + let (is_live, _, _) = latency.result(); + state.upstream_live = Some(is_live); + } else { + state.upstream_live = None; + } + } + + success } // FIXME `matches!` was introduced in rustc 1.42.0, current MSRV is 1.41.0 @@ -1537,7 +1685,7 @@ impl ToggleRecord { let offset = event.running_time_offset(); event .make_mut() - .set_running_time_offset(offset + rec_state.running_time_offset); + .set_running_time_offset(offset - rec_state.running_time_offset); drop(rec_state); if forward { @@ -1795,8 +1943,12 @@ impl ObjectImpl for ToggleRecord { .read_only() .build(), glib::ParamSpecBoolean::builder("is-live") - .nick("Live mode") - .blurb("Live mode: no \"gap eating\", forward incoming segment") + .nick("Live output mode") + .blurb( + "Live output mode: no \"gap eating\", \ + forward incoming segment for live input, \ + create a gap to fill the paused duration for non-live input", + ) .default_value(DEFAULT_LIVE) .mutable_ready() .build(), @@ -1820,6 +1972,7 @@ impl ObjectImpl for ToggleRecord { ); settings.record = record; + self.main_stream_cond.notify_all(); } "is-live" => { let mut settings = self.settings.lock(); @@ -1873,7 +2026,9 @@ impl ElementImpl for ToggleRecord { gst::subclass::ElementMetadata::new( "Toggle Record", "Generic", - "Valve that ensures multiple streams start/end at the same time", + "Valve that ensures multiple streams start/end at the same time. \ + If the input comes from a live stream, when not recording it will be dropped. \ + If it comes from a non-live stream, when not recording it will be blocked.", "Sebastian Dröge ", ) }); @@ -1948,6 +2103,7 @@ impl ElementImpl for ToggleRecord { let mut rec_state = self.state.lock(); *rec_state = State::default(); + let settings = *self.settings.lock(); rec_state.live = settings.live; } diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs index 6836d9c6f..988a9b142 100644 --- a/utils/togglerecord/tests/tests.rs +++ b/utils/togglerecord/tests/tests.rs @@ -11,7 +11,7 @@ use gst::prelude::*; use either::*; use std::sync::{mpsc, Mutex}; -use std::thread; +use std::{thread, time::Duration}; fn init() { use std::sync::Once; @@ -37,6 +37,7 @@ fn setup_sender_receiver( togglerecord: &gst::Element, pad: &str, offset: gst::ClockTime, + live: bool, ) -> ( mpsc::Sender, mpsc::Receiver<()>, @@ -62,6 +63,25 @@ fn setup_sender_receiver( (srcpad, sinkpad) }; + sinkpad.add_probe( + gst::PadProbeType::QUERY_UPSTREAM, + move |_pad, probe_info| { + let query = match &mut probe_info.data { + Some(gst::PadProbeData::Query(q)) => q, + _ => unreachable!(), + }; + + use gst::QueryViewMut::*; + match query.view_mut() { + Latency(q) => { + q.set(live, gst::ClockTime::ZERO, None); + gst::PadProbeReturn::Handled + } + _ => gst::PadProbeReturn::Ok, + } + }, + ); + let fakesink_sinkpad = fakesink.static_pad("sink").unwrap(); srcpad.link(&fakesink_sinkpad).unwrap(); @@ -271,6 +291,76 @@ fn test_create_pads() { assert!(srcpad.parent().is_none()); } +#[test] +fn test_one_stream_open_nonlivein_nonliveout() { + init(); + + let pipeline = gst::Pipeline::default(); + let togglerecord = gst::ElementFactory::make("togglerecord") + .property("is-live", false) + .build() + .unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, _, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", true); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::FormattedSegment::::new(); + let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_one_stream_open_nonlivein_liveout() { + init(); + + let pipeline = gst::Pipeline::default(); + let togglerecord = gst::ElementFactory::make("togglerecord") + .property("is-live", true) + .build() + .unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, _, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", true); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::FormattedSegment::::new(); + let (buffers, _) = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + #[test] fn test_one_stream_open() { init(); @@ -280,7 +370,7 @@ fn test_one_stream_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, _, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); pipeline.set_state(gst::State::Playing).unwrap(); @@ -312,7 +402,7 @@ fn test_one_stream_gaps_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, _, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); pipeline.set_state(gst::State::Playing).unwrap(); @@ -345,7 +435,7 @@ fn test_one_stream_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, receiver_input_done, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); pipeline.set_state(gst::State::Playing).unwrap(); @@ -379,7 +469,7 @@ fn test_one_stream_open_close() { pipeline.add(&togglerecord).unwrap(); let (sender_input, receiver_input_done, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); pipeline.set_state(gst::State::Playing).unwrap(); @@ -414,7 +504,7 @@ fn test_one_stream_open_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, receiver_input_done, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); pipeline.set_state(gst::State::Playing).unwrap(); @@ -458,9 +548,15 @@ fn test_two_stream_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -511,9 +607,9 @@ fn test_two_stream_open_shift() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5.mseconds()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5.mseconds(), true); pipeline.set_state(gst::State::Playing).unwrap(); @@ -568,9 +664,15 @@ fn test_two_stream_open_shift_main() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 5.mseconds()); + setup_sender_receiver(&pipeline, &togglerecord, "src", 5.mseconds(), true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -633,9 +735,15 @@ fn test_two_stream_open_close() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -702,9 +810,15 @@ fn test_two_stream_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -762,6 +876,256 @@ fn test_two_stream_close_open() { pipeline.set_state(gst::State::Null).unwrap(); } +#[test] +fn test_two_stream_open_close_open_nonlivein_liveout() { + init(); + + let testclock = gst_check::TestClock::new(); + let pipeline = gst::Pipeline::default(); + pipeline.use_clock(Some(&testclock)); + let togglerecord = gst::ElementFactory::make("togglerecord") + .property("is-live", true) + .build() + .unwrap(); + togglerecord.set_clock(Some(&testclock)).unwrap(); + pipeline.add(&togglerecord).unwrap(); + let testclock = testclock.downcast::().unwrap(); + testclock.set_time(gst::ClockTime::ZERO); + + let main_buffers_before_gap = 10u64; + let secondary_buffers_before_gap = main_buffers_before_gap + 1; + let buffers_in_gap = 10u64; + let buffers_after_gap = 10u64; + let recv_timeout = Duration::from_secs(10); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + false, + ); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", true); + + sender_input_1 + .send(SendData::Buffers(main_buffers_before_gap as usize)) + .unwrap(); + sender_input_2 + .send(SendData::Buffers( + (secondary_buffers_before_gap - 1) as usize, + )) + .unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + sender_input_2.send(SendData::Buffers(1)).unwrap(); + assert_eq!( + receiver_input_done_2.recv_timeout(Duration::from_millis(20)), + Err(mpsc::RecvTimeoutError::Timeout) + ); + + // Stop recording and push new buffers to sender 1, this will block + togglerecord.set_property("record", false); + sender_input_1 + .send(SendData::Buffers(buffers_in_gap as usize)) + .unwrap(); + + // Send another 10 buffers to sender 2, both are the same position at 9 buffers, the next one + // will block until record=true + sender_input_2 + .send(SendData::Buffers(buffers_in_gap as usize)) + .unwrap(); + + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + // Advance the clock + let block_time = gst::ClockTime::from_mseconds(42); + testclock.advance_time(block_time.nseconds() as i64); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", true); + sender_input_1 + .send(SendData::Buffers(buffers_after_gap as usize)) + .unwrap(); + sender_input_2 + .send(SendData::Buffers(buffers_after_gap as usize)) + .unwrap(); + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, _) = recv_buffers( + &receiver_output_1, + &mut segment_1, + main_buffers_before_gap as usize, + ); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!( + running_time.unwrap(), + block_time + (index + main_buffers_before_gap) * 20.mseconds() + ); + assert_eq!( + pts.unwrap(), + (index + main_buffers_before_gap) * 20.mseconds() + ); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!( + buffers_1.len(), + (buffers_in_gap + buffers_after_gap) as usize + ); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, _) = recv_buffers( + &receiver_output_2, + &mut segment_2, + secondary_buffers_before_gap as usize, + ); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!( + running_time.unwrap(), + block_time + (index + secondary_buffers_before_gap) * 20.mseconds() + ); + assert_eq!( + pts.unwrap(), + (index + secondary_buffers_before_gap) * 20.mseconds() + ); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!( + buffers_2.len(), + (buffers_in_gap + buffers_after_gap - 1) as usize + ); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} +#[test] +fn test_two_stream_open_close_open_nonlivein_nonliveout() { + init(); + + let pipeline = gst::Pipeline::default(); + let togglerecord = gst::ElementFactory::make("togglerecord") + .property("is-live", false) + .build() + .unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, false); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + false, + ); + + let recv_timeout = Duration::from_secs(10); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", true); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + sender_input_2.send(SendData::Buffers(1)).unwrap(); + assert_eq!( + receiver_input_done_2.recv_timeout(Duration::from_millis(20)), + Err(mpsc::RecvTimeoutError::Timeout) + ); + + // Stop recording and push new buffers to sender 1, this will block + togglerecord.set_property("record", false); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Send another buffer to sender 2, this will block until record=true + sender_input_2.send(SendData::Buffers(1)).unwrap(); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", true); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + receiver_input_done_2.recv_timeout(recv_timeout).unwrap(); + + let mut segment_1 = gst::FormattedSegment::::new(); + let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!(buffers_1.len(), 30); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::::new(); + let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time.unwrap(), index * 20.mseconds()); + assert_eq!(pts.unwrap(), index * 20.mseconds()); + assert_eq!(duration.unwrap(), 20.mseconds()); + } + assert_eq!(buffers_2.len(), 30); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + #[test] fn test_two_stream_open_close_open() { init(); @@ -771,9 +1135,15 @@ fn test_two_stream_open_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -865,9 +1235,15 @@ fn test_two_stream_open_close_open_gaps() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -965,9 +1341,15 @@ fn test_two_stream_close_open_close_delta() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1054,11 +1436,23 @@ fn test_three_stream_open_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1178,9 +1572,15 @@ fn test_two_stream_main_eos() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1253,9 +1653,15 @@ fn test_two_stream_secondary_eos_first() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1321,11 +1727,23 @@ fn test_three_stream_main_eos() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1422,11 +1840,23 @@ fn test_three_stream_main_and_second_eos() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1523,11 +1953,23 @@ fn test_three_stream_secondary_eos_first() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO, true); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + gst::ClockTime::ZERO, + true, + ); pipeline.set_state(gst::State::Playing).unwrap();