From 8f81cb881262aba9ba4aa2c1d7579e2467b3a596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Wed, 26 May 2021 11:54:34 +0200 Subject: [PATCH] generic: migrate to new ClockTime design --- generic/sodium/examples/decrypt_example.rs | 2 +- generic/sodium/examples/encrypt_example.rs | 2 +- generic/sodium/src/decrypter/imp.rs | 6 +- generic/sodium/src/encrypter/imp.rs | 6 +- generic/sodium/tests/decrypter.rs | 6 +- generic/threadshare/src/appsrc/imp.rs | 23 +- generic/threadshare/src/dataqueue.rs | 16 +- generic/threadshare/src/inputselector/imp.rs | 40 ++-- generic/threadshare/src/jitterbuffer/imp.rs | 210 +++++++++--------- .../src/jitterbuffer/jitterbuffer.rs | 61 +++-- generic/threadshare/src/proxy/imp.rs | 29 ++- generic/threadshare/src/queue/imp.rs | 27 ++- generic/threadshare/src/runtime/executor.rs | 26 +-- generic/threadshare/src/runtime/task.rs | 51 +++-- generic/threadshare/src/socket.rs | 14 +- generic/threadshare/src/tcpclientsrc/imp.rs | 16 +- generic/threadshare/src/udpsink/imp.rs | 51 +++-- generic/threadshare/src/udpsrc/imp.rs | 18 +- generic/threadshare/tests/pad.rs | 7 +- generic/threadshare/tests/proxy.rs | 6 +- generic/threadshare/tests/queue.rs | 2 +- generic/threadshare/tests/tcpclientsrc.rs | 2 +- 22 files changed, 344 insertions(+), 277 deletions(-) diff --git a/generic/sodium/examples/decrypt_example.rs b/generic/sodium/examples/decrypt_example.rs index 1a8a199a..d1f335ea 100644 --- a/generic/sodium/examples/decrypt_example.rs +++ b/generic/sodium/examples/decrypt_example.rs @@ -124,7 +124,7 @@ fn main() -> Result<(), Box> { .expect("Unable to set the pipeline to the `Playing` state"); let bus = pipeline.bus().unwrap(); - for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) { + for msg in bus.iter_timed(gst::ClockTime::NONE) { use gst::MessageView; match msg.view() { MessageView::Error(err) => { diff --git a/generic/sodium/examples/encrypt_example.rs b/generic/sodium/examples/encrypt_example.rs index 29aa586f..ea125ddc 100644 --- a/generic/sodium/examples/encrypt_example.rs +++ b/generic/sodium/examples/encrypt_example.rs @@ -121,7 +121,7 @@ fn main() -> Result<(), Box> { pipeline.set_state(gst::State::Playing)?; let bus = pipeline.bus().unwrap(); - for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) { + for msg in bus.iter_timed(gst::ClockTime::NONE) { use gst::MessageView; match msg.view() { MessageView::Error(err) => { diff --git a/generic/sodium/src/decrypter/imp.rs b/generic/sodium/src/decrypter/imp.rs index ce8a840a..428b252c 100644 --- a/generic/sodium/src/decrypter/imp.rs +++ b/generic/sodium/src/decrypter/imp.rs @@ -322,8 +322,8 @@ impl Decrypter { } let size = match peer_query.result().try_into().unwrap() { - gst::format::Bytes(Some(size)) => size, - gst::format::Bytes(None) => { + Some(gst::format::Bytes(size)) => size, + None => { gst_error!(CAT, "Failed to query upstream duration"); return false; @@ -348,7 +348,7 @@ impl Decrypter { let size = size - total_chunks * box_::MACBYTES as u64; gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size); - q.set(gst::format::Bytes::from(size)); + q.set(gst::format::Bytes(size)); true } diff --git a/generic/sodium/src/encrypter/imp.rs b/generic/sodium/src/encrypter/imp.rs index 3b5bde2a..ac1f1f40 100644 --- a/generic/sodium/src/encrypter/imp.rs +++ b/generic/sodium/src/encrypter/imp.rs @@ -299,8 +299,8 @@ impl Encrypter { } let size = match peer_query.result().try_into().unwrap() { - gst::format::Bytes(Some(size)) => size, - gst::format::Bytes(None) => { + Some(gst::format::Bytes(size)) => size, + None => { gst_error!(CAT, "Failed to query upstream duration"); return false; @@ -324,7 +324,7 @@ impl Encrypter { let size = size + crate::HEADERS_SIZE as u64; gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size); - q.set(gst::format::Bytes::from(size)); + q.set(gst::format::Bytes(size)); true } diff --git a/generic/sodium/tests/decrypter.rs b/generic/sodium/tests/decrypter.rs index 8feafaef..25f1afeb 100644 --- a/generic/sodium/tests/decrypter.rs +++ b/generic/sodium/tests/decrypter.rs @@ -119,7 +119,7 @@ fn test_pipeline() { .expect("Unable to set the pipeline to the `Playing` state"); let bus = pipeline.bus().unwrap(); - for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) { + for msg in bus.iter_timed(gst::ClockTime::NONE) { use gst::MessageView; match msg.view() { MessageView::Error(err) => { @@ -200,11 +200,11 @@ fn test_pull_range() { assert_eq!(seekable, true); assert_eq!( start, - gst::GenericFormattedValue::Bytes(gst::format::Bytes(Some(0))) + gst::GenericFormattedValue::Bytes(Some(gst::format::Bytes(0))) ); assert_eq!( stop, - gst::GenericFormattedValue::Bytes(gst::format::Bytes(Some(6043))) + gst::GenericFormattedValue::Bytes(Some(gst::format::Bytes(6043))) ); // do pulls diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index bb0f622e..2fdf6cff 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -31,13 +31,15 @@ use once_cell::sync::Lazy; use std::convert::TryInto; use std::sync::Arc; use std::sync::Mutex as StdMutex; +use std::time::Duration; use std::u32; use crate::runtime::prelude::*; use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState}; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +// FIXME use Duration::ZERO when MSVC >= 1.53.2 +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); const DEFAULT_CAPS: Option = None; const DEFAULT_MAX_BUFFERS: u32 = 10; const DEFAULT_DO_TIMESTAMP: bool = false; @@ -45,7 +47,7 @@ const DEFAULT_DO_TIMESTAMP: bool = false; #[derive(Debug, Clone)] struct Settings { context: String, - context_wait: u32, + context_wait: Duration, caps: Option, max_buffers: u32, do_timestamp: bool, @@ -223,7 +225,7 @@ impl PadSrcHandler for AppSrcPadHandler { gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { - q.set(true, 0.into(), gst::CLOCK_TIME_NONE); + q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } QueryView::Scheduling(ref mut q) => { @@ -389,8 +391,11 @@ impl AppSrc { let now = clock.time(); let buffer = buffer.make_mut(); - buffer.set_dts(now - base_time); - buffer.set_pts(gst::CLOCK_TIME_NONE); + buffer.set_dts( + now.zip(base_time) + .and_then(|(now, base_time)| now.checked_sub(base_time)), + ); + buffer.set_pts(None); } else { gst_error!(CAT, obj: element, "Don't have a clock yet"); return false; @@ -540,7 +545,7 @@ impl ObjectImpl for AppSrc { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_uint( @@ -620,7 +625,9 @@ impl ObjectImpl for AppSrc { .unwrap_or_else(|| "".into()); } "context-wait" => { - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); } "caps" => { settings.caps = value.get().expect("type checked upstream"); @@ -639,7 +646,7 @@ impl ObjectImpl for AppSrc { let settings = self.settings.lock().unwrap(); match pspec.name() { "context" => settings.context.to_value(), - "context-wait" => settings.context_wait.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), "caps" => settings.caps.to_value(), "max-buffers" => settings.max_buffers.to_value(), "do-timestamp" => settings.do_timestamp.to_value(), diff --git a/generic/threadshare/src/dataqueue.rs b/generic/threadshare/src/dataqueue.rs index 263a25cf..48f9d31d 100644 --- a/generic/threadshare/src/dataqueue.rs +++ b/generic/threadshare/src/dataqueue.rs @@ -25,7 +25,7 @@ use once_cell::sync::Lazy; use std::collections::VecDeque; use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::{u32, u64}; +use std::u32; static DATA_QUEUE_CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -54,12 +54,10 @@ impl DataQueueItem { } } - fn timestamp(&self) -> Option { + fn timestamp(&self) -> Option { match *self { - DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts().0, - DataQueueItem::BufferList(ref list) => { - list.iter().filter_map(|b| b.dts_or_pts().0).next() - } + DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(), + DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()), DataQueueItem::Event(_) => None, } } @@ -86,7 +84,7 @@ struct DataQueueInner { cur_size_bytes: u32, max_size_buffers: Option, max_size_bytes: Option, - max_size_time: Option, + max_size_time: Option, pending_handle: Option, } @@ -105,7 +103,7 @@ impl DataQueue { src_pad: &gst::Pad, max_size_buffers: Option, max_size_bytes: Option, - max_size_time: Option, + max_size_time: impl Into>, ) -> DataQueue { DataQueue(Arc::new(StdMutex::new(DataQueueInner { element: element.clone(), @@ -116,7 +114,7 @@ impl DataQueue { cur_size_bytes: 0, max_size_buffers, max_size_bytes, - max_size_time, + max_size_time: max_size_time.into(), pending_handle: None, }))) } diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs index d5dd6370..080816fd 100644 --- a/generic/threadshare/src/inputselector/imp.rs +++ b/generic/threadshare/src/inputselector/imp.rs @@ -35,12 +35,13 @@ use crate::runtime::prelude::*; use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +// FIXME use Duration::ZERO when MSVC >= 1.53.2 +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); #[derive(Debug, Clone)] struct Settings { context: String, - context_wait: u32, + context_wait: Duration, } impl Default for Settings { @@ -74,14 +75,20 @@ struct InputSelectorPadSinkHandler(Arc>) impl InputSelectorPadSinkHandler { /* Wait until specified time */ - async fn sync(&self, element: &super::InputSelector, running_time: gst::ClockTime) { + async fn sync( + &self, + element: &super::InputSelector, + running_time: impl Into>, + ) { let now = element.current_running_time(); - if let Some(delay) = running_time - .saturating_sub(now) - .and_then(|delay| delay.nseconds()) + match running_time + .into() + .zip(now) + .and_then(|(running_time, now)| running_time.checked_sub(now)) { - runtime::time::delay_for(Duration::from_nanos(delay)).await; + Some(delay) => runtime::time::delay_for(delay.into()).await, + None => runtime::executor::yield_now().await, } } @@ -289,8 +296,8 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { match query.view_mut() { QueryView::Latency(ref mut q) => { let mut ret = true; - let mut min_latency = 0.into(); - let mut max_latency = gst::ClockTime::none(); + let mut min_latency = gst::ClockTime::ZERO; + let mut max_latency = gst::ClockTime::NONE; let pads = { let pads = inputselector.pads.lock().unwrap(); pads.sink_pads @@ -307,8 +314,11 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { if ret { let (live, min, max) = peer_query.result(); if live { - min_latency = min.max(min_latency).unwrap_or(min_latency); - max_latency = max.min(max_latency).unwrap_or(max); + min_latency = min.max(min_latency); + max_latency = max + .zip(max_latency) + .map(|(max, max_latency)| max.min(max_latency)) + .or(max); } } } @@ -424,7 +434,7 @@ impl ObjectImpl for InputSelector { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_object( @@ -457,7 +467,9 @@ impl ObjectImpl for InputSelector { } "context-wait" => { let mut settings = self.settings.lock().unwrap(); - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); } "active-pad" => { let pad = value @@ -501,7 +513,7 @@ impl ObjectImpl for InputSelector { } "context-wait" => { let settings = self.settings.lock().unwrap(); - settings.context_wait.to_value() + (settings.context_wait.as_millis() as u32).to_value() } "active-pad" => { let state = self.state.lock().unwrap(); diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index e89d939f..ef3880d6 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -39,27 +39,27 @@ use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; -const DEFAULT_LATENCY_MS: u32 = 200; +const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); const DEFAULT_DO_LOST: bool = false; const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000; const DEFAULT_MAX_MISORDER_TIME: u32 = 2000; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +const DEFAULT_CONTEXT_WAIT: gst::ClockTime = gst::ClockTime::ZERO; #[derive(Debug, Clone)] struct Settings { - latency_ms: u32, + latency: gst::ClockTime, do_lost: bool, max_dropout_time: u32, max_misorder_time: u32, context: String, - context_wait: u32, + context_wait: gst::ClockTime, } impl Default for Settings { fn default() -> Self { Settings { - latency_ms: DEFAULT_LATENCY_MS, + latency: DEFAULT_LATENCY, do_lost: DEFAULT_DO_LOST, max_dropout_time: DEFAULT_MAX_DROPOUT_TIME, max_misorder_time: DEFAULT_MAX_MISORDER_TIME, @@ -108,7 +108,7 @@ impl PartialEq for GapPacket { struct SinkHandlerInner { packet_rate_ctx: RTPPacketRateCtx, ips_rtptime: Option, - ips_pts: gst::ClockTime, + ips_pts: Option, gap_packets: BTreeSet, @@ -123,7 +123,7 @@ impl Default for SinkHandlerInner { SinkHandlerInner { packet_rate_ctx: RTPPacketRateCtx::new(), ips_rtptime: None, - ips_pts: gst::CLOCK_TIME_NONE, + ips_pts: None, gap_packets: BTreeSet::new(), last_pt: None, last_in_seqnum: None, @@ -155,16 +155,16 @@ impl SinkHandler { state.discont = true; state.last_popped_seqnum = None; - state.last_popped_pts = gst::CLOCK_TIME_NONE; + state.last_popped_pts = None; inner.last_in_seqnum = None; inner.last_rtptime = None; - state.earliest_pts = gst::CLOCK_TIME_NONE; + state.earliest_pts = None; state.earliest_seqnum = None; inner.ips_rtptime = None; - inner.ips_pts = gst::CLOCK_TIME_NONE; + inner.ips_pts = None; mem::replace(&mut inner.gap_packets, BTreeSet::new()) } @@ -208,14 +208,17 @@ impl SinkHandler { inner: &mut SinkHandlerInner, state: &mut State, rtptime: u32, - pts: gst::ClockTime, + pts: impl Into>, ) { if inner.ips_rtptime != Some(rtptime) { - if inner.ips_pts.is_some() && pts.is_some() { - let new_packet_spacing = pts - inner.ips_pts; + let pts = pts.into(); + let new_packet_spacing = inner + .ips_pts + .zip(pts) + .and_then(|(ips_pts, pts)| pts.checked_sub(ips_pts)); + if let Some(new_packet_spacing) = new_packet_spacing { let old_packet_spacing = state.packet_spacing; - assert!(old_packet_spacing.is_some()); if old_packet_spacing > new_packet_spacing { state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4; } else if !old_packet_spacing.is_zero() { @@ -421,7 +424,7 @@ impl SinkHandler { return Ok(gst::FlowSuccess::Ok); } } - inner.ips_pts = gst::CLOCK_TIME_NONE; + inner.ips_pts = None; inner.ips_rtptime = None; } @@ -441,7 +444,7 @@ impl SinkHandler { inner.last_in_seqnum = Some(seq); let jb_item = if estimated_dts { - RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime) + RTPJitterBufferItem::new(buffer, gst::ClockTime::NONE, pts, Some(seq), rtptime) } else { RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime) }; @@ -463,15 +466,16 @@ impl SinkHandler { inner.last_rtptime = Some(rtptime); - if state.earliest_pts.is_none() - || (pts.is_some() - && (pts < state.earliest_pts - || (pts == state.earliest_pts - && state - .earliest_seqnum - .map(|earliest_seqnum| seq > earliest_seqnum) - .unwrap_or(false)))) - { + let must_update = match (state.earliest_pts, pts) { + (None, _) => true, + (Some(earliest_pts), Some(pts)) if pts < earliest_pts => true, + (Some(earliest_pts), Some(pts)) if pts == earliest_pts => state + .earliest_seqnum + .map_or(false, |earliest_seqnum| seq > earliest_seqnum), + _ => false, + }; + + if must_update { state.earliest_pts = pts; state.earliest_seqnum = Some(seq); } @@ -515,10 +519,7 @@ impl SinkHandler { let (latency, context_wait) = { let settings = jb.settings.lock().unwrap(); - ( - settings.latency_ms as u64 * gst::MSECOND, - settings.context_wait as u64 * gst::MSECOND, - ) + (settings.latency, settings.context_wait) }; // Reschedule if needed @@ -527,13 +528,15 @@ impl SinkHandler { .next_wakeup(&element, &state, latency, context_wait); if let Some((next_wakeup, _)) = next_wakeup { if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { - if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup { + if previous_next_wakeup.is_none() + || next_wakeup.map_or(false, |next| previous_next_wakeup.unwrap() > next) + { gst_debug!( CAT, obj: pad, "Rescheduling for new item {} < {}", - next_wakeup, - previous_next_wakeup + next_wakeup.display(), + previous_next_wakeup.display(), ); abort_handle.abort(); state.wait_handle = None; @@ -666,16 +669,13 @@ impl SrcHandler { state: &mut State, element: &super::JitterBuffer, seqnum: u16, - pts: gst::ClockTime, + pts: impl Into>, discont: &mut bool, ) -> Vec { - let (latency_ns, do_lost) = { + let (latency, do_lost) = { let jb = JitterBuffer::from_instance(element); let settings = jb.settings.lock().unwrap(); - ( - settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(), - settings.do_lost, - ) + (settings.latency, settings.do_lost) }; let mut events = vec![]; @@ -697,30 +697,24 @@ impl SrcHandler { let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64; if gap > 0 { - let interval = - pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64; let gap = gap as u64; - let spacing = if interval >= 0 { - interval as u64 / (gap + 1) - } else { - 0 - }; + // FIXME reason why we can expect Some for the 2 lines below + let mut last_popped_pts = state.last_popped_pts.unwrap(); + let interval = pts.into().unwrap().saturating_sub(last_popped_pts); + let spacing = interval / (gap as u64 + 1); *discont = true; - if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns { - let n_packets = gap - latency_ns / spacing; + if state.equidistant > 0 && gap > 1 && gap * spacing > latency { + let n_packets = gap - latency.nseconds() / spacing.nseconds(); if do_lost { let s = gst::Structure::new( "GstRTPPacketLost", &[ ("seqnum", &(lost_seqnum as u32)), - ( - "timestamp", - &(state.last_popped_pts + gst::ClockTime(Some(spacing))), - ), - ("duration", &(n_packets * spacing)), + ("timestamp", &(last_popped_pts + spacing)), + ("duration", &(n_packets * spacing).nseconds()), ("retry", &0), ], ); @@ -729,15 +723,20 @@ impl SrcHandler { } lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16); - state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing)); + last_popped_pts += n_packets * spacing; + state.last_popped_pts = Some(last_popped_pts); state.stats.num_lost += n_packets; } while lost_seqnum != seqnum { - let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing)); - let duration = if state.equidistant > 0 { spacing } else { 0 }; + let timestamp = last_popped_pts + spacing; + let duration = if state.equidistant > 0 { + spacing + } else { + gst::ClockTime::ZERO + }; - state.last_popped_pts = timestamp; + state.last_popped_pts = Some(timestamp); if do_lost { let s = gst::Structure::new( @@ -745,7 +744,8 @@ impl SrcHandler { &[ ("seqnum", &(lost_seqnum as u32)), ("timestamp", ×tamp), - ("duration", &duration), + // FIXME would probably make sense being a ClockTime + ("duration", &duration.nseconds()), ("retry", &0), ], ); @@ -819,8 +819,8 @@ impl SrcHandler { }; state.last_popped_pts = buffer.pts(); - if let Some(pts) = state.last_popped_pts.nseconds() { - state.position = pts.into(); + if state.last_popped_pts.is_some() { + state.position = state.last_popped_pts; } state.last_popped_seqnum = seq; @@ -845,22 +845,26 @@ impl SrcHandler { state: &State, latency: gst::ClockTime, context_wait: gst::ClockTime, - ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) { + ) -> ( + Option, + Option<(Option, Duration)>, + ) { let now = element.current_running_time(); gst_debug!( CAT, obj: element, "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}", - now, + now.display(), state.eos, - state.earliest_pts, + state.earliest_pts.display(), state.packet_spacing, latency ); if state.eos { gst_debug!(CAT, obj: element, "EOS, not waiting"); + // FIXME use Duration::ZERO when MSVC >= 1.53.2 return (now, Some((now, Duration::from_nanos(0)))); } @@ -868,23 +872,25 @@ impl SrcHandler { return (now, None); } - let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2; + let next_wakeup = state + .earliest_pts + .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2); let delay = next_wakeup - .saturating_sub(now) - .unwrap_or_else(gst::ClockTime::zero) - .nseconds() - .unwrap(); + .zip(now) + .map_or(gst::ClockTime::ZERO, |(next_wakeup, now)| { + next_wakeup.saturating_sub(now) + }); gst_debug!( CAT, obj: element, "Next wakeup at {} with delay {}", - next_wakeup, + next_wakeup.display(), delay ); - (now, Some((next_wakeup, Duration::from_nanos(delay)))) + (now, Some((next_wakeup, delay.into()))) } } @@ -954,8 +960,8 @@ impl PadSrcHandler for SrcHandler { if ret { let settings = jb.settings.lock().unwrap(); let (_, mut min_latency, _) = peer_query.result(); - min_latency += (settings.latency_ms as u64) * gst::SECOND; - let max_latency = gst::CLOCK_TIME_NONE; + min_latency += settings.latency; + let max_latency = gst::ClockTime::NONE; q.set(true, min_latency, max_latency); } @@ -999,7 +1005,7 @@ struct State { jbuf: glib::SendUniqueCell, last_res: Result, - position: gst::ClockTime, + position: Option, segment: gst::FormattedSegment, clock_rate: Option, @@ -1011,14 +1017,14 @@ struct State { eos: bool, last_popped_seqnum: Option, - last_popped_pts: gst::ClockTime, + last_popped_pts: Option, stats: Stats, - earliest_pts: gst::ClockTime, + earliest_pts: Option, earliest_seqnum: Option, - wait_handle: Option<(gst::ClockTime, AbortHandle)>, + wait_handle: Option<(Option, AbortHandle)>, } impl Default for State { @@ -1027,23 +1033,23 @@ impl Default for State { jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(), last_res: Ok(gst::FlowSuccess::Ok), - position: gst::CLOCK_TIME_NONE, + position: None, segment: gst::FormattedSegment::::new(), clock_rate: None, - packet_spacing: gst::ClockTime::zero(), + packet_spacing: gst::ClockTime::ZERO, equidistant: 0, discont: true, eos: false, last_popped_seqnum: None, - last_popped_pts: gst::CLOCK_TIME_NONE, + last_popped_pts: None, stats: Stats::default(), - earliest_pts: gst::CLOCK_TIME_NONE, + earliest_pts: None, earliest_seqnum: None, wait_handle: None, @@ -1093,10 +1099,7 @@ impl TaskImpl for JitterBufferTask { let jb = JitterBuffer::from_instance(&self.element); let (latency, context_wait) = { let settings = jb.settings.lock().unwrap(); - ( - settings.latency_ms as u64 * gst::MSECOND, - settings.context_wait as u64 * gst::MSECOND, - ) + (settings.latency, settings.context_wait) }; loop { @@ -1110,6 +1113,7 @@ impl TaskImpl for JitterBufferTask { ); let (delay_fut, abort_handle) = match next_wakeup { + // FIXME use Duration::ZERO when MSVC >= 1.53.2 Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None), _ => { let (delay_fut, abort_handle) = abortable(async move { @@ -1123,8 +1127,7 @@ impl TaskImpl for JitterBufferTask { }; }); - let next_wakeup = - next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE); + let next_wakeup = next_wakeup.and_then(|w| w.0); (Some(delay_fut), Some((next_wakeup, abort_handle))) } }; @@ -1158,12 +1161,15 @@ impl TaskImpl for JitterBufferTask { CAT, obj: &self.element, "Woke up at {}, earliest_pts {}", - now, - state.earliest_pts + now.display(), + state.earliest_pts.display() ); if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup > now { + if next_wakeup + .zip(now) + .map_or(false, |(next_wakeup, now)| next_wakeup > now) + { // Reschedule and wait a bit longer in the next iteration return Ok(()); } @@ -1198,8 +1204,8 @@ impl TaskImpl for JitterBufferTask { latency, context_wait, ); - if let Some((next_wakeup, _)) = next_wakeup { - if next_wakeup > now { + if let Some((Some(next_wakeup), _)) = next_wakeup { + if now.map_or(false, |now| next_wakeup > now) { // Reschedule and wait a bit longer in the next iteration return Ok(()); } @@ -1281,7 +1287,7 @@ impl JitterBuffer { let context = { let settings = self.settings.lock().unwrap(); - Context::acquire(&settings.context, settings.context_wait).unwrap() + Context::acquire(&settings.context, settings.context_wait.into()).unwrap() }; self.task @@ -1367,7 +1373,7 @@ impl ObjectImpl for JitterBuffer { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.mseconds() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_uint( @@ -1376,7 +1382,7 @@ impl ObjectImpl for JitterBuffer { "Amount of ms to buffer", 0, std::u32::MAX, - DEFAULT_LATENCY_MS, + DEFAULT_LATENCY.mseconds() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_boolean( @@ -1446,14 +1452,16 @@ impl ObjectImpl for JitterBuffer { ) { match pspec.name() { "latency" => { - let latency_ms = { + let latency = { let mut settings = self.settings.lock().unwrap(); - settings.latency_ms = value.get().expect("type checked upstream"); - settings.latency_ms as u64 + settings.latency = gst::ClockTime::from_mseconds( + value.get::().expect("type checked upstream").into(), + ); + settings.latency }; let state = self.state.lock().unwrap(); - state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND); + state.jbuf.borrow().set_delay(latency); let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); } @@ -1478,7 +1486,9 @@ impl ObjectImpl for JitterBuffer { } "context-wait" => { let mut settings = self.settings.lock().unwrap(); - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = gst::ClockTime::from_mseconds( + value.get::().expect("type checked upstream").into(), + ); } _ => unimplemented!(), } @@ -1488,7 +1498,7 @@ impl ObjectImpl for JitterBuffer { match pspec.name() { "latency" => { let settings = self.settings.lock().unwrap(); - settings.latency_ms.to_value() + settings.latency.mseconds().to_value() } "do-lost" => { let settings = self.settings.lock().unwrap(); @@ -1520,7 +1530,7 @@ impl ObjectImpl for JitterBuffer { } "context-wait" => { let settings = self.settings.lock().unwrap(); - settings.context_wait.to_value() + (settings.context_wait.mseconds() as u32).to_value() } _ => unimplemented!(), } diff --git a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs index 67e24402..3785ac0d 100644 --- a/generic/threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/generic/threadshare/src/jitterbuffer/jitterbuffer.rs @@ -72,8 +72,8 @@ unsafe impl Send for RTPJitterBufferItem {} impl RTPJitterBufferItem { pub fn new( buffer: gst::Buffer, - dts: gst::ClockTime, - pts: gst::ClockTime, + dts: impl Into>, + pts: impl Into>, seqnum: Option, rtptime: u32, ) -> RTPJitterBufferItem { @@ -89,8 +89,8 @@ impl RTPJitterBufferItem { next: ptr::null_mut(), prev: ptr::null_mut(), r#type: 0, - dts: dts.into_glib(), - pts: pts.into_glib(), + dts: dts.into().into_glib(), + pts: pts.into().into_glib(), seqnum: seqnum.map(|s| s as u32).unwrap_or(std::u32::MAX), count: 1, rtptime, @@ -113,24 +113,24 @@ impl RTPJitterBufferItem { } } - pub fn dts(&self) -> gst::ClockTime { + pub fn dts(&self) -> Option { unsafe { let item = self.0.as_ref().expect("Invalid wrapper"); if item.as_ref().dts == gst::ffi::GST_CLOCK_TIME_NONE { - gst::CLOCK_TIME_NONE + None } else { - gst::ClockTime(Some(item.as_ref().dts)) + Some(gst::ClockTime::from_nseconds(item.as_ref().dts)) } } } - pub fn pts(&self) -> gst::ClockTime { + pub fn pts(&self) -> Option { unsafe { let item = self.0.as_ref().expect("Invalid wrapper"); if item.as_ref().pts == gst::ffi::GST_CLOCK_TIME_NONE { - gst::CLOCK_TIME_NONE + None } else { - gst::ClockTime(Some(item.as_ref().pts)) + Some(gst::ClockTime::from_nseconds(item.as_ref().pts)) } } } @@ -235,7 +235,10 @@ impl RTPJitterBuffer { #[allow(dead_code)] pub fn delay(&self) -> gst::ClockTime { - unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) } + unsafe { + try_from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) + .expect("undefined delay") + } } pub fn set_delay(&self, delay: gst::ClockTime) { @@ -253,29 +256,23 @@ impl RTPJitterBuffer { pub fn calculate_pts( &self, - dts: gst::ClockTime, + dts: impl Into>, estimated_dts: bool, rtptime: u32, - base_time: gst::ClockTime, + base_time: impl Into>, gap: i32, is_rtx: bool, - ) -> gst::ClockTime { + ) -> Option { unsafe { - let pts = ffi::rtp_jitter_buffer_calculate_pts( + from_glib(ffi::rtp_jitter_buffer_calculate_pts( self.to_glib_none().0, - dts.into_glib(), + dts.into().into_glib(), estimated_dts.into_glib(), rtptime, - base_time.into_glib(), + base_time.into().into_glib(), gap, is_rtx.into_glib(), - ); - - if pts == gst::ffi::GST_CLOCK_TIME_NONE { - gst::CLOCK_TIME_NONE - } else { - pts.into() - } + )) } } @@ -297,7 +294,7 @@ impl RTPJitterBuffer { } } - pub fn find_earliest(&self) -> (gst::ClockTime, Option) { + pub fn find_earliest(&self) -> (Option, Option) { unsafe { let mut pts = mem::MaybeUninit::uninit(); let mut seqnum = mem::MaybeUninit::uninit(); @@ -307,7 +304,7 @@ impl RTPJitterBuffer { pts.as_mut_ptr(), seqnum.as_mut_ptr(), ); - let pts = pts.assume_init(); + let pts = from_glib(pts.assume_init()); let seqnum = seqnum.assume_init(); let seqnum = if seqnum == std::u32::MAX { @@ -316,11 +313,7 @@ impl RTPJitterBuffer { Some(seqnum as u16) }; - if pts == gst::ffi::GST_CLOCK_TIME_NONE { - (gst::CLOCK_TIME_NONE, seqnum) - } else { - (pts.into(), seqnum) - } + (pts, seqnum) } } @@ -340,11 +333,11 @@ impl RTPJitterBuffer { } } - pub fn peek(&self) -> (gst::ClockTime, Option) { + pub fn peek(&self) -> (Option, Option) { unsafe { let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0); if item.is_null() { - (gst::CLOCK_TIME_NONE, None) + (None, None) } else { let seqnum = (*item).seqnum; let seqnum = if seqnum == std::u32::MAX { @@ -352,7 +345,7 @@ impl RTPJitterBuffer { } else { Some(seqnum as u16) }; - ((*item).pts.into(), seqnum) + (from_glib((*item).pts), seqnum) } } } diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 4ee74019..b24fe580 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -30,6 +30,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Mutex as StdMutex; use std::sync::MutexGuard as StdMutexGuard; use std::sync::{Arc, Weak}; +use std::time::Duration; use std::{u32, u64}; use crate::runtime::prelude::*; @@ -50,9 +51,10 @@ const DEFAULT_PROXY_CONTEXT: &str = ""; const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; -const DEFAULT_MAX_SIZE_TIME: u64 = gst::SECOND_VAL; +const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +// FIXME use Duration::ZERO when MSVC >= 1.53.2 +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); #[derive(Debug, Clone)] struct SettingsSink { @@ -71,9 +73,9 @@ impl Default for SettingsSink { struct SettingsSrc { max_size_buffers: u32, max_size_bytes: u32, - max_size_time: u64, + max_size_time: gst::ClockTime, context: String, - context_wait: u32, + context_wait: Duration, proxy_context: String, } @@ -810,7 +812,7 @@ impl PadSrcHandler for ProxySrcPadHandler { gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { - q.set(true, 0.into(), gst::CLOCK_TIME_NONE); + q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } QueryView::Scheduling(ref mut q) => { @@ -1031,7 +1033,7 @@ impl ProxySrc { } else { Some(settings.max_size_bytes) }, - if settings.max_size_time == 0 { + if settings.max_size_time.is_zero() { None } else { Some(settings.max_size_time) @@ -1141,7 +1143,7 @@ impl ObjectImpl for ProxySrc { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_string( @@ -1175,7 +1177,7 @@ impl ObjectImpl for ProxySrc { "Maximum number of nanoseconds to queue (0=unlimited)", 0, u64::MAX - 1, - DEFAULT_MAX_SIZE_TIME, + DEFAULT_MAX_SIZE_TIME.nseconds(), glib::ParamFlags::READWRITE, ), ] @@ -1200,7 +1202,8 @@ impl ObjectImpl for ProxySrc { settings.max_size_bytes = value.get().expect("type checked upstream"); } "max-size-time" => { - settings.max_size_time = value.get().expect("type checked upstream"); + settings.max_size_time = + gst::ClockTime::from_nseconds(value.get().expect("type checked upstream")); } "context" => { settings.context = value @@ -1209,7 +1212,9 @@ impl ObjectImpl for ProxySrc { .unwrap_or_else(|| "".into()); } "context-wait" => { - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); } "proxy-context" => { settings.proxy_context = value @@ -1226,9 +1231,9 @@ impl ObjectImpl for ProxySrc { match pspec.name() { "max-size-buffers" => settings.max_size_buffers.to_value(), "max-size-bytes" => settings.max_size_bytes.to_value(), - "max-size-time" => settings.max_size_time.to_value(), + "max-size-time" => settings.max_size_time.nseconds().to_value(), "context" => settings.context.to_value(), - "context-wait" => settings.context_wait.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), "proxy-context" => settings.proxy_context.to_value(), _ => unimplemented!(), } diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index cc62a0fa..b4501cb0 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -28,6 +28,7 @@ use once_cell::sync::Lazy; use std::collections::VecDeque; use std::sync::Mutex as StdMutex; +use std::time::Duration; use std::{u32, u64}; use crate::runtime::prelude::*; @@ -37,17 +38,18 @@ use crate::dataqueue::{DataQueue, DataQueueItem}; const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; -const DEFAULT_MAX_SIZE_TIME: u64 = gst::SECOND_VAL; +const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +// FIXME use Duration::ZERO when MSVC >= 1.53.2 +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); #[derive(Debug, Clone)] struct Settings { max_size_buffers: u32, max_size_bytes: u32, - max_size_time: u64, + max_size_time: gst::ClockTime, context: String, - context_wait: u32, + context_wait: Duration, } impl Default for Settings { @@ -628,7 +630,7 @@ impl Queue { } else { Some(settings.max_size_bytes) }, - if settings.max_size_time == 0 { + if settings.max_size_time.is_zero() { None } else { Some(settings.max_size_time) @@ -729,7 +731,7 @@ impl ObjectImpl for Queue { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_uint( @@ -756,7 +758,7 @@ impl ObjectImpl for Queue { "Maximum number of nanoseconds to queue (0=unlimited)", 0, u64::MAX - 1, - DEFAULT_MAX_SIZE_TIME, + DEFAULT_MAX_SIZE_TIME.nseconds(), glib::ParamFlags::READWRITE, ), ] @@ -781,7 +783,8 @@ impl ObjectImpl for Queue { settings.max_size_bytes = value.get().expect("type checked upstream"); } "max-size-time" => { - settings.max_size_time = value.get().expect("type checked upstream"); + settings.max_size_time = + gst::ClockTime::from_nseconds(value.get().expect("type checked upstream")); } "context" => { settings.context = value @@ -790,7 +793,9 @@ impl ObjectImpl for Queue { .unwrap_or_else(|| "".into()); } "context-wait" => { - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); } _ => unimplemented!(), } @@ -801,9 +806,9 @@ impl ObjectImpl for Queue { match pspec.name() { "max-size-buffers" => settings.max_size_buffers.to_value(), "max-size-bytes" => settings.max_size_bytes.to_value(), - "max-size-time" => settings.max_size_time.to_value(), + "max-size-time" => settings.max_size_time.nseconds().to_value(), "context" => settings.context.to_value(), - "context-wait" => settings.context_wait.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/src/runtime/executor.rs b/generic/threadshare/src/runtime/executor.rs index fe355290..445c56eb 100644 --- a/generic/threadshare/src/runtime/executor.rs +++ b/generic/threadshare/src/runtime/executor.rs @@ -166,7 +166,7 @@ struct ContextThread { } impl ContextThread { - fn start(name: &str, wait: u32) -> Context { + fn start(name: &str, wait: Duration) -> Context { let context_thread = ContextThread { name: name.into() }; let (context_sender, context_receiver) = sync_mpsc::channel(); let join = thread::spawn(move || { @@ -187,14 +187,14 @@ impl ContextThread { context } - fn spawn(&self, wait: u32, context_sender: sync_mpsc::Sender) { + fn spawn(&self, wait: Duration, context_sender: sync_mpsc::Sender) { gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name); let mut runtime = tokio::runtime::Builder::new() .basic_scheduler() .thread_name(self.name.clone()) .enable_all() - .max_throttling(Duration::from_millis(wait as u64)) + .max_throttling(wait) .build() .expect("Couldn't build the runtime"); @@ -406,7 +406,7 @@ impl PartialEq for Context { impl Eq for Context {} impl Context { - pub fn acquire(context_name: &str, wait: u32) -> Result { + pub fn acquire(context_name: &str, wait: Duration) -> Result { assert_ne!(context_name, "DUMMY"); let mut contexts = CONTEXTS.lock().unwrap(); @@ -693,16 +693,16 @@ mod tests { type Item = i32; - const SLEEP_DURATION_MS: u32 = 2; - const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64); - const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64 * 10); + const SLEEP_DURATION_MS: u64 = 2; + const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS); + const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS * 10); #[tokio::test] async fn drain_sub_tasks() { // Setup gst::init().unwrap(); - let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION_MS).unwrap(); + let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION).unwrap(); let join_handle = context.spawn(async move { let (sender, mut receiver) = mpsc::channel(1); @@ -755,7 +755,7 @@ mod tests { async fn block_on_within_tokio() { gst::init().unwrap(); - let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION_MS).unwrap(); + let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION).unwrap(); let bytes_sent = crate::runtime::executor::block_on(context.spawn(async { let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000); @@ -781,7 +781,7 @@ mod tests { fn block_on_from_sync() { gst::init().unwrap(); - let context = Context::acquire("block_on_from_sync", SLEEP_DURATION_MS).unwrap(); + let context = Context::acquire("block_on_from_sync", SLEEP_DURATION).unwrap(); let bytes_sent = crate::runtime::executor::block_on(context.spawn(async { let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5001); @@ -807,7 +807,7 @@ mod tests { fn block_on_from_context() { gst::init().unwrap(); - let context = Context::acquire("block_on_from_context", SLEEP_DURATION_MS).unwrap(); + let context = Context::acquire("block_on_from_context", SLEEP_DURATION).unwrap(); let join_handle = context.spawn(async { crate::runtime::executor::block_on(async { crate::runtime::time::delay_for(DELAY).await; @@ -821,7 +821,7 @@ mod tests { async fn enter_context_from_tokio() { gst::init().unwrap(); - let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION_MS).unwrap(); + let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION).unwrap(); let mut socket = context .enter(|| { let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5002); @@ -849,7 +849,7 @@ mod tests { fn enter_context_from_sync() { gst::init().unwrap(); - let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION_MS).unwrap(); + let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION).unwrap(); let mut socket = context .enter(|| { let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5003); diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index d2528e6c..7c2ff256 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -1252,7 +1252,7 @@ mod tests { } } - let context = Context::acquire("task_iterate", 2).unwrap(); + let context = Context::acquire("task_iterate", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -1519,7 +1519,7 @@ mod tests { } } - let context = Context::acquire("prepare_error", 2).unwrap(); + let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -1604,7 +1604,7 @@ mod tests { } } - let context = Context::acquire("prepare_start_ok", 2).unwrap(); + let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -1612,7 +1612,9 @@ mod tests { task.prepare(TaskPrepareTest { prepare_receiver }, context.clone()) .unwrap(); - let start_ctx = Context::acquire("prepare_start_ok_requester", 0).unwrap(); + // FIXME use Duration::ZERO when MSVC >= 1.53.2 + let start_ctx = + Context::acquire("prepare_start_ok_requester", Duration::from_nanos(0)).unwrap(); let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { @@ -1720,7 +1722,7 @@ mod tests { } } - let context = Context::acquire("prepare_start_error", 2).unwrap(); + let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -1735,7 +1737,9 @@ mod tests { ) .unwrap(); - let start_ctx = Context::acquire("prepare_start_error_requester", 0).unwrap(); + // FIXME use Duration::ZERO when MSVC >= 1.53.2 + let start_ctx = + Context::acquire("prepare_start_error_requester", Duration::from_nanos(0)).unwrap(); let task_clone = task.clone(); let (ready_sender, ready_receiver) = oneshot::channel(); let start_handle = start_ctx.spawn(async move { @@ -1808,7 +1812,7 @@ mod tests { } } - let context = Context::acquire("pause_start", 2).unwrap(); + let context = Context::acquire("pause_start", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -1901,7 +1905,7 @@ mod tests { } } - let context = Context::acquire("successive_pause_start", 2).unwrap(); + let context = Context::acquire("successive_pause_start", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -1962,7 +1966,7 @@ mod tests { } } - let context = Context::acquire("flush_regular_sync", 2).unwrap(); + let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2049,7 +2053,8 @@ mod tests { } } - let context = Context::acquire("flush_regular_different_context", 2).unwrap(); + let context = + Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2067,7 +2072,11 @@ mod tests { gst_debug!(RUNTIME_CAT, "flush_regular_different_context: start"); task.start().unwrap(); - let oob_context = Context::acquire("flush_regular_different_context_oob", 2).unwrap(); + let oob_context = Context::acquire( + "flush_regular_different_context_oob", + Duration::from_millis(2), + ) + .unwrap(); let task_clone = task.clone(); let flush_handle = oob_context.spawn(async move { @@ -2134,7 +2143,8 @@ mod tests { } } - let context = Context::acquire("flush_regular_same_context", 2).unwrap(); + let context = + Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2218,7 +2228,7 @@ mod tests { } } - let context = Context::acquire("flush_from_loop", 2).unwrap(); + let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2291,7 +2301,7 @@ mod tests { } } - let context = Context::acquire("pause_from_loop", 2).unwrap(); + let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2357,7 +2367,7 @@ mod tests { } } - let context = Context::acquire("trigger_from_action", 2).unwrap(); + let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2427,7 +2437,7 @@ mod tests { } } - let context = Context::acquire("pause_flush_start", 2).unwrap(); + let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2538,7 +2548,7 @@ mod tests { } } - let context = Context::acquire("pause_flushing_start", 2).unwrap(); + let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2629,7 +2639,7 @@ mod tests { } } - let context = Context::acquire("flush_concurrent_start", 2).unwrap(); + let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap(); let task = Task::default(); @@ -2644,7 +2654,8 @@ mod tests { ) .unwrap(); - let oob_context = Context::acquire("flush_concurrent_start_oob", 2).unwrap(); + let oob_context = + Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap(); let task_clone = task.clone(); task.pause().unwrap(); @@ -2746,7 +2757,7 @@ mod tests { } } - let context = Context::acquire("start_timer", 2).unwrap(); + let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap(); let task = Task::default(); diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs index 169a62d3..8d8da5ca 100644 --- a/generic/threadshare/src/socket.rs +++ b/generic/threadshare/src/socket.rs @@ -143,19 +143,25 @@ impl Socket { Ok((len, saddr)) => { let dts = if T::DO_TIMESTAMP { let time = self.clock.as_ref().unwrap().time(); - let running_time = time - self.base_time.unwrap(); + let running_time = time + .zip(self.base_time) + // TODO Do we want None if time < base_time + // or do we expect Some? + .and_then(|(time, base_time)| time.checked_sub(base_time)); + // FIXME maybe we should check if running_time.is_none + // so as to display another message gst_debug!( SOCKET_CAT, obj: &self.element, "Read {} bytes at {} (clock {})", len, - running_time, - time + running_time.display(), + time.display(), ); running_time } else { gst_debug!(SOCKET_CAT, obj: &self.element, "Read {} bytes", len); - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE }; let mut buffer = self.mapped_buffer.take().unwrap().into_buffer(); diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 2d580897..748f5ead 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -31,6 +31,7 @@ use std::io; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::sync::Mutex as StdMutex; +use std::time::Duration; use std::u16; use std::u32; @@ -47,7 +48,8 @@ const DEFAULT_PORT: i32 = 4953; const DEFAULT_CAPS: Option = None; const DEFAULT_BLOCKSIZE: u32 = 4096; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +// FIXME use Duration::ZERO when MSVC >= 1.53.2 +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); #[derive(Debug, Clone)] struct Settings { @@ -56,7 +58,7 @@ struct Settings { caps: Option, blocksize: u32, context: String, - context_wait: u32, + context_wait: Duration, } impl Default for Settings { @@ -224,7 +226,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { - q.set(false, 0.into(), gst::CLOCK_TIME_NONE); + q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } QueryView::Scheduling(ref mut q) => { @@ -580,7 +582,7 @@ impl ObjectImpl for TcpClientSrc { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_string( @@ -649,7 +651,9 @@ impl ObjectImpl for TcpClientSrc { .unwrap_or_else(|| "".into()); } "context-wait" => { - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); } _ => unimplemented!(), } @@ -663,7 +667,7 @@ impl ObjectImpl for TcpClientSrc { "caps" => settings.caps.to_value(), "blocksize" => settings.blocksize.to_value(), "context" => settings.context.to_value(), - "context-wait" => settings.context_wait.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 2a6f96b7..2a9d54b2 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -62,7 +62,8 @@ const DEFAULT_TTL_MC: u32 = 1; const DEFAULT_QOS_DSCP: i32 = -1; const DEFAULT_CLIENTS: &str = ""; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +// FIXME use Duration::ZERO when MSVC >= 1.53.2 +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); #[derive(Debug, Clone)] struct Settings { @@ -81,7 +82,7 @@ struct Settings { ttl_mc: u32, qos_dscp: i32, context: String, - context_wait: u32, + context_wait: Duration, } impl Default for Settings { @@ -125,7 +126,7 @@ enum TaskItem { struct UdpSinkPadHandlerInner { sync: bool, segment: Option, - latency: gst::ClockTime, + latency: Option, socket: Arc>>, socket_v6: Arc>>, #[allow(clippy::rc_buffer)] @@ -141,7 +142,7 @@ impl UdpSinkPadHandlerInner { UdpSinkPadHandlerInner { sync: DEFAULT_SYNC, segment: None, - latency: gst::CLOCK_TIME_NONE, + latency: None, socket: Arc::new(Mutex::new(None)), socket_v6: Arc::new(Mutex::new(None)), clients: Arc::new(vec![SocketAddr::new( @@ -217,7 +218,7 @@ impl UdpSinkPadHandler { } fn set_latency(&self, latency: gst::ClockTime) { - self.0.write().unwrap().latency = latency; + self.0.write().unwrap().latency = Some(latency); } fn prepare(&self) { @@ -405,15 +406,17 @@ impl UdpSinkPadHandler { ) = { let mut inner = self.0.write().unwrap(); let do_sync = inner.sync; - let mut rtime: gst::ClockTime = 0.into(); + let mut rtime = gst::ClockTime::NONE; if let Some(segment) = &inner.segment { - if let Some(segment) = segment.downcast_ref::() { - rtime = segment.to_running_time(buffer.pts()); - if inner.latency.is_some() { - rtime += inner.latency; - } - } + rtime = segment + .downcast_ref::() + .and_then(|segment| { + segment + .to_running_time(buffer.pts()) + .zip(inner.latency) + .map(|(rtime, latency)| rtime + latency) + }); } let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]); @@ -519,14 +522,20 @@ impl UdpSinkPadHandler { } /* Wait until specified time */ - async fn sync(&self, element: &super::UdpSink, running_time: gst::ClockTime) { + async fn sync( + &self, + element: &super::UdpSink, + running_time: impl Into>, + ) { let now = element.current_running_time(); - if let Some(delay) = running_time - .saturating_sub(now) - .and_then(|delay| delay.nseconds()) + match running_time + .into() + .zip(now) + .and_then(|(running_time, now)| running_time.checked_sub(now)) { - runtime::time::delay_for(Duration::from_nanos(delay)).await; + Some(delay) => runtime::time::delay_for(delay.into()).await, + None => runtime::executor::yield_now().await, } } @@ -980,7 +989,7 @@ impl ObjectImpl for UdpSink { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_boolean( @@ -1257,7 +1266,9 @@ impl ObjectImpl for UdpSink { .unwrap_or_else(|| "".into()); } "context-wait" => { - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); } _ => unimplemented!(), } @@ -1309,7 +1320,7 @@ impl ObjectImpl for UdpSink { clients.join(",").to_value() } "context" => settings.context.to_value(), - "context-wait" => settings.context_wait.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index 347cc444..8facdb49 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -32,6 +32,7 @@ use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use std::sync::Mutex as StdMutex; +use std::time::Duration; use std::u16; use crate::runtime::prelude::*; @@ -47,7 +48,8 @@ const DEFAULT_MTU: u32 = 1492; const DEFAULT_SOCKET: Option = None; const DEFAULT_USED_SOCKET: Option = None; const DEFAULT_CONTEXT: &str = ""; -const DEFAULT_CONTEXT_WAIT: u32 = 0; +// FIXME use Duration::ZERO when MSVC >= 1.53.2 +const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; #[derive(Debug, Clone)] @@ -60,7 +62,7 @@ struct Settings { socket: Option, used_socket: Option, context: String, - context_wait: u32, + context_wait: Duration, retrieve_sender_address: bool, } @@ -237,7 +239,7 @@ impl PadSrcHandler for UdpSrcPadHandler { let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { - q.set(true, 0.into(), gst::CLOCK_TIME_NONE); + q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } QueryView::Scheduling(ref mut q) => { @@ -301,7 +303,7 @@ impl TaskImpl for UdpSrcTask { async move { gst_log!(CAT, obj: &self.element, "Starting task"); self.socket - .set_clock(self.element.clock(), Some(self.element.base_time())); + .set_clock(self.element.clock(), self.element.base_time()); gst_log!(CAT, obj: &self.element, "Task started"); Ok(()) } @@ -721,7 +723,7 @@ impl ObjectImpl for UdpSrc { "Throttle poll loop to run at most once every this many ms", 0, 1000, - DEFAULT_CONTEXT_WAIT, + DEFAULT_CONTEXT_WAIT.as_millis() as u32, glib::ParamFlags::READWRITE, ), glib::ParamSpec::new_string( @@ -836,7 +838,9 @@ impl ObjectImpl for UdpSrc { .unwrap_or_else(|| "".into()); } "context-wait" => { - settings.context_wait = value.get().expect("type checked upstream"); + settings.context_wait = Duration::from_millis( + value.get::().expect("type checked upstream").into(), + ); } "retrieve-sender-address" => { settings.retrieve_sender_address = value.get().expect("type checked upstream"); @@ -864,7 +868,7 @@ impl ObjectImpl for UdpSrc { .map(GioSocketWrapper::as_socket) .to_value(), "context" => settings.context.to_value(), - "context-wait" => settings.context_wait.to_value(), + "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), "retrieve-sender-address" => settings.retrieve_sender_address.to_value(), _ => unimplemented!(), } diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 686a6f48..1c7b84c6 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -34,6 +34,7 @@ use once_cell::sync::Lazy; use std::boxed::Box; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex as StdMutex; +use std::time::Duration; use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::{ @@ -41,7 +42,7 @@ use gstthreadshare::runtime::{ }; const DEFAULT_CONTEXT: &str = ""; -const THROTTLING_DURATION: u32 = 2; +const THROTTLING_DURATION: Duration = Duration::from_millis(2); fn init() { use std::sync::Once; @@ -968,7 +969,7 @@ fn src_tsqueue_sink_nominal() { .set_property("context", &format!("{}_queue", name)) .unwrap(); ts_queue - .set_property("context-wait", &THROTTLING_DURATION) + .set_property("context-wait", &(THROTTLING_DURATION.as_millis() as u32)) .unwrap(); let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None); @@ -1007,7 +1008,7 @@ fn src_tsproxy_sink_nominal() { .set_property("context", &format!("{}_context", name)) .unwrap(); ts_proxy_src - .set_property("context-wait", &THROTTLING_DURATION) + .set_property("context-wait", &(THROTTLING_DURATION.as_millis() as u32)) .unwrap(); let (pipeline, src_element, _sink_element, receiver) = diff --git a/generic/threadshare/tests/proxy.rs b/generic/threadshare/tests/proxy.rs index 5c2b28df..87c1e1b1 100644 --- a/generic/threadshare/tests/proxy.rs +++ b/generic/threadshare/tests/proxy.rs @@ -76,7 +76,7 @@ fn test_push() { let mut eos = false; let bus = pipeline.bus().unwrap(); - while let Some(msg) = bus.timed_pop(5 * gst::SECOND) { + while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) { use gst::MessageView; match msg.view() { MessageView::Eos(..) => { @@ -123,8 +123,8 @@ fn test_from_pipeline_to_pipeline() { pipe_1.set_state(gst::State::Paused).unwrap(); pipe_2.set_state(gst::State::Paused).unwrap(); - let _ = pipe_1.state(gst::CLOCK_TIME_NONE); - let _ = pipe_2.state(gst::CLOCK_TIME_NONE); + let _ = pipe_1.state(gst::ClockTime::NONE); + let _ = pipe_2.state(gst::ClockTime::NONE); pipe_1.set_state(gst::State::Null).unwrap(); diff --git a/generic/threadshare/tests/queue.rs b/generic/threadshare/tests/queue.rs index f107ff50..f74fb4cc 100644 --- a/generic/threadshare/tests/queue.rs +++ b/generic/threadshare/tests/queue.rs @@ -71,7 +71,7 @@ fn test_push() { let mut eos = false; let bus = pipeline.bus().unwrap(); - while let Some(msg) = bus.timed_pop(5 * gst::SECOND) { + while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) { use gst::MessageView; match msg.view() { MessageView::Eos(..) => { diff --git a/generic/threadshare/tests/tcpclientsrc.rs b/generic/threadshare/tests/tcpclientsrc.rs index c60417bf..0a8f68d2 100644 --- a/generic/threadshare/tests/tcpclientsrc.rs +++ b/generic/threadshare/tests/tcpclientsrc.rs @@ -95,7 +95,7 @@ fn test_push() { let mut eos = false; let bus = pipeline.bus().unwrap(); - while let Some(msg) = bus.timed_pop(5 * gst::SECOND) { + while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) { use gst::MessageView; match msg.view() { MessageView::Eos(..) => {