From f7fd1e3f997a71e363257fafa33570853e76042e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 11 Jul 2019 00:45:02 +0300 Subject: [PATCH] togglerecord: Clip raw audio/video buffers to the segment/recording boundaries And extend tests to actually check for this to work. --- gst-plugin-togglerecord/src/togglerecord.rs | 366 ++++++++++++++++---- gst-plugin-togglerecord/tests/tests.rs | 154 +++++--- 2 files changed, 414 insertions(+), 106 deletions(-) diff --git a/gst-plugin-togglerecord/src/togglerecord.rs b/gst-plugin-togglerecord/src/togglerecord.rs index f7e89784..9f7b4dc2 100644 --- a/gst-plugin-togglerecord/src/togglerecord.rs +++ b/gst-plugin-togglerecord/src/togglerecord.rs @@ -187,6 +187,12 @@ trait HandleData: Sized { } fn get_duration(&self, state: &StreamState) -> gst::ClockTime; fn is_keyframe(&self) -> bool; + fn can_clip(&self, state: &StreamState) -> bool; + fn clip( + self, + state: &StreamState, + segment: &gst::FormattedSegment, + ) -> Option; } impl HandleData for (gst::ClockTime, gst::ClockTime) { @@ -205,6 +211,26 @@ impl HandleData for (gst::ClockTime, gst::ClockTime) { fn is_keyframe(&self) -> bool { true } + + fn can_clip(&self, _state: &StreamState) -> bool { + true + } + + fn clip( + self, + _state: &StreamState, + segment: &gst::FormattedSegment, + ) -> Option { + let stop = if self.1.is_some() { + self.0 + self.1 + } else { + self.0 + }; + + segment + .clip(self.0, stop) + .map(|(start, stop)| (start, stop - start)) + } } impl HandleData for gst::Buffer { @@ -239,11 +265,9 @@ impl HandleData for gst::Buffer { let size = self.get_size() as u64; let num_samples = size / audio_info.bpf() as u64; - let duration = gst::SECOND + gst::SECOND .mul_div_floor(num_samples, audio_info.rate() as u64) - .unwrap_or(gst::CLOCK_TIME_NONE); - - duration + .unwrap_or(gst::CLOCK_TIME_NONE) } else { gst::CLOCK_TIME_NONE } @@ -252,6 +276,71 @@ impl HandleData for gst::Buffer { fn is_keyframe(&self) -> bool { !gst::BufferRef::get_flags(self).contains(gst::BufferFlags::DELTA_UNIT) } + + fn can_clip(&self, state: &StreamState) -> bool { + // Only do actual clipping for raw audio/video + if let Some(ref audio_info) = state.audio_info { + if audio_info.format() == gst_audio::AudioFormat::Unknown + || audio_info.format() == gst_audio::AudioFormat::Encoded + || audio_info.rate() == 0 + || audio_info.bpf() == 0 + { + return false; + } + } else if let Some(ref video_info) = state.video_info { + if video_info.format() == gst_video::VideoFormat::Unknown + || video_info.format() == gst_video::VideoFormat::Encoded + || self.get_dts_or_pts() != self.get_pts() + { + return false; + } + } else { + return false; + } + + true + } + + fn clip( + mut self, + state: &StreamState, + segment: &gst::FormattedSegment, + ) -> Option { + // Only do actual clipping for raw audio/video + if !self.can_clip(state) { + return Some(self); + } + + let pts = HandleData::get_pts(&self); + let duration = HandleData::get_duration(&self, state); + let stop = if duration.is_some() { + pts + duration + } else { + pts + }; + + if let Some(ref audio_info) = state.audio_info { + gst_audio::audio_buffer_clip( + self, + segment.upcast_ref(), + audio_info.rate(), + audio_info.bpf(), + ) + } else if let Some(_) = state.video_info { + segment.clip(pts, stop).map(move |(start, stop)| { + { + let buffer = self.make_mut(); + buffer.set_pts(start); + buffer.set_dts(start); + buffer.set_duration(stop - start); + } + + self + }) + } else { + unreachable!(); + } + } } struct ToggleRecord { @@ -348,6 +437,15 @@ impl ToggleRecord { dts_or_pts }; + let data = match data.clip(&state, &state.in_segment) { + None => { + gst_log!(self.cat, obj: pad, "Dropping raw data outside segment"); + return Ok(HandleResult::Drop); + } + Some(data) => data, + }; + + // This will only do anything for non-raw data dts_or_pts = cmp::max(state.in_segment.get_start(), dts_or_pts); dts_or_pts_end = cmp::max(state.in_segment.get_start(), dts_or_pts_end); if state.in_segment.get_stop().is_some() { @@ -355,9 +453,9 @@ impl ToggleRecord { dts_or_pts_end = cmp::min(state.in_segment.get_stop(), dts_or_pts_end); } - let mut current_running_time = state.in_segment.to_running_time(dts_or_pts); - current_running_time = cmp::max(current_running_time, state.current_running_time); - state.current_running_time = current_running_time; + let current_running_time = state.in_segment.to_running_time(dts_or_pts); + let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); + state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); // Wake up everybody, we advanced a bit // Important: They will only be able to advance once we're done with this @@ -365,8 +463,6 @@ impl ToggleRecord { // get the wrong state self.main_stream_cond.notify_all(); - let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); - gst_log!( self.cat, obj: pad, @@ -420,19 +516,18 @@ impl ToggleRecord { return Ok(HandleResult::Pass(data)); } - // Remember the time when we stopped: now! + // Remember the time when we stopped: now, i.e. right before the current buffer! rec_state.last_recording_stop = current_running_time; gst_debug!(self.cat, obj: pad, "Stopping at {}", current_running_time); - // Then unlock and wait for all other streams to reach - // it or go EOS instead. + // Then unlock and wait for all other streams to reach it or go EOS instead. drop(rec_state); while !self.other_streams.lock().0.iter().all(|s| { let s = s.state.lock(); s.eos || (s.current_running_time.is_some() - && s.current_running_time >= current_running_time) + && s.current_running_time >= current_running_time_end) }) { gst_log!(self.cat, obj: pad, "Waiting for other streams to stop"); self.main_stream_cond.wait(&mut state); @@ -511,7 +606,7 @@ impl ToggleRecord { let s = s.state.lock(); s.eos || (s.current_running_time.is_some() - && s.current_running_time >= current_running_time) + && s.current_running_time >= current_running_time_end) }) { gst_log!(self.cat, obj: pad, "Waiting for other streams to start"); self.main_stream_cond.wait(&mut state); @@ -586,32 +681,26 @@ impl ToggleRecord { pts }; - pts = cmp::max(state.in_segment.get_start(), pts); - if state.in_segment.get_stop().is_some() && pts >= state.in_segment.get_stop() { - state.current_running_time = state - .in_segment - .to_running_time(state.in_segment.get_stop()); - state.eos = true; - gst_debug!( - self.cat, - obj: pad, - "After segment end {} >= {}, EOS", - pts, - state.in_segment.get_stop() - ); + let data = match data.clip(&state, &state.in_segment) { + None => { + gst_log!(self.cat, obj: pad, "Dropping raw data outside segment"); + return Ok(HandleResult::Drop); + } + Some(data) => data, + }; - return Ok(HandleResult::Eos); - } + // This will only do anything for non-raw data + pts = cmp::max(state.in_segment.get_start(), pts); pts_end = cmp::max(state.in_segment.get_start(), pts_end); if state.in_segment.get_stop().is_some() { + pts = cmp::min(state.in_segment.get_stop(), pts); pts_end = cmp::min(state.in_segment.get_stop(), pts_end); } - let mut current_running_time = state.in_segment.to_running_time(pts); - current_running_time = cmp::max(current_running_time, state.current_running_time); - state.current_running_time = current_running_time; - + let current_running_time = state.in_segment.to_running_time(pts); let current_running_time_end = state.in_segment.to_running_time(pts_end); + state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); + gst_log!( self.cat, obj: pad, @@ -631,7 +720,7 @@ impl ToggleRecord { self.main_stream_cond.notify_all(); while (main_state.current_running_time == gst::CLOCK_TIME_NONE - || main_state.current_running_time < current_running_time) + || main_state.current_running_time < current_running_time_end) && !main_state.eos && !stream.state.lock().flushing { @@ -645,7 +734,10 @@ impl ToggleRecord { self.main_stream_cond.wait(&mut main_state); } - if stream.state.lock().flushing { + + state = stream.state.lock(); + + if state.flushing { gst_debug!(self.cat, obj: pad, "Flushing"); return Ok(HandleResult::Flushing); } @@ -655,36 +747,126 @@ impl ToggleRecord { // If the main stream is EOS, we are also EOS unless we are // before the final last recording stop running time if main_state.eos { - // If we have no start or stop position (we never recorded), or are after the current - // stop position that we're EOS now - // If we're before the start position (we were starting before EOS), - // drop the buffer - if rec_state.last_recording_stop.is_none() - || rec_state.last_recording_start.is_none() - || current_running_time_end > rec_state.last_recording_stop - { + // If we have no start or stop position (we never recorded) then we're EOS too now + if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() { gst_debug!( self.cat, obj: pad, - "Main stream EOS and we're EOS ({} > {})", - current_running_time_end, - rec_state.last_recording_stop + "Main stream EOS and recording never started", ); return Ok(HandleResult::Eos); + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_start + && current_running_time_end > rec_state.last_recording_start + { + // Otherwise if we're before the recording start but the end of the buffer is after + // the start and we can clip, clip the buffer and pass it onwards. + gst_debug!( + self.cat, + obj: pad, + "Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})", + current_running_time, + rec_state.last_recording_start, + current_running_time_end + ); + + let mut clip_start = state + .in_segment + .position_from_running_time(rec_state.last_recording_start); + if clip_start.is_none() { + clip_start = state.in_segment.get_start(); + } + let mut clip_stop = state + .in_segment + .position_from_running_time(rec_state.last_recording_stop); + if clip_stop.is_none() { + clip_stop = state.in_segment.get_stop(); + } + let mut segment = state.in_segment.clone(); + segment.set_start(clip_start); + segment.set_stop(clip_stop); + + gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + return Ok(HandleResult::Pass(data)); + } else { + gst_warning!(self.cat, obj: pad, "Complete buffer clipped!"); + return Ok(HandleResult::Drop); + } } else if current_running_time < rec_state.last_recording_start { + // Otherwise if the buffer starts before the recording start, drop it. This + // means that we either can't clip, or that the end is also before the + // recording start gst_debug!( self.cat, obj: pad, - "Main stream EOS and we're not EOS yet (before recording start, {} <= {})", + "Main stream EOS and we're not EOS yet (before recording start, {} < {})", current_running_time, rec_state.last_recording_start ); return Ok(HandleResult::Drop); - } else { + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_stop + && current_running_time_end > rec_state.last_recording_stop + { + // Similarly if the end is after the recording stop but the start is before and we + // can clip, clip the buffer and pass it through. + gst_debug!( + self.cat, + obj: pad, + "Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})", + current_running_time, + rec_state.last_recording_stop, + current_running_time_end + ); + + let mut clip_start = state + .in_segment + .position_from_running_time(rec_state.last_recording_start); + if clip_start.is_none() { + clip_start = state.in_segment.get_start(); + } + let mut clip_stop = state + .in_segment + .position_from_running_time(rec_state.last_recording_stop); + if clip_stop.is_none() { + clip_stop = state.in_segment.get_stop(); + } + let mut segment = state.in_segment.clone(); + segment.set_start(clip_start); + segment.set_stop(clip_stop); + + gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + return Ok(HandleResult::Pass(data)); + } else { + gst_warning!(self.cat, obj: pad, "Complete buffer clipped!"); + return Ok(HandleResult::Eos); + } + } else if current_running_time_end > rec_state.last_recording_stop { + // Otherwise if the end of the buffer is after the recording stop, we're EOS + // now. This means that we either couldn't clip or that the start is also after + // the recording stop gst_debug!( self.cat, obj: pad, - "Main stream EOS and we're not EOS yet (before recording end, {} <= {} < {})", + "Main stream EOS and we're EOS too (after recording end, {} > {})", + current_running_time_end, + rec_state.last_recording_stop + ); + return Ok(HandleResult::Eos); + } else { + // In all other cases the buffer is fully between recording start and end and + // can be passed through as is + assert!(current_running_time >= rec_state.last_recording_start); + assert!(current_running_time_end <= rec_state.last_recording_stop); + + gst_debug!( + self.cat, + obj: pad, + "Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})", rec_state.last_recording_start, current_running_time, rec_state.last_recording_stop @@ -693,7 +875,8 @@ impl ToggleRecord { } } - assert!(main_state.current_running_time >= current_running_time); + // The end of our buffer is before the end of the previous buffer of the main stream + assert!(main_state.current_running_time >= current_running_time_end); match rec_state.recording_state { RecordingState::Recording => { @@ -722,6 +905,36 @@ impl ToggleRecord { rec_state.last_recording_stop ); Ok(HandleResult::Pass(data)) + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_stop + && current_running_time_end > rec_state.last_recording_stop + { + gst_log!( + self.cat, + obj: pad, + "Passing buffer (stopping: {} < {} < {})", + current_running_time, + rec_state.last_recording_stop, + current_running_time_end, + ); + + let mut clip_stop = state + .in_segment + .position_from_running_time(rec_state.last_recording_stop); + if clip_stop.is_none() { + clip_stop = state.in_segment.get_stop(); + } + let mut segment = state.in_segment.clone(); + segment.set_stop(clip_stop); + + gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + Ok(HandleResult::Pass(data)) + } else { + gst_warning!(self.cat, obj: pad, "Complete buffer clipped!"); + Ok(HandleResult::Drop) + } } else { gst_log!( self.cat, @@ -756,6 +969,36 @@ impl ToggleRecord { rec_state.last_recording_start ); Ok(HandleResult::Pass(data)) + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_start + && current_running_time_end > rec_state.last_recording_start + { + gst_log!( + self.cat, + obj: pad, + "Passing buffer (starting: {} < {} < {})", + current_running_time, + rec_state.last_recording_start, + current_running_time_end, + ); + + let mut clip_start = state + .in_segment + .position_from_running_time(rec_state.last_recording_start); + if clip_start.is_none() { + clip_start = state.in_segment.get_start(); + } + let mut segment = state.in_segment.clone(); + segment.set_start(clip_start); + + gst_log!(self.cat, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + Ok(HandleResult::Pass(data)) + } else { + gst_warning!(self.cat, obj: pad, "Complete buffer clipped!"); + Ok(HandleResult::Drop) + } } else { gst_log!( self.cat, @@ -776,19 +1019,14 @@ impl ToggleRecord { element: &gst::Element, buffer: gst::Buffer, ) -> Result { - let stream = self - .pads - .lock() - .get(pad) - .map(|stream| stream.clone()) - .ok_or_else(|| { - gst_element_error!( - element, - gst::CoreError::Pad, - ["Unknown pad {:?}", pad.get_name()] - ); - gst::FlowError::Error - })?; + let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + gst::FlowError::Error + })?; gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); @@ -930,10 +1168,12 @@ impl ToggleRecord { let s = caps.get_structure(0).unwrap(); if s.get_name().starts_with("audio/") { state.audio_info = gst_audio::AudioInfo::from_caps(caps); + gst_log!(self.cat, obj: pad, "Got audio caps {:?}", state.audio_info); state.video_info = None; } else if s.get_name().starts_with("video/") { state.audio_info = None; state.video_info = gst_video::VideoInfo::from_caps(caps); + gst_log!(self.cat, obj: pad, "Got video caps {:?}", state.video_info); } else { state.audio_info = None; state.video_info = None; diff --git a/gst-plugin-togglerecord/tests/tests.rs b/gst-plugin-togglerecord/tests/tests.rs index 465fd2a8..64a7852f 100644 --- a/gst-plugin-togglerecord/tests/tests.rs +++ b/gst-plugin-togglerecord/tests/tests.rs @@ -61,7 +61,9 @@ fn setup_sender_receiver( fakesink.set_property("async", &false).unwrap(); pipeline.add(&fakesink).unwrap(); - let (srcpad, sinkpad) = if pad == "src" { + let main_stream = pad == "src"; + + let (srcpad, sinkpad) = if main_stream { ( togglerecord.get_static_pad("src").unwrap(), togglerecord.get_static_pad("sink").unwrap(), @@ -112,6 +114,23 @@ fn setup_sender_receiver( while let Ok(send_data) = receiver_input.recv() { if first { assert!(sinkpad.send_event(gst::Event::new_stream_start("test").build())); + let caps = if main_stream { + gst::Caps::builder("video/x-raw") + .field("format", &"ARGB") + .field("width", &320i32) + .field("height", &240i32) + .field("framerate", &gst::Fraction::new(50, 1)) + .build() + } else { + gst::Caps::builder("audio/x-raw") + .field("format", &"U8") + .field("layout", &"interleaved") + .field("rate", &8000i32) + .field("channels", &1i32) + .build() + }; + assert!(sinkpad.send_event(gst::Event::new_caps(&caps).build())); + let segment = gst::FormattedSegment::::new(); assert!(sinkpad.send_event(gst::Event::new_segment(&segment).build())); @@ -124,18 +143,24 @@ fn setup_sender_receiver( first = false; } + let buffer = if main_stream { + gst::Buffer::with_size(320 * 240 * 4).unwrap() + } else { + gst::Buffer::with_size(160).unwrap() + }; + match send_data { SendData::Eos => { break; } SendData::Buffers(n) => { for _ in 0..n { - let mut buffer = gst::Buffer::new(); - buffer - .get_mut() - .unwrap() - .set_pts(offset + i * 20 * gst::MSECOND); - buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND); + let mut buffer = buffer.clone(); + { + let buffer = buffer.make_mut(); + buffer.set_pts(offset + i * 20 * gst::MSECOND); + buffer.set_duration(20 * gst::MSECOND); + } let _ = sinkpad.chain(buffer); i += 1; } @@ -181,13 +206,17 @@ fn recv_buffers( receiver_output: &mpsc::Receiver>, segment: &mut gst::FormattedSegment, wait_buffers: usize, -) -> Vec<(gst::ClockTime, gst::ClockTime)> { +) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> { let mut res = Vec::new(); let mut n_buffers = 0; while let Ok(val) = receiver_output.recv() { match val { Left(buffer) => { - res.push((segment.to_running_time(buffer.get_pts()), buffer.get_pts())); + res.push(( + segment.to_running_time(buffer.get_pts()), + buffer.get_pts(), + buffer.get_duration(), + )); n_buffers += 1; if wait_buffers > 0 && n_buffers == wait_buffers { return res; @@ -198,9 +227,9 @@ fn recv_buffers( match event.view() { EventView::Gap(ref e) => { - let (ts, _) = e.get(); + let (ts, duration) = e.get(); - res.push((segment.to_running_time(ts), ts)); + res.push((segment.to_running_time(ts), ts, duration)); n_buffers += 1; if wait_buffers > 0 && n_buffers == wait_buffers { return res; @@ -263,10 +292,11 @@ fn test_one_stream_open() { let mut segment = gst::FormattedSegment::::new(); let buffers = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); - for (index, &(running_time, pts)) in buffers.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } thread.join().unwrap(); @@ -295,10 +325,11 @@ fn test_one_stream_gaps_open() { let mut segment = gst::FormattedSegment::::new(); let buffers = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); - for (index, &(running_time, pts)) in buffers.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } thread.join().unwrap(); @@ -328,10 +359,11 @@ fn test_one_stream_close_open() { let mut segment = gst::FormattedSegment::::new(); let buffers = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); - for (index, &(running_time, pts)) in buffers.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } thread.join().unwrap(); @@ -362,10 +394,11 @@ fn test_one_stream_open_close() { let mut segment = gst::FormattedSegment::::new(); let buffers = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 10); - for (index, &(running_time, pts)) in buffers.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } thread.join().unwrap(); @@ -399,7 +432,7 @@ fn test_one_stream_open_close_open() { let mut segment = gst::FormattedSegment::::new(); let buffers = recv_buffers(&receiver_output, &mut segment, 0); assert_eq!(buffers.len(), 20); - for (index, &(running_time, pts)) in buffers.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -409,6 +442,7 @@ fn test_one_stream_open_close_open() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } thread.join().unwrap(); @@ -444,20 +478,22 @@ fn test_two_stream_open() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 10); // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -485,7 +521,7 @@ fn test_two_stream_open_shift() { togglerecord.set_property("record", &true).unwrap(); sender_input_1.send(SendData::Buffers(10)).unwrap(); - sender_input_2.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); receiver_input_done_1.recv().unwrap(); sender_input_1.send(SendData::Eos).unwrap(); receiver_input_done_1.recv().unwrap(); @@ -495,22 +531,28 @@ fn test_two_stream_open_shift() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 10); - // Last buffer should be dropped from second stream + // Second to last buffer should be clipped from second stream, last should be dropped let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND); assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + if index == 9 { + assert_eq!(duration, 15 * gst::MSECOND); + } else { + assert_eq!(duration, 20 * gst::MSECOND); + } } - assert_eq!(buffers_2.len(), 9); + assert_eq!(buffers_2.len(), 10); thread_1.join().unwrap(); thread_2.join().unwrap(); @@ -536,7 +578,7 @@ fn test_two_stream_open_shift_main() { togglerecord.set_property("record", &true).unwrap(); sender_input_1.send(SendData::Buffers(10)).unwrap(); - sender_input_2.send(SendData::Buffers(11)).unwrap(); + sender_input_2.send(SendData::Buffers(12)).unwrap(); receiver_input_done_1.recv().unwrap(); sender_input_1.send(SendData::Eos).unwrap(); receiver_input_done_1.recv().unwrap(); @@ -547,22 +589,35 @@ fn test_two_stream_open_shift_main() { // PTS 5 maps to running time 0 now let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 10); - // First and last buffer should be dropped from second stream + // First and second last buffer should be clipped from second stream, + // last buffer should be dropped let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, 15 * gst::MSECOND + index * 20 * gst::MSECOND); - assert_eq!(pts, 20 * gst::MSECOND + index * 20 * gst::MSECOND); + if index == 0 { + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!(duration, 15 * gst::MSECOND); + } else if index == 10 { + assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 5 * gst::MSECOND); + } else { + assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } } - assert_eq!(buffers_2.len(), 9); + assert_eq!(buffers_2.len(), 11); thread_1.join().unwrap(); thread_2.join().unwrap(); @@ -614,20 +669,22 @@ fn test_two_stream_open_close() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 10); // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -681,20 +738,22 @@ fn test_two_stream_close_open() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 10); // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -761,7 +820,7 @@ fn test_two_stream_open_close_open() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -771,13 +830,14 @@ fn test_two_stream_open_close_open() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 20); // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -787,6 +847,7 @@ fn test_two_stream_open_close_open() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_2.len(), 20); @@ -859,7 +920,7 @@ fn test_two_stream_open_close_open_gaps() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -869,13 +930,14 @@ fn test_two_stream_open_close_open_gaps() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 20); // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -885,6 +947,7 @@ fn test_two_stream_open_close_open_gaps() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_2.len(), 20); @@ -958,20 +1021,22 @@ fn test_two_stream_close_open_close_delta() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 10); // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -1052,7 +1117,7 @@ fn test_three_stream_open_close_open() { let mut segment_1 = gst::FormattedSegment::::new(); let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); - for (index, &(running_time, pts)) in buffers_1.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -1062,13 +1127,14 @@ fn test_three_stream_open_close_open() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_1.len(), 20); // Last buffer should be dropped from second stream let mut segment_2 = gst::FormattedSegment::::new(); let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); - for (index, &(running_time, pts)) in buffers_2.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -1078,12 +1144,13 @@ fn test_three_stream_open_close_open() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_2.len(), 20); let mut segment_3 = gst::FormattedSegment::::new(); let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0); - for (index, &(running_time, pts)) in buffers_3.iter().enumerate() { + for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { let pts_off = if index >= 10 { 10 * 20 * gst::MSECOND } else { @@ -1093,6 +1160,7 @@ fn test_three_stream_open_close_open() { let index = index as u64; assert_eq!(running_time, index * 20 * gst::MSECOND); assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); } assert_eq!(buffers_3.len(), 20);