Fix ClockTime comparisons not being Ord and use saturating_sub

See:
https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/607
This commit is contained in:
François Laignel 2020-10-19 17:03:10 +02:00
parent bbc18d6349
commit 7c3e69bb4a
10 changed files with 81 additions and 73 deletions

View file

@ -117,7 +117,7 @@ fn csound_filter_eos() {
h.play();
// The input buffer pts and duration
let mut in_pts = gst::ClockTime(Some(0));
let mut in_pts = gst::ClockTime::zero();
let in_duration = duration_from_samples(EOS_NUM_SAMPLES as _, sr as _);
// The number of samples that were leftover during the previous iteration
let mut samples_offset = 0;
@ -125,7 +125,7 @@ fn csound_filter_eos() {
let mut num_samples: usize = 0;
let mut num_buffers = 0;
// The expected pts of output buffers
let mut expected_pts = gst::ClockTime(Some(0));
let mut expected_pts = gst::ClockTime::zero();
for _ in 0..EOS_NUM_BUFFERS {
let mut buffer =
@ -227,7 +227,7 @@ fn csound_filter_underflow() {
h.play();
// Input buffers timestamp
let mut in_pts = gst::ClockTime(Some(0));
let mut in_pts = gst::ClockTime::zero();
let in_samples_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as _, sr as _);
for _ in 0..UNDERFLOW_NUM_BUFFERS {
@ -250,7 +250,7 @@ fn csound_filter_underflow() {
let expected_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as u64 * 2, sr as _);
let expected_buffers = UNDERFLOW_NUM_BUFFERS / 2;
let mut expected_pts = gst::ClockTime(Some(0));
let mut expected_pts = gst::ClockTime::zero();
for _ in 0..expected_buffers {
let buffer = h.pull().unwrap();

View file

@ -113,9 +113,11 @@ impl InputSelectorPadSinkHandler {
async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) {
let now = element.get_current_running_time();
if now.is_some() && now < running_time {
let delay = running_time - now;
runtime::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await;
if let Some(delay) = running_time
.saturating_sub(now)
.and_then(|delay| delay.nseconds())
{
runtime::time::delay_for(Duration::from_nanos(delay)).await;
}
}
@ -324,7 +326,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
QueryView::Latency(ref mut q) => {
let mut ret = true;
let mut min_latency = 0.into();
let mut max_latency = gst::CLOCK_TIME_NONE;
let mut max_latency = gst::ClockTime::none();
let pads = {
let pads = inputselector.pads.lock().unwrap();
pads.sink_pads
@ -341,12 +343,8 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
if ret {
let (live, min, max) = peer_query.get_result();
if live {
min_latency = std::cmp::max(min, min_latency);
if max_latency.is_none() && max.is_some() {
max_latency = max;
} else if max_latency.is_some() && max.is_some() {
max_latency = std::cmp::min(max, max_latency);
}
min_latency = min.max(min_latency).unwrap_or(min_latency);
max_latency = max.min(max_latency).unwrap_or(max);
}
}
}

View file

@ -295,9 +295,10 @@ impl SinkHandler {
let new_packet_spacing = pts - inner.ips_pts;
let old_packet_spacing = state.packet_spacing;
assert!(old_packet_spacing.is_some());
if old_packet_spacing > new_packet_spacing {
state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4;
} else if old_packet_spacing > gst::ClockTime(Some(0)) {
} else if !old_packet_spacing.is_zero() {
state.packet_spacing = (3 * new_packet_spacing + old_packet_spacing) / 4;
} else {
state.packet_spacing = new_packet_spacing;
@ -412,13 +413,13 @@ impl SinkHandler {
pt
);
if dts == gst::CLOCK_TIME_NONE {
if dts.is_none() {
dts = pts;
} else if pts == gst::CLOCK_TIME_NONE {
} else if pts.is_none() {
pts = dts;
}
if dts == gst::CLOCK_TIME_NONE {
if dts.is_none() {
dts = element.get_current_running_time();
pts = dts;
@ -957,13 +958,11 @@ impl SrcHandler {
let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2;
let delay = {
if next_wakeup > now {
(next_wakeup - now).nseconds().unwrap()
} else {
0
}
};
let delay = next_wakeup
.saturating_sub(now)
.unwrap_or_else(gst::ClockTime::zero)
.nseconds()
.unwrap();
gst_debug!(
CAT,
@ -1121,7 +1120,7 @@ impl Default for State {
segment: gst::FormattedSegment::<gst::ClockTime>::new(),
clock_rate: None,
packet_spacing: gst::ClockTime(Some(0)),
packet_spacing: gst::ClockTime::zero(),
equidistant: 0,
discont: true,

View file

@ -26,7 +26,9 @@ use std::time::Duration;
///
/// This must be called from within the target runtime environment.
pub async fn delay_for(delay: Duration) {
tokio::time::delay_for(delay).map(drop).await;
if delay > Duration::from_nanos(0) {
tokio::time::delay_for(delay).map(drop).await;
}
}
/// Builds a `Stream` that yields at `interval.

View file

@ -694,9 +694,11 @@ impl UdpSinkPadHandler {
async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) {
let now = element.get_current_running_time();
if now < running_time {
let delay = running_time - now;
runtime::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await;
if let Some(delay) = running_time
.saturating_sub(now)
.and_then(|delay| delay.nseconds())
{
runtime::time::delay_for(Duration::from_nanos(delay)).await;
}
}

View file

@ -758,7 +758,7 @@ impl FallbackSrc {
("max-size-bytes", &0u32),
(
"max-size-time",
&(std::cmp::max(5 * gst::SECOND, min_latency.into())),
&gst::ClockTime::max(5 * gst::SECOND, min_latency.into()).unwrap(),
),
])
.unwrap();
@ -1635,7 +1635,8 @@ impl FallbackSrc {
} else if video_is_eos {
audio_running_time
} else {
std::cmp::min(audio_running_time, video_running_time)
assert!(audio_running_time.is_some() && video_running_time.is_some());
audio_running_time.min(video_running_time).unwrap()
};
let offset = if current_running_time > min_running_time {
(current_running_time - min_running_time).unwrap() as i64

View file

@ -401,18 +401,21 @@ impl ToggleRecord {
};
// This will only do anything for non-raw data
dts_or_pts = cmp::max(state.in_segment.get_start(), dts_or_pts);
dts_or_pts_end = cmp::max(state.in_segment.get_start(), dts_or_pts_end);
dts_or_pts = state.in_segment.get_start().max(dts_or_pts).unwrap();
dts_or_pts_end = state.in_segment.get_start().max(dts_or_pts_end).unwrap();
if state.in_segment.get_stop().is_some() {
dts_or_pts = cmp::min(state.in_segment.get_stop(), dts_or_pts);
dts_or_pts_end = cmp::min(state.in_segment.get_stop(), dts_or_pts_end);
dts_or_pts = state.in_segment.get_stop().min(dts_or_pts).unwrap();
dts_or_pts_end = state.in_segment.get_stop().min(dts_or_pts_end).unwrap();
}
let current_running_time = state.in_segment.to_running_time(dts_or_pts);
let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
state.current_running_time = cmp::max(current_running_time, state.current_running_time);
state.current_running_time_end =
cmp::max(current_running_time_end, state.current_running_time_end);
state.current_running_time = current_running_time
.max(state.current_running_time)
.unwrap_or(current_running_time);
state.current_running_time_end = current_running_time_end
.max(state.current_running_time_end)
.unwrap_or(current_running_time_end);
// Wake up everybody, we advanced a bit
// Important: They will only be able to advance once we're done with this
@ -663,18 +666,21 @@ impl ToggleRecord {
};
// This will only do anything for non-raw data
pts = cmp::max(state.in_segment.get_start(), pts);
pts_end = cmp::max(state.in_segment.get_start(), pts_end);
pts = state.in_segment.get_start().max(pts).unwrap();
pts_end = state.in_segment.get_start().max(pts_end).unwrap();
if state.in_segment.get_stop().is_some() {
pts = cmp::min(state.in_segment.get_stop(), pts);
pts_end = cmp::min(state.in_segment.get_stop(), pts_end);
pts = state.in_segment.get_stop().min(pts).unwrap();
pts_end = state.in_segment.get_stop().min(pts_end).unwrap();
}
let current_running_time = state.in_segment.to_running_time(pts);
let current_running_time_end = state.in_segment.to_running_time(pts_end);
state.current_running_time = cmp::max(current_running_time, state.current_running_time);
state.current_running_time_end =
cmp::max(current_running_time_end, state.current_running_time_end);
state.current_running_time = current_running_time
.max(state.current_running_time)
.unwrap_or(current_running_time);
state.current_running_time_end = current_running_time_end
.max(state.current_running_time_end)
.unwrap_or(current_running_time_end);
gst_log!(
CAT,
@ -704,14 +710,16 @@ impl ToggleRecord {
// start/stop as in that case we should be in Recording/Stopped mode already. The main
// stream is waiting for us to reach that position to switch to Recording/Stopped mode so
// that in those modes we only have to pass through/drop the whole buffers.
while (main_state.current_running_time == gst::CLOCK_TIME_NONE
while (main_state.current_running_time.is_none()
|| rec_state.recording_state != RecordingState::Starting
&& rec_state.recording_state != RecordingState::Stopping
&& main_state.current_running_time_end < current_running_time_end
|| rec_state.recording_state == RecordingState::Starting
&& rec_state.last_recording_start <= current_running_time
&& (rec_state.last_recording_start.is_none()
|| rec_state.last_recording_start <= current_running_time)
|| rec_state.recording_state == RecordingState::Stopping
&& rec_state.last_recording_stop <= current_running_time)
&& (rec_state.last_recording_stop.is_none()
|| rec_state.last_recording_stop <= current_running_time))
&& !main_state.eos
&& !stream.state.lock().flushing
{
@ -957,10 +965,6 @@ impl ToggleRecord {
Ok(HandleResult::Drop)
}
RecordingState::Starting => {
// The start of our buffer must be before the last recording start as
// otherwise we would be in Recording state already
assert_lt!(current_running_time, rec_state.last_recording_start);
// If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_start.is_none() {
gst_log!(
@ -968,8 +972,13 @@ impl ToggleRecord {
obj: pad,
"Dropping buffer (starting: waiting for keyframe)",
);
Ok(HandleResult::Drop)
} else if current_running_time >= rec_state.last_recording_start {
return Ok(HandleResult::Drop);
}
// The start of our buffer must be before the last recording start as
// otherwise we would be in Recording state already
assert_lt!(current_running_time, rec_state.last_recording_start);
if current_running_time >= rec_state.last_recording_start {
gst_log!(
CAT,
obj: pad,
@ -1343,6 +1352,9 @@ impl ToggleRecord {
stream.srcpad.peer_query(query)
}
// FIXME `matches!` was introduced in rustc 1.42.0, current MSRV is 1.41.0
// FIXME uncomment when CI can upgrade to 1.47.1
//#[allow(clippy::match_like_matches_macro)]
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
use gst::EventView;

View file

@ -240,7 +240,7 @@ impl State {
nsecs - self.start_position
};
if nsecs >= self.last_position {
if self.last_position.is_none() || nsecs >= self.last_position {
self.last_position = nsecs;
} else {
gst_fixme!(
@ -966,7 +966,7 @@ impl MccParse {
let (rate, flags, start_type, start, stop_type, stop) = event.get();
let mut start = match start.try_into() {
let mut start: gst::ClockTime = match start.try_into() {
Ok(start) => start,
Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format");
@ -974,7 +974,7 @@ impl MccParse {
}
};
let mut stop = match stop.try_into() {
let mut stop: gst::ClockTime = match stop.try_into() {
Ok(stop) => stop,
Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format");
@ -994,12 +994,12 @@ impl MccParse {
let pull = state.pull.as_ref().unwrap();
if start_type == gst::SeekType::Set && pull.duration.is_some() {
start = cmp::min(start, pull.duration);
if start_type == gst::SeekType::Set {
start = start.min(pull.duration).unwrap_or(start);
}
if stop_type == gst::SeekType::Set && pull.duration.is_some() {
stop = cmp::min(stop, pull.duration);
if stop_type == gst::SeekType::Set {
stop = stop.min(pull.duration).unwrap_or(stop);
}
state.seeking = true;

View file

@ -141,7 +141,7 @@ impl State {
) {
let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam());
if nsecs >= self.last_position {
if self.last_position.is_none() || nsecs >= self.last_position {
self.last_position = nsecs;
} else {
gst_fixme!(

View file

@ -1193,18 +1193,12 @@ impl StreamingState {
}
fn update_position(&mut self, buffer: &gst::Buffer) {
if buffer.get_pts() != gst::CLOCK_TIME_NONE {
if buffer.get_pts().is_some() {
let pts = buffer.get_pts();
self.last_position = self
.last_position
.map(|last| cmp::max(last.into(), pts))
.unwrap_or(pts);
} else if buffer.get_dts() != gst::CLOCK_TIME_NONE {
self.last_position = self.last_position.max(pts).unwrap_or(pts);
} else if buffer.get_dts().is_some() {
let dts = buffer.get_dts();
self.last_position = self
.last_position
.map(|last| cmp::max(last.into(), dts))
.unwrap_or(dts);
self.last_position = self.last_position.max(dts).unwrap_or(dts);
}
}
}