From 7c3e69bb4aba6e3a5da95e3c610b85d5b629fb72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 19 Oct 2020 17:03:10 +0200 Subject: [PATCH] Fix ClockTime comparisons not being Ord and use saturating_sub See: https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/607 --- audio/csound/tests/csound_filter.rs | 8 +-- generic/threadshare/src/inputselector.rs | 18 +++--- .../src/jitterbuffer/jitterbuffer.rs | 23 ++++---- generic/threadshare/src/runtime/time.rs | 4 +- generic/threadshare/src/udpsink.rs | 8 ++- utils/fallbackswitch/src/fallbacksrc.rs | 5 +- utils/togglerecord/src/togglerecord.rs | 58 +++++++++++-------- video/closedcaption/src/mcc_parse.rs | 14 ++--- video/closedcaption/src/scc_parse.rs | 2 +- video/flavors/src/flvdemux.rs | 14 ++--- 10 files changed, 81 insertions(+), 73 deletions(-) diff --git a/audio/csound/tests/csound_filter.rs b/audio/csound/tests/csound_filter.rs index 735f8b2a..bf4d8d82 100644 --- a/audio/csound/tests/csound_filter.rs +++ b/audio/csound/tests/csound_filter.rs @@ -117,7 +117,7 @@ fn csound_filter_eos() { h.play(); // The input buffer pts and duration - let mut in_pts = gst::ClockTime(Some(0)); + let mut in_pts = gst::ClockTime::zero(); let in_duration = duration_from_samples(EOS_NUM_SAMPLES as _, sr as _); // The number of samples that were leftover during the previous iteration let mut samples_offset = 0; @@ -125,7 +125,7 @@ fn csound_filter_eos() { let mut num_samples: usize = 0; let mut num_buffers = 0; // The expected pts of output buffers - let mut expected_pts = gst::ClockTime(Some(0)); + let mut expected_pts = gst::ClockTime::zero(); for _ in 0..EOS_NUM_BUFFERS { let mut buffer = @@ -227,7 +227,7 @@ fn csound_filter_underflow() { h.play(); // Input buffers timestamp - let mut in_pts = gst::ClockTime(Some(0)); + let mut in_pts = gst::ClockTime::zero(); let in_samples_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as _, sr as _); for _ in 0..UNDERFLOW_NUM_BUFFERS { @@ -250,7 +250,7 @@ fn csound_filter_underflow() { let expected_duration = duration_from_samples(UNDERFLOW_NUM_SAMPLES as u64 * 2, sr as _); let expected_buffers = UNDERFLOW_NUM_BUFFERS / 2; - let mut expected_pts = gst::ClockTime(Some(0)); + let mut expected_pts = gst::ClockTime::zero(); for _ in 0..expected_buffers { let buffer = h.pull().unwrap(); diff --git a/generic/threadshare/src/inputselector.rs b/generic/threadshare/src/inputselector.rs index cfb9bd26..295d4537 100644 --- a/generic/threadshare/src/inputselector.rs +++ b/generic/threadshare/src/inputselector.rs @@ -113,9 +113,11 @@ impl InputSelectorPadSinkHandler { async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) { let now = element.get_current_running_time(); - if now.is_some() && now < running_time { - let delay = running_time - now; - runtime::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await; + if let Some(delay) = running_time + .saturating_sub(now) + .and_then(|delay| delay.nseconds()) + { + runtime::time::delay_for(Duration::from_nanos(delay)).await; } } @@ -324,7 +326,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { QueryView::Latency(ref mut q) => { let mut ret = true; let mut min_latency = 0.into(); - let mut max_latency = gst::CLOCK_TIME_NONE; + let mut max_latency = gst::ClockTime::none(); let pads = { let pads = inputselector.pads.lock().unwrap(); pads.sink_pads @@ -341,12 +343,8 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { if ret { let (live, min, max) = peer_query.get_result(); if live { - min_latency = std::cmp::max(min, min_latency); - if max_latency.is_none() && max.is_some() { - max_latency = max; - } else if max_latency.is_some() && max.is_some() { - max_latency = std::cmp::min(max, max_latency); - } + min_latency = min.max(min_latency).unwrap_or(min_latency); + max_latency = max.min(max_latency).unwrap_or(max); } } } diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs index 7191d90b..3579f9f3 100644 --- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs @@ -295,9 +295,10 @@ impl SinkHandler { let new_packet_spacing = pts - inner.ips_pts; let old_packet_spacing = state.packet_spacing; + assert!(old_packet_spacing.is_some()); if old_packet_spacing > new_packet_spacing { state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4; - } else if old_packet_spacing > gst::ClockTime(Some(0)) { + } else if !old_packet_spacing.is_zero() { state.packet_spacing = (3 * new_packet_spacing + old_packet_spacing) / 4; } else { state.packet_spacing = new_packet_spacing; @@ -412,13 +413,13 @@ impl SinkHandler { pt ); - if dts == gst::CLOCK_TIME_NONE { + if dts.is_none() { dts = pts; - } else if pts == gst::CLOCK_TIME_NONE { + } else if pts.is_none() { pts = dts; } - if dts == gst::CLOCK_TIME_NONE { + if dts.is_none() { dts = element.get_current_running_time(); pts = dts; @@ -957,13 +958,11 @@ impl SrcHandler { let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2; - let delay = { - if next_wakeup > now { - (next_wakeup - now).nseconds().unwrap() - } else { - 0 - } - }; + let delay = next_wakeup + .saturating_sub(now) + .unwrap_or_else(gst::ClockTime::zero) + .nseconds() + .unwrap(); gst_debug!( CAT, @@ -1121,7 +1120,7 @@ impl Default for State { segment: gst::FormattedSegment::::new(), clock_rate: None, - packet_spacing: gst::ClockTime(Some(0)), + packet_spacing: gst::ClockTime::zero(), equidistant: 0, discont: true, diff --git a/generic/threadshare/src/runtime/time.rs b/generic/threadshare/src/runtime/time.rs index 0ea857b0..5dac6c49 100644 --- a/generic/threadshare/src/runtime/time.rs +++ b/generic/threadshare/src/runtime/time.rs @@ -26,7 +26,9 @@ use std::time::Duration; /// /// This must be called from within the target runtime environment. pub async fn delay_for(delay: Duration) { - tokio::time::delay_for(delay).map(drop).await; + if delay > Duration::from_nanos(0) { + tokio::time::delay_for(delay).map(drop).await; + } } /// Builds a `Stream` that yields at `interval. diff --git a/generic/threadshare/src/udpsink.rs b/generic/threadshare/src/udpsink.rs index 46fa1c23..35ea4c59 100644 --- a/generic/threadshare/src/udpsink.rs +++ b/generic/threadshare/src/udpsink.rs @@ -694,9 +694,11 @@ impl UdpSinkPadHandler { async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) { let now = element.get_current_running_time(); - if now < running_time { - let delay = running_time - now; - runtime::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await; + if let Some(delay) = running_time + .saturating_sub(now) + .and_then(|delay| delay.nseconds()) + { + runtime::time::delay_for(Duration::from_nanos(delay)).await; } } diff --git a/utils/fallbackswitch/src/fallbacksrc.rs b/utils/fallbackswitch/src/fallbacksrc.rs index a735430f..8c03622c 100644 --- a/utils/fallbackswitch/src/fallbacksrc.rs +++ b/utils/fallbackswitch/src/fallbacksrc.rs @@ -758,7 +758,7 @@ impl FallbackSrc { ("max-size-bytes", &0u32), ( "max-size-time", - &(std::cmp::max(5 * gst::SECOND, min_latency.into())), + &gst::ClockTime::max(5 * gst::SECOND, min_latency.into()).unwrap(), ), ]) .unwrap(); @@ -1635,7 +1635,8 @@ impl FallbackSrc { } else if video_is_eos { audio_running_time } else { - std::cmp::min(audio_running_time, video_running_time) + assert!(audio_running_time.is_some() && video_running_time.is_some()); + audio_running_time.min(video_running_time).unwrap() }; let offset = if current_running_time > min_running_time { (current_running_time - min_running_time).unwrap() as i64 diff --git a/utils/togglerecord/src/togglerecord.rs b/utils/togglerecord/src/togglerecord.rs index 1a4ecf5a..938f2f0c 100644 --- a/utils/togglerecord/src/togglerecord.rs +++ b/utils/togglerecord/src/togglerecord.rs @@ -401,18 +401,21 @@ impl ToggleRecord { }; // This will only do anything for non-raw data - dts_or_pts = cmp::max(state.in_segment.get_start(), dts_or_pts); - dts_or_pts_end = cmp::max(state.in_segment.get_start(), dts_or_pts_end); + dts_or_pts = state.in_segment.get_start().max(dts_or_pts).unwrap(); + dts_or_pts_end = state.in_segment.get_start().max(dts_or_pts_end).unwrap(); if state.in_segment.get_stop().is_some() { - dts_or_pts = cmp::min(state.in_segment.get_stop(), dts_or_pts); - dts_or_pts_end = cmp::min(state.in_segment.get_stop(), dts_or_pts_end); + dts_or_pts = state.in_segment.get_stop().min(dts_or_pts).unwrap(); + dts_or_pts_end = state.in_segment.get_stop().min(dts_or_pts_end).unwrap(); } let current_running_time = state.in_segment.to_running_time(dts_or_pts); let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); - state.current_running_time = cmp::max(current_running_time, state.current_running_time); - state.current_running_time_end = - cmp::max(current_running_time_end, state.current_running_time_end); + state.current_running_time = current_running_time + .max(state.current_running_time) + .unwrap_or(current_running_time); + state.current_running_time_end = current_running_time_end + .max(state.current_running_time_end) + .unwrap_or(current_running_time_end); // Wake up everybody, we advanced a bit // Important: They will only be able to advance once we're done with this @@ -663,18 +666,21 @@ impl ToggleRecord { }; // This will only do anything for non-raw data - pts = cmp::max(state.in_segment.get_start(), pts); - pts_end = cmp::max(state.in_segment.get_start(), pts_end); + pts = state.in_segment.get_start().max(pts).unwrap(); + pts_end = state.in_segment.get_start().max(pts_end).unwrap(); if state.in_segment.get_stop().is_some() { - pts = cmp::min(state.in_segment.get_stop(), pts); - pts_end = cmp::min(state.in_segment.get_stop(), pts_end); + pts = state.in_segment.get_stop().min(pts).unwrap(); + pts_end = state.in_segment.get_stop().min(pts_end).unwrap(); } let current_running_time = state.in_segment.to_running_time(pts); let current_running_time_end = state.in_segment.to_running_time(pts_end); - state.current_running_time = cmp::max(current_running_time, state.current_running_time); - state.current_running_time_end = - cmp::max(current_running_time_end, state.current_running_time_end); + state.current_running_time = current_running_time + .max(state.current_running_time) + .unwrap_or(current_running_time); + state.current_running_time_end = current_running_time_end + .max(state.current_running_time_end) + .unwrap_or(current_running_time_end); gst_log!( CAT, @@ -704,14 +710,16 @@ impl ToggleRecord { // start/stop as in that case we should be in Recording/Stopped mode already. The main // stream is waiting for us to reach that position to switch to Recording/Stopped mode so // that in those modes we only have to pass through/drop the whole buffers. - while (main_state.current_running_time == gst::CLOCK_TIME_NONE + while (main_state.current_running_time.is_none() || rec_state.recording_state != RecordingState::Starting && rec_state.recording_state != RecordingState::Stopping && main_state.current_running_time_end < current_running_time_end || rec_state.recording_state == RecordingState::Starting - && rec_state.last_recording_start <= current_running_time + && (rec_state.last_recording_start.is_none() + || rec_state.last_recording_start <= current_running_time) || rec_state.recording_state == RecordingState::Stopping - && rec_state.last_recording_stop <= current_running_time) + && (rec_state.last_recording_stop.is_none() + || rec_state.last_recording_stop <= current_running_time)) && !main_state.eos && !stream.state.lock().flushing { @@ -957,10 +965,6 @@ impl ToggleRecord { Ok(HandleResult::Drop) } RecordingState::Starting => { - // The start of our buffer must be before the last recording start as - // otherwise we would be in Recording state already - assert_lt!(current_running_time, rec_state.last_recording_start); - // If we have no start position yet, the main stream is waiting for a key-frame if rec_state.last_recording_start.is_none() { gst_log!( @@ -968,8 +972,13 @@ impl ToggleRecord { obj: pad, "Dropping buffer (starting: waiting for keyframe)", ); - Ok(HandleResult::Drop) - } else if current_running_time >= rec_state.last_recording_start { + return Ok(HandleResult::Drop); + } + + // The start of our buffer must be before the last recording start as + // otherwise we would be in Recording state already + assert_lt!(current_running_time, rec_state.last_recording_start); + if current_running_time >= rec_state.last_recording_start { gst_log!( CAT, obj: pad, @@ -1343,6 +1352,9 @@ impl ToggleRecord { stream.srcpad.peer_query(query) } + // FIXME `matches!` was introduced in rustc 1.42.0, current MSRV is 1.41.0 + // FIXME uncomment when CI can upgrade to 1.47.1 + //#[allow(clippy::match_like_matches_macro)] fn src_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool { use gst::EventView; diff --git a/video/closedcaption/src/mcc_parse.rs b/video/closedcaption/src/mcc_parse.rs index 83e304c3..90304b2f 100644 --- a/video/closedcaption/src/mcc_parse.rs +++ b/video/closedcaption/src/mcc_parse.rs @@ -240,7 +240,7 @@ impl State { nsecs - self.start_position }; - if nsecs >= self.last_position { + if self.last_position.is_none() || nsecs >= self.last_position { self.last_position = nsecs; } else { gst_fixme!( @@ -966,7 +966,7 @@ impl MccParse { let (rate, flags, start_type, start, stop_type, stop) = event.get(); - let mut start = match start.try_into() { + let mut start: gst::ClockTime = match start.try_into() { Ok(start) => start, Err(_) => { gst_error!(CAT, obj: element, "seek has invalid format"); @@ -974,7 +974,7 @@ impl MccParse { } }; - let mut stop = match stop.try_into() { + let mut stop: gst::ClockTime = match stop.try_into() { Ok(stop) => stop, Err(_) => { gst_error!(CAT, obj: element, "seek has invalid format"); @@ -994,12 +994,12 @@ impl MccParse { let pull = state.pull.as_ref().unwrap(); - if start_type == gst::SeekType::Set && pull.duration.is_some() { - start = cmp::min(start, pull.duration); + if start_type == gst::SeekType::Set { + start = start.min(pull.duration).unwrap_or(start); } - if stop_type == gst::SeekType::Set && pull.duration.is_some() { - stop = cmp::min(stop, pull.duration); + if stop_type == gst::SeekType::Set { + stop = stop.min(pull.duration).unwrap_or(stop); } state.seeking = true; diff --git a/video/closedcaption/src/scc_parse.rs b/video/closedcaption/src/scc_parse.rs index 92d62a48..1b70bc01 100644 --- a/video/closedcaption/src/scc_parse.rs +++ b/video/closedcaption/src/scc_parse.rs @@ -141,7 +141,7 @@ impl State { ) { let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam()); - if nsecs >= self.last_position { + if self.last_position.is_none() || nsecs >= self.last_position { self.last_position = nsecs; } else { gst_fixme!( diff --git a/video/flavors/src/flvdemux.rs b/video/flavors/src/flvdemux.rs index 5bb69c42..7908cf11 100644 --- a/video/flavors/src/flvdemux.rs +++ b/video/flavors/src/flvdemux.rs @@ -1193,18 +1193,12 @@ impl StreamingState { } fn update_position(&mut self, buffer: &gst::Buffer) { - if buffer.get_pts() != gst::CLOCK_TIME_NONE { + if buffer.get_pts().is_some() { let pts = buffer.get_pts(); - self.last_position = self - .last_position - .map(|last| cmp::max(last.into(), pts)) - .unwrap_or(pts); - } else if buffer.get_dts() != gst::CLOCK_TIME_NONE { + self.last_position = self.last_position.max(pts).unwrap_or(pts); + } else if buffer.get_dts().is_some() { let dts = buffer.get_dts(); - self.last_position = self - .last_position - .map(|last| cmp::max(last.into(), dts)) - .unwrap_or(dts); + self.last_position = self.last_position.max(dts).unwrap_or(dts); } } }