diff --git a/net/reqwest/src/reqwesthttpsrc/imp.rs b/net/reqwest/src/reqwesthttpsrc/imp.rs index 0b46ccd7..ee1cadf3 100644 --- a/net/reqwest/src/reqwesthttpsrc/imp.rs +++ b/net/reqwest/src/reqwesthttpsrc/imp.rs @@ -1116,18 +1116,18 @@ impl BaseSrcImpl for ReqwestHttpSrc { } }; - let start = segment.start().expect("No start position given"); - let stop = segment.stop(); + let start = *segment.start().expect("No start position given"); + let stop = segment.stop().map(|stop| *stop); gst_debug!(CAT, obj: src, "Seeking to {}-{:?}", start, stop); - if position == start && old_stop == stop.0 { + if position == start && old_stop == stop { gst_debug!(CAT, obj: src, "No change to current request"); return true; } *state = State::Stopped; - match self.do_request(src, uri, start, stop.0) { + match self.do_request(src, uri, start, stop) { Ok(s) => { *state = s; true diff --git a/net/reqwest/tests/reqwesthttpsrc.rs b/net/reqwest/tests/reqwesthttpsrc.rs index e30323d2..9f65544e 100644 --- a/net/reqwest/tests/reqwesthttpsrc.rs +++ b/net/reqwest/tests/reqwesthttpsrc.rs @@ -372,7 +372,7 @@ fn test_basic_request() { if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), - Some(gst::format::Bytes::from(expected_output.len() as u64)) + Some(gst::format::Bytes(expected_output.len() as u64)) ); } @@ -430,7 +430,7 @@ fn test_basic_request_inverted_defaults() { if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), - Some(gst::format::Bytes::from(expected_output.len() as u64)) + Some(gst::format::Bytes(expected_output.len() as u64)) ); } @@ -508,7 +508,7 @@ fn test_extra_headers() { if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), - Some(gst::format::Bytes::from(expected_output.len() as u64)) + Some(gst::format::Bytes(expected_output.len() as u64)) ); } @@ -568,7 +568,7 @@ fn test_cookies_property() { if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), - Some(gst::format::Bytes::from(expected_output.len() as u64)) + Some(gst::format::Bytes(expected_output.len() as u64)) ); } @@ -628,7 +628,7 @@ fn test_iradio_mode() { if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), - Some(gst::format::Bytes::from(expected_output.len() as u64)) + Some(gst::format::Bytes(expected_output.len() as u64)) ); } @@ -706,7 +706,7 @@ fn test_audio_l16() { if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), - Some(gst::format::Bytes::from(expected_output.len() as u64)) + Some(gst::format::Bytes(expected_output.len() as u64)) ); } @@ -780,7 +780,7 @@ fn test_authorization() { if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), - Some(gst::format::Bytes::from(expected_output.len() as u64)) + Some(gst::format::Bytes(expected_output.len() as u64)) ); } @@ -930,13 +930,13 @@ fn test_seek_after_ready() { assert_eq!(current_state, gst::State::Ready); h.run(|src| { - src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes::from(123)) + src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes(123)) .unwrap(); src.set_state(gst::State::Playing).unwrap(); }); let segment = h.wait_for_segment(false); - assert_eq!(segment.start(), gst::format::Bytes::from(123)); + assert_eq!(segment.start(), Some(gst::format::Bytes(123))); let mut expected_output = vec![0; 8192 - 123]; for (i, d) in expected_output.iter_mut().enumerate() { @@ -1009,12 +1009,12 @@ fn test_seek_after_buffer_received() { //seek to a position after a buffer is Received h.run(|src| { - src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes::from(123)) + src.seek_simple(gst::SeekFlags::FLUSH, gst::format::Bytes(123)) .unwrap(); }); let segment = h.wait_for_segment(true); - assert_eq!(segment.start(), gst::format::Bytes::from(123)); + assert_eq!(segment.start(), Some(gst::format::Bytes(123))); let mut expected_output = vec![0; 8192 - 123]; for (i, d) in expected_output.iter_mut().enumerate() { @@ -1091,16 +1091,16 @@ fn test_seek_with_stop_position() { 1.0, gst::SeekFlags::FLUSH, gst::SeekType::Set, - gst::format::Bytes::from(123), + gst::format::Bytes(123), gst::SeekType::Set, - gst::format::Bytes::from(131), + gst::format::Bytes(131), ) .unwrap(); }); let segment = h.wait_for_segment(true); - assert_eq!(segment.start(), gst::format::Bytes::from(123)); - assert_eq!(segment.stop(), gst::format::Bytes::from(131)); + assert_eq!(segment.start(), Some(gst::format::Bytes(123))); + assert_eq!(segment.stop(), Some(gst::format::Bytes(131))); let mut expected_output = vec![0; 8]; for (i, d) in expected_output.iter_mut().enumerate() { diff --git a/net/rusoto/src/aws_transcriber/imp.rs b/net/rusoto/src/aws_transcriber/imp.rs index ba548246..90a86363 100644 --- a/net/rusoto/src/aws_transcriber/imp.rs +++ b/net/rusoto/src/aws_transcriber/imp.rs @@ -39,7 +39,6 @@ use tokio::runtime; use std::collections::VecDeque; use std::pin::Pin; use std::sync::Mutex; -use std::time::Duration; use atomic_refcell::AtomicRefCell; @@ -110,13 +109,13 @@ static RUNTIME: Lazy = Lazy::new(|| { .unwrap() }); -const DEFAULT_LATENCY_MS: u32 = 8000; +const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_seconds(8); const DEFAULT_USE_PARTIAL_RESULTS: bool = true; -const GRANULARITY_MS: u32 = 100; +const GRANULARITY: gst::ClockTime = gst::ClockTime::from_mseconds(100); #[derive(Debug, Clone)] struct Settings { - latency_ms: u32, + latency: gst::ClockTime, language_code: Option, use_partial_results: bool, vocabulary: Option, @@ -125,7 +124,7 @@ struct Settings { impl Default for Settings { fn default() -> Self { Self { - latency_ms: DEFAULT_LATENCY_MS, + latency: DEFAULT_LATENCY, language_code: Some("en-US".to_string()), use_partial_results: DEFAULT_USE_PARTIAL_RESULTS, vocabulary: None, @@ -144,7 +143,7 @@ struct State { buffers: VecDeque, send_eos: bool, discont: bool, - last_partial_end_time: gst::ClockTime, + last_partial_end_time: Option, partial_alternative: Option, } @@ -161,7 +160,7 @@ impl Default for State { buffers: VecDeque::new(), send_eos: false, discont: true, - last_partial_end_time: gst::CLOCK_TIME_NONE, + last_partial_end_time: None, partial_alternative: None, } } @@ -207,10 +206,8 @@ impl Transcriber { let (latency, now, mut last_position, send_eos, seqnum) = { let mut state = self.state.lock().unwrap(); // Multiply GRANULARITY by 2 in order to not send buffers that - // are less than GRANULARITY milliseconds away too late - let latency: gst::ClockTime = (self.settings.lock().unwrap().latency_ms as u64 - - (2 * GRANULARITY_MS) as u64) - * gst::MSECOND; + // are less than GRANULARITY away too late + let latency = self.settings.lock().unwrap().latency - 2 * GRANULARITY; let now = element.current_running_time(); if let Some(alternative) = state.partial_alternative.take() { @@ -220,7 +217,10 @@ impl Transcriber { let send_eos = state.send_eos && state.buffers.is_empty(); while let Some(buf) = state.buffers.front() { - if now - buf.pts() > latency { + if now + .zip(buf.pts()) + .map_or(false, |(now, pts)| now - pts > latency) + { /* Safe unwrap, we know we have an item */ let buf = state.buffers.pop_front().unwrap(); items.push(buf); @@ -248,16 +248,31 @@ impl Transcriber { } for mut buf in items.drain(..) { - if buf.pts() > last_position { - let gap_event = gst::event::Gap::builder(last_position, buf.pts() - last_position) + let delta = buf + .pts() + .zip(last_position) + .map(|(pts, last_pos)| pts.checked_sub(last_pos)); + if let Some(delta) = delta { + let last_pos = last_position.expect("defined since delta could be computed"); + + let gap_event = gst::event::Gap::builder(last_pos) + .duration(delta) .seqnum(seqnum) .build(); - gst_debug!(CAT, "Pushing gap: {} -> {}", last_position, buf.pts()); + gst_debug!( + CAT, + "Pushing gap: {} -> {}", + last_pos, + buf.pts().display() + ); if !self.srcpad.push_event(gap_event) { return false; } } - last_position = buf.pts() + buf.duration(); + last_position = buf + .pts() + .zip(buf.duration()) + .map(|(pts, duration)| pts + duration); { let buf = buf.get_mut().unwrap(); buf.set_pts(buf.pts()); @@ -265,8 +280,11 @@ impl Transcriber { gst_debug!( CAT, "Pushing buffer: {} -> {}", - buf.pts(), - buf.pts() + buf.duration() + buf.pts().display(), + buf.pts() + .zip(buf.duration()) + .map(|(pts, duration)| pts + duration) + .display(), ); if self.srcpad.push(buf).is_err() { return false; @@ -275,19 +293,19 @@ impl Transcriber { /* next, push a gap if we're lagging behind the target position */ - if now - last_position > latency { - let duration = now - last_position - latency; - - let gap_event = gst::event::Gap::builder(last_position, duration) + let duration = now + .zip(last_position) + .and_then(|(now, last_position)| now.checked_sub(last_position)) + .and_then(|delta| delta.checked_sub(latency)); + if let Some(duration) = duration { + let last_pos = last_position.expect("defined since duration could be computed"); + let gap_event = gst::event::Gap::builder(last_pos) + .duration(duration) .seqnum(seqnum) .build(); - gst_debug!( - CAT, - "Pushing gap: {} -> {}", - last_position, - last_position + duration - ); - last_position += duration; + let next_position = last_pos + duration; + gst_debug!(CAT, "Pushing gap: {} -> {}", last_pos, next_position,); + last_position = Some(next_position); if !self.srcpad.push_event(gap_event) { return false; } @@ -309,22 +327,27 @@ impl Transcriber { alternative: &TranscriptAlternative, partial: bool, latency: gst::ClockTime, - now: gst::ClockTime, + now: impl Into> + Copy, ) { for item in &alternative.items { - let mut start_time: gst::ClockTime = - ((item.start_time as f64 * 1_000_000_000.0) as u64).into(); - let mut end_time: gst::ClockTime = - ((item.end_time as f64 * 1_000_000_000.0) as u64).into(); + let mut start_time = + gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64); + let mut end_time = + gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64); - if start_time <= state.last_partial_end_time { + if state + .last_partial_end_time + .map_or(false, |last_partial_end_time| { + start_time <= last_partial_end_time + }) + { /* Already sent (hopefully) */ continue; - } else if !partial || start_time + latency < now { + } else if !partial || now.into().map_or(false, |now| start_time + latency < now) { /* Should be sent now */ gst_debug!(CAT, obj: element, "Item is ready: {}", item.content); let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes()); - state.last_partial_end_time = end_time; + state.last_partial_end_time = Some(end_time); { let buf = buf.get_mut().unwrap(); @@ -334,15 +357,23 @@ impl Transcriber { state.discont = false; } - if start_time < state.out_segment.position() { + if state + .out_segment + .position() + .map_or(false, |pos| start_time < pos) + { + let pos = state + .out_segment + .position() + .expect("position checked above"); gst_debug!( CAT, obj: element, - "Adjusting item timing({:?} < {:?})", + "Adjusting item timing({} < {})", start_time, - state.out_segment.position() + pos, ); - start_time = state.out_segment.position(); + start_time = pos; if end_time < start_time { end_time = start_time; } @@ -436,24 +467,29 @@ impl Transcriber { alternative.transcript ); - let mut start_time: gst::ClockTime = - ((result.start_time as f64 * 1_000_000_000.0) as u64).into(); - let end_time: gst::ClockTime = - ((result.end_time as f64 * 1_000_000_000.0) as u64).into(); + let mut start_time = gst::ClockTime::from_nseconds( + (result.start_time as f64 * 1_000_000_000.0) as u64, + ); + let end_time = gst::ClockTime::from_nseconds( + (result.end_time as f64 * 1_000_000_000.0) as u64, + ); let mut state = self.state.lock().unwrap(); let position = state.out_segment.position(); - if end_time < position { + if position.map_or(false, |position| end_time < position) { + let pos = position.expect("position checked above"); gst_warning!(CAT, obj: element, - "Received transcript is too late by {:?}, dropping, consider increasing the latency", - position - start_time); + "Received transcript is too late by {}, dropping, consider increasing the latency", + pos - start_time); } else { - if start_time < position { + if let Some(delta) = + position.and_then(|pos| pos.checked_sub(start_time)) + { gst_warning!(CAT, obj: element, - "Received transcript is too late by {:?}, clipping, consider increasing the latency", - position - start_time); - start_time = position; + "Received transcript is too late by {}, clipping, consider increasing the latency", + delta); + start_time = position.expect("position checked above"); } let mut buf = gst::Buffer::from_mut_slice( @@ -489,8 +525,8 @@ impl Transcriber { &mut state, &alternative, false, - 0.into(), - 0.into(), + gst::ClockTime::ZERO, + gst::ClockTime::ZERO, ); state.partial_alternative = None; } @@ -508,7 +544,7 @@ impl Transcriber { /* Wrap in a timeout so we can push gaps regularly */ let future = async move { - match tokio::time::timeout(Duration::from_millis(GRANULARITY_MS.into()), future).await { + match tokio::time::timeout(GRANULARITY.into(), future).await { Err(_) => { if !self.dequeue(element) { gst_info!(CAT, obj: element, "Failed to push gap event, pausing"); @@ -611,9 +647,8 @@ impl Transcriber { if ret { let (_, min, _) = peer_query.result(); - let our_latency: gst::ClockTime = - self.settings.lock().unwrap().latency_ms as u64 * gst::MSECOND; - q.set(true, our_latency + min, gst::CLOCK_TIME_NONE); + let our_latency = self.settings.lock().unwrap().latency; + q.set(true, our_latency + min, gst::ClockTime::NONE); } ret } @@ -694,9 +729,7 @@ impl Transcriber { let mut state = self.state.lock().unwrap(); state.out_segment.set_time(segment.time()); - state - .out_segment - .set_position(gst::ClockTime::from_nseconds(0)); + state.out_segment.set_position(gst::ClockTime::ZERO); state.in_segment = segment; state.seqnum = e.seqnum(); @@ -738,14 +771,14 @@ impl Transcriber { let running_time = state.in_segment.to_running_time(buffer.pts()); let now = element.current_running_time(); - if now.is_some() && now < running_time { - delay = Some(running_time - now); - } + delay = running_time + .zip(now) + .and_then(|(running_time, now)| running_time.checked_sub(now)); } } if let Some(delay) = delay { - tokio::time::sleep(Duration::from_nanos(delay.nseconds().unwrap())).await; + tokio::time::sleep(delay.into()).await; } if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() { @@ -1032,9 +1065,9 @@ impl ObjectImpl for Transcriber { "latency", "Latency", "Amount of milliseconds to allow AWS transcribe", - 2 * GRANULARITY_MS, + 2 * GRANULARITY.mseconds() as u32, std::u32::MAX, - DEFAULT_LATENCY_MS, + DEFAULT_LATENCY.mseconds() as u32, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), glib::ParamSpec::new_string( @@ -1074,7 +1107,9 @@ impl ObjectImpl for Transcriber { } "latency" => { let mut settings = self.settings.lock().unwrap(); - settings.latency_ms = value.get().expect("type checked upstream"); + settings.latency = gst::ClockTime::from_mseconds( + value.get::().expect("type checked upstream").into(), + ); } "use-partial-results" => { let mut settings = self.settings.lock().unwrap(); @@ -1096,7 +1131,7 @@ impl ObjectImpl for Transcriber { } "latency" => { let settings = self.settings.lock().unwrap(); - settings.latency_ms.to_value() + (settings.latency.mseconds() as u32).to_value() } "use-partial-results" => { let settings = self.settings.lock().unwrap();