generic: migrate to new ClockTime design

François Laignel 2021-05-26 11:54:34 +02:00
parent 17feaa8c71
commit 8f81cb8812
22 changed files with 344 additions and 277 deletions
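
Editor's note (not part of the commit): the pattern applied throughout the hunks below is that a clock time no longer carries its own "undefined" state (`gst::CLOCK_TIME_NONE`); instead `gst::ClockTime` is always defined and the undefined case moves into `Option<gst::ClockTime>`, so arithmetic becomes explicit (`zip`, `checked_sub`) rather than silently propagating NONE. A rough, self-contained sketch of that idea, using a mock `ClockTime` newtype rather than the real gstreamer-rs type:

// Standalone illustration of the ClockTime redesign this commit adopts.
// `ClockTime` here is a mock newtype, not the gstreamer-rs API.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct ClockTime(u64); // always a defined number of nanoseconds

impl ClockTime {
    const ZERO: ClockTime = ClockTime(0);
    fn checked_sub(self, rhs: ClockTime) -> Option<ClockTime> {
        self.0.checked_sub(rhs.0).map(ClockTime)
    }
}

fn main() {
    // "Undefined" now lives in Option, not inside the value itself.
    let now: Option<ClockTime> = Some(ClockTime(2_000));
    let base_time: Option<ClockTime> = Some(ClockTime(500));

    // The old style was roughly `now - base_time`, with NONE propagating silently;
    // the new style makes both the missing value and the underflow case explicit.
    let running_time = now
        .zip(base_time)
        .and_then(|(now, base_time)| now.checked_sub(base_time));

    assert_eq!(running_time, Some(ClockTime(1_500)));
    let _ = ClockTime::ZERO;
}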

View file

@@ -124,7 +124,7 @@ fn main() -> Result<(), Box<dyn Error>> {
         .expect("Unable to set the pipeline to the `Playing` state");
     let bus = pipeline.bus().unwrap();
-    for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
+    for msg in bus.iter_timed(gst::ClockTime::NONE) {
         use gst::MessageView;
         match msg.view() {
             MessageView::Error(err) => {

View file

@@ -121,7 +121,7 @@ fn main() -> Result<(), Box<dyn Error>> {
     pipeline.set_state(gst::State::Playing)?;
     let bus = pipeline.bus().unwrap();
-    for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
+    for msg in bus.iter_timed(gst::ClockTime::NONE) {
         use gst::MessageView;
         match msg.view() {
             MessageView::Error(err) => {

View file

@@ -322,8 +322,8 @@ impl Decrypter {
         }
         let size = match peer_query.result().try_into().unwrap() {
-            gst::format::Bytes(Some(size)) => size,
-            gst::format::Bytes(None) => {
+            Some(gst::format::Bytes(size)) => size,
+            None => {
                 gst_error!(CAT, "Failed to query upstream duration");
                 return false;
@@ -348,7 +348,7 @@ impl Decrypter {
         let size = size - total_chunks * box_::MACBYTES as u64;
         gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size);
-        q.set(gst::format::Bytes::from(size));
+        q.set(gst::format::Bytes(size));
         true
     }

View file

@@ -299,8 +299,8 @@ impl Encrypter {
         }
         let size = match peer_query.result().try_into().unwrap() {
-            gst::format::Bytes(Some(size)) => size,
-            gst::format::Bytes(None) => {
+            Some(gst::format::Bytes(size)) => size,
+            None => {
                 gst_error!(CAT, "Failed to query upstream duration");
                 return false;
@@ -324,7 +324,7 @@ impl Encrypter {
         let size = size + crate::HEADERS_SIZE as u64;
         gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size);
-        q.set(gst::format::Bytes::from(size));
+        q.set(gst::format::Bytes(size));
         true
     }

View file

@@ -119,7 +119,7 @@ fn test_pipeline() {
         .expect("Unable to set the pipeline to the `Playing` state");
     let bus = pipeline.bus().unwrap();
-    for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
+    for msg in bus.iter_timed(gst::ClockTime::NONE) {
         use gst::MessageView;
         match msg.view() {
             MessageView::Error(err) => {
@@ -200,11 +200,11 @@ fn test_pull_range() {
     assert_eq!(seekable, true);
     assert_eq!(
         start,
-        gst::GenericFormattedValue::Bytes(gst::format::Bytes(Some(0)))
+        gst::GenericFormattedValue::Bytes(Some(gst::format::Bytes(0)))
     );
     assert_eq!(
         stop,
-        gst::GenericFormattedValue::Bytes(gst::format::Bytes(Some(6043)))
+        gst::GenericFormattedValue::Bytes(Some(gst::format::Bytes(6043)))
     );
     // do pulls

View file

@@ -31,13 +31,15 @@ use once_cell::sync::Lazy;
 use std::convert::TryInto;
 use std::sync::Arc;
 use std::sync::Mutex as StdMutex;
+use std::time::Duration;
 use std::u32;
 use crate::runtime::prelude::*;
 use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
 const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
 const DEFAULT_CAPS: Option<gst::Caps> = None;
 const DEFAULT_MAX_BUFFERS: u32 = 10;
 const DEFAULT_DO_TIMESTAMP: bool = false;
@@ -45,7 +47,7 @@ const DEFAULT_DO_TIMESTAMP: bool = false;
 #[derive(Debug, Clone)]
 struct Settings {
     context: String,
-    context_wait: u32,
+    context_wait: Duration,
     caps: Option<gst::Caps>,
     max_buffers: u32,
     do_timestamp: bool,
@@ -223,7 +225,7 @@ impl PadSrcHandler for AppSrcPadHandler {
         gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
         let ret = match query.view_mut() {
             QueryView::Latency(ref mut q) => {
-                q.set(true, 0.into(), gst::CLOCK_TIME_NONE);
+                q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
                 true
             }
             QueryView::Scheduling(ref mut q) => {
@@ -389,8 +391,11 @@ impl AppSrc {
             let now = clock.time();
             let buffer = buffer.make_mut();
-            buffer.set_dts(now - base_time);
-            buffer.set_pts(gst::CLOCK_TIME_NONE);
+            buffer.set_dts(
+                now.zip(base_time)
+                    .and_then(|(now, base_time)| now.checked_sub(base_time)),
+            );
+            buffer.set_pts(None);
         } else {
             gst_error!(CAT, obj: element, "Don't have a clock yet");
             return false;
@@ -540,7 +545,7 @@ impl ObjectImpl for AppSrc {
                     "Throttle poll loop to run at most once every this many ms",
                     0,
                     1000,
-                    DEFAULT_CONTEXT_WAIT,
+                    DEFAULT_CONTEXT_WAIT.as_millis() as u32,
                     glib::ParamFlags::READWRITE,
                 ),
                 glib::ParamSpec::new_uint(
@@ -620,7 +625,9 @@ impl ObjectImpl for AppSrc {
                     .unwrap_or_else(|| "".into());
            }
            "context-wait" => {
-                settings.context_wait = value.get().expect("type checked upstream");
+                settings.context_wait = Duration::from_millis(
+                    value.get::<u32>().expect("type checked upstream").into(),
+                );
            }
            "caps" => {
                settings.caps = value.get().expect("type checked upstream");
@@ -639,7 +646,7 @@ impl ObjectImpl for AppSrc {
        let settings = self.settings.lock().unwrap();
        match pspec.name() {
            "context" => settings.context.to_value(),
-            "context-wait" => settings.context_wait.to_value(),
+            "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
            "caps" => settings.caps.to_value(),
            "max-buffers" => settings.max_buffers.to_value(),
            "do-timestamp" => settings.do_timestamp.to_value(),

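Editor's note (not part of the commit): as the appsrc hunks above show, `context-wait` is now stored internally as a `std::time::Duration` while the GObject property keeps its `u32` millisecond unit, with the conversion done at the property boundary. A minimal standalone sketch of that round-trip, with plain functions standing in for the actual glib `set_property`/`property` machinery:

use std::time::Duration;

// Internal representation: a Duration, as in the new Settings structs.
struct Settings {
    context_wait: Duration,
}

// Stand-in for the `set_property` arm: the property value is a u32 in ms.
fn set_context_wait(settings: &mut Settings, value_ms: u32) {
    settings.context_wait = Duration::from_millis(value_ms.into());
}

// Stand-in for the `property` getter arm: convert back to the u32 ms unit.
fn get_context_wait(settings: &Settings) -> u32 {
    settings.context_wait.as_millis() as u32
}

fn main() {
    let mut settings = Settings {
        // FIXME-style stand-in for Duration::ZERO, mirroring the commit's default
        context_wait: Duration::from_nanos(0),
    };
    set_context_wait(&mut settings, 20);
    assert_eq!(get_context_wait(&settings), 20);
}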
View file

@@ -25,7 +25,7 @@ use once_cell::sync::Lazy;
 use std::collections::VecDeque;
 use std::sync::Arc;
 use std::sync::Mutex as StdMutex;
-use std::{u32, u64};
+use std::u32;
 static DATA_QUEUE_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
     gst::DebugCategory::new(
@@ -54,12 +54,10 @@ impl DataQueueItem {
         }
     }
-    fn timestamp(&self) -> Option<u64> {
+    fn timestamp(&self) -> Option<gst::ClockTime> {
         match *self {
-            DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts().0,
-            DataQueueItem::BufferList(ref list) => {
-                list.iter().filter_map(|b| b.dts_or_pts().0).next()
-            }
+            DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
+            DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
             DataQueueItem::Event(_) => None,
         }
     }
@@ -86,7 +84,7 @@ struct DataQueueInner {
     cur_size_bytes: u32,
     max_size_buffers: Option<u32>,
     max_size_bytes: Option<u32>,
-    max_size_time: Option<u64>,
+    max_size_time: Option<gst::ClockTime>,
     pending_handle: Option<AbortHandle>,
 }
@@ -105,7 +103,7 @@ impl DataQueue {
         src_pad: &gst::Pad,
         max_size_buffers: Option<u32>,
         max_size_bytes: Option<u32>,
-        max_size_time: Option<u64>,
+        max_size_time: impl Into<Option<gst::ClockTime>>,
     ) -> DataQueue {
         DataQueue(Arc::new(StdMutex::new(DataQueueInner {
             element: element.clone(),
@@ -116,7 +114,7 @@ impl DataQueue {
             cur_size_bytes: 0,
             max_size_buffers,
             max_size_bytes,
-            max_size_time,
+            max_size_time: max_size_time.into(),
             pending_handle: None,
         })))
     }

View file

@@ -35,12 +35,13 @@ use crate::runtime::prelude::*;
 use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef};
 const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
 #[derive(Debug, Clone)]
 struct Settings {
     context: String,
-    context_wait: u32,
+    context_wait: Duration,
 }
 impl Default for Settings {
@@ -74,14 +75,20 @@ struct InputSelectorPadSinkHandler(Arc<Mutex<InputSelectorPadSinkHandlerInner>>)
 impl InputSelectorPadSinkHandler {
     /* Wait until specified time */
-    async fn sync(&self, element: &super::InputSelector, running_time: gst::ClockTime) {
+    async fn sync(
+        &self,
+        element: &super::InputSelector,
+        running_time: impl Into<Option<gst::ClockTime>>,
+    ) {
         let now = element.current_running_time();
-        if let Some(delay) = running_time
-            .saturating_sub(now)
-            .and_then(|delay| delay.nseconds())
+        match running_time
+            .into()
+            .zip(now)
+            .and_then(|(running_time, now)| running_time.checked_sub(now))
         {
-            runtime::time::delay_for(Duration::from_nanos(delay)).await;
+            Some(delay) => runtime::time::delay_for(delay.into()).await,
+            None => runtime::executor::yield_now().await,
         }
     }
@@ -289,8 +296,8 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
         match query.view_mut() {
             QueryView::Latency(ref mut q) => {
                 let mut ret = true;
-                let mut min_latency = 0.into();
-                let mut max_latency = gst::ClockTime::none();
+                let mut min_latency = gst::ClockTime::ZERO;
+                let mut max_latency = gst::ClockTime::NONE;
                 let pads = {
                     let pads = inputselector.pads.lock().unwrap();
                     pads.sink_pads
@@ -307,8 +314,11 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
                     if ret {
                         let (live, min, max) = peer_query.result();
                         if live {
-                            min_latency = min.max(min_latency).unwrap_or(min_latency);
-                            max_latency = max.min(max_latency).unwrap_or(max);
+                            min_latency = min.max(min_latency);
+                            max_latency = max
+                                .zip(max_latency)
+                                .map(|(max, max_latency)| max.min(max_latency))
+                                .or(max);
                         }
                     }
                 }
@@ -424,7 +434,7 @@ impl ObjectImpl for InputSelector {
                     "Throttle poll loop to run at most once every this many ms",
                     0,
                     1000,
-                    DEFAULT_CONTEXT_WAIT,
+                    DEFAULT_CONTEXT_WAIT.as_millis() as u32,
                     glib::ParamFlags::READWRITE,
                 ),
                 glib::ParamSpec::new_object(
@@ -457,7 +467,9 @@ impl ObjectImpl for InputSelector {
             }
             "context-wait" => {
                 let mut settings = self.settings.lock().unwrap();
-                settings.context_wait = value.get().expect("type checked upstream");
+                settings.context_wait = Duration::from_millis(
+                    value.get::<u32>().expect("type checked upstream").into(),
+                );
             }
             "active-pad" => {
                 let pad = value
@@ -501,7 +513,7 @@ impl ObjectImpl for InputSelector {
             }
             "context-wait" => {
                 let settings = self.settings.lock().unwrap();
-                settings.context_wait.to_value()
+                (settings.context_wait.as_millis() as u32).to_value()
             }
             "active-pad" => {
                 let state = self.state.lock().unwrap();

View file

@@ -39,27 +39,27 @@ use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task
 use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
-const DEFAULT_LATENCY_MS: u32 = 200;
+const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);
 const DEFAULT_DO_LOST: bool = false;
 const DEFAULT_MAX_DROPOUT_TIME: u32 = 60000;
 const DEFAULT_MAX_MISORDER_TIME: u32 = 2000;
 const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+const DEFAULT_CONTEXT_WAIT: gst::ClockTime = gst::ClockTime::ZERO;
 #[derive(Debug, Clone)]
 struct Settings {
-    latency_ms: u32,
+    latency: gst::ClockTime,
     do_lost: bool,
     max_dropout_time: u32,
     max_misorder_time: u32,
     context: String,
-    context_wait: u32,
+    context_wait: gst::ClockTime,
 }
 impl Default for Settings {
     fn default() -> Self {
         Settings {
-            latency_ms: DEFAULT_LATENCY_MS,
+            latency: DEFAULT_LATENCY,
             do_lost: DEFAULT_DO_LOST,
             max_dropout_time: DEFAULT_MAX_DROPOUT_TIME,
             max_misorder_time: DEFAULT_MAX_MISORDER_TIME,
@@ -108,7 +108,7 @@ impl PartialEq for GapPacket {
 struct SinkHandlerInner {
     packet_rate_ctx: RTPPacketRateCtx,
     ips_rtptime: Option<u32>,
-    ips_pts: gst::ClockTime,
+    ips_pts: Option<gst::ClockTime>,
     gap_packets: BTreeSet<GapPacket>,
@@ -123,7 +123,7 @@ impl Default for SinkHandlerInner {
         SinkHandlerInner {
             packet_rate_ctx: RTPPacketRateCtx::new(),
             ips_rtptime: None,
-            ips_pts: gst::CLOCK_TIME_NONE,
+            ips_pts: None,
             gap_packets: BTreeSet::new(),
             last_pt: None,
             last_in_seqnum: None,
@@ -155,16 +155,16 @@ impl SinkHandler {
         state.discont = true;
         state.last_popped_seqnum = None;
-        state.last_popped_pts = gst::CLOCK_TIME_NONE;
+        state.last_popped_pts = None;
         inner.last_in_seqnum = None;
         inner.last_rtptime = None;
-        state.earliest_pts = gst::CLOCK_TIME_NONE;
+        state.earliest_pts = None;
         state.earliest_seqnum = None;
         inner.ips_rtptime = None;
-        inner.ips_pts = gst::CLOCK_TIME_NONE;
+        inner.ips_pts = None;
         mem::replace(&mut inner.gap_packets, BTreeSet::new())
     }
@@ -208,14 +208,17 @@ impl SinkHandler {
         inner: &mut SinkHandlerInner,
         state: &mut State,
         rtptime: u32,
-        pts: gst::ClockTime,
+        pts: impl Into<Option<gst::ClockTime>>,
     ) {
         if inner.ips_rtptime != Some(rtptime) {
-            if inner.ips_pts.is_some() && pts.is_some() {
-                let new_packet_spacing = pts - inner.ips_pts;
+            let pts = pts.into();
+            let new_packet_spacing = inner
+                .ips_pts
+                .zip(pts)
+                .and_then(|(ips_pts, pts)| pts.checked_sub(ips_pts));
+            if let Some(new_packet_spacing) = new_packet_spacing {
                 let old_packet_spacing = state.packet_spacing;
-                assert!(old_packet_spacing.is_some());
                 if old_packet_spacing > new_packet_spacing {
                     state.packet_spacing = (new_packet_spacing + 3 * old_packet_spacing) / 4;
                 } else if !old_packet_spacing.is_zero() {
@@ -421,7 +424,7 @@ impl SinkHandler {
                     return Ok(gst::FlowSuccess::Ok);
                 }
             }
-            inner.ips_pts = gst::CLOCK_TIME_NONE;
+            inner.ips_pts = None;
             inner.ips_rtptime = None;
         }
@@ -441,7 +444,7 @@ impl SinkHandler {
         inner.last_in_seqnum = Some(seq);
         let jb_item = if estimated_dts {
-            RTPJitterBufferItem::new(buffer, gst::CLOCK_TIME_NONE, pts, Some(seq), rtptime)
+            RTPJitterBufferItem::new(buffer, gst::ClockTime::NONE, pts, Some(seq), rtptime)
         } else {
             RTPJitterBufferItem::new(buffer, dts, pts, Some(seq), rtptime)
         };
@@ -463,15 +466,16 @@ impl SinkHandler {
         inner.last_rtptime = Some(rtptime);
-        if state.earliest_pts.is_none()
-            || (pts.is_some()
-                && (pts < state.earliest_pts
-                    || (pts == state.earliest_pts
-                        && state
-                            .earliest_seqnum
-                            .map(|earliest_seqnum| seq > earliest_seqnum)
-                            .unwrap_or(false))))
-        {
+        let must_update = match (state.earliest_pts, pts) {
+            (None, _) => true,
+            (Some(earliest_pts), Some(pts)) if pts < earliest_pts => true,
+            (Some(earliest_pts), Some(pts)) if pts == earliest_pts => state
+                .earliest_seqnum
+                .map_or(false, |earliest_seqnum| seq > earliest_seqnum),
+            _ => false,
+        };
+        if must_update {
             state.earliest_pts = pts;
             state.earliest_seqnum = Some(seq);
         }
@@ -515,10 +519,7 @@ impl SinkHandler {
         let (latency, context_wait) = {
             let settings = jb.settings.lock().unwrap();
-            (
-                settings.latency_ms as u64 * gst::MSECOND,
-                settings.context_wait as u64 * gst::MSECOND,
-            )
+            (settings.latency, settings.context_wait)
         };
         // Reschedule if needed
@@ -527,13 +528,15 @@ impl SinkHandler {
             .next_wakeup(&element, &state, latency, context_wait);
         if let Some((next_wakeup, _)) = next_wakeup {
             if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
-                if previous_next_wakeup.is_none() || previous_next_wakeup > next_wakeup {
+                if previous_next_wakeup.is_none()
+                    || next_wakeup.map_or(false, |next| previous_next_wakeup.unwrap() > next)
+                {
                     gst_debug!(
                         CAT,
                         obj: pad,
                         "Rescheduling for new item {} < {}",
-                        next_wakeup,
-                        previous_next_wakeup
+                        next_wakeup.display(),
+                        previous_next_wakeup.display(),
                     );
                     abort_handle.abort();
                     state.wait_handle = None;
@@ -666,16 +669,13 @@ impl SrcHandler {
         state: &mut State,
         element: &super::JitterBuffer,
         seqnum: u16,
-        pts: gst::ClockTime,
+        pts: impl Into<Option<gst::ClockTime>>,
         discont: &mut bool,
     ) -> Vec<gst::Event> {
-        let (latency_ns, do_lost) = {
+        let (latency, do_lost) = {
             let jb = JitterBuffer::from_instance(element);
             let settings = jb.settings.lock().unwrap();
-            (
-                settings.latency_ms as u64 * gst::MSECOND.nseconds().unwrap(),
-                settings.do_lost,
-            )
+            (settings.latency, settings.do_lost)
         };
         let mut events = vec![];
@@ -697,30 +697,24 @@ impl SrcHandler {
         let gap = gst_rtp::compare_seqnum(lost_seqnum, seqnum) as i64;
         if gap > 0 {
-            let interval =
-                pts.nseconds().unwrap() as i64 - state.last_popped_pts.nseconds().unwrap() as i64;
             let gap = gap as u64;
-            let spacing = if interval >= 0 {
-                interval as u64 / (gap + 1)
-            } else {
-                0
-            };
+            // FIXME reason why we can expect Some for the 2 lines below
+            let mut last_popped_pts = state.last_popped_pts.unwrap();
+            let interval = pts.into().unwrap().saturating_sub(last_popped_pts);
+            let spacing = interval / (gap as u64 + 1);
             *discont = true;
-            if state.equidistant > 0 && gap > 1 && gap * spacing > latency_ns {
-                let n_packets = gap - latency_ns / spacing;
+            if state.equidistant > 0 && gap > 1 && gap * spacing > latency {
+                let n_packets = gap - latency.nseconds() / spacing.nseconds();
                 if do_lost {
                     let s = gst::Structure::new(
                         "GstRTPPacketLost",
                         &[
                             ("seqnum", &(lost_seqnum as u32)),
-                            (
-                                "timestamp",
-                                &(state.last_popped_pts + gst::ClockTime(Some(spacing))),
-                            ),
-                            ("duration", &(n_packets * spacing)),
+                            ("timestamp", &(last_popped_pts + spacing)),
+                            ("duration", &(n_packets * spacing).nseconds()),
                             ("retry", &0),
                         ],
                     );
@@ -729,15 +723,20 @@ impl SrcHandler {
                 }
                 lost_seqnum = lost_seqnum.wrapping_add(n_packets as u16);
-                state.last_popped_pts += gst::ClockTime(Some(n_packets * spacing));
+                last_popped_pts += n_packets * spacing;
+                state.last_popped_pts = Some(last_popped_pts);
                 state.stats.num_lost += n_packets;
             }
             while lost_seqnum != seqnum {
-                let timestamp = state.last_popped_pts + gst::ClockTime(Some(spacing));
-                let duration = if state.equidistant > 0 { spacing } else { 0 };
-                state.last_popped_pts = timestamp;
+                let timestamp = last_popped_pts + spacing;
+                let duration = if state.equidistant > 0 {
+                    spacing
+                } else {
+                    gst::ClockTime::ZERO
+                };
+                state.last_popped_pts = Some(timestamp);
                 if do_lost {
                     let s = gst::Structure::new(
@@ -745,7 +744,8 @@ impl SrcHandler {
                         &[
                             ("seqnum", &(lost_seqnum as u32)),
                             ("timestamp", &timestamp),
-                            ("duration", &duration),
+                            // FIXME would probably make sense being a ClockTime
+                            ("duration", &duration.nseconds()),
                             ("retry", &0),
                         ],
                     );
@@ -819,8 +819,8 @@ impl SrcHandler {
         };
         state.last_popped_pts = buffer.pts();
-        if let Some(pts) = state.last_popped_pts.nseconds() {
-            state.position = pts.into();
+        if state.last_popped_pts.is_some() {
+            state.position = state.last_popped_pts;
         }
         state.last_popped_seqnum = seq;
@@ -845,22 +845,26 @@ impl SrcHandler {
         state: &State,
         latency: gst::ClockTime,
         context_wait: gst::ClockTime,
-    ) -> (gst::ClockTime, Option<(gst::ClockTime, Duration)>) {
+    ) -> (
+        Option<gst::ClockTime>,
+        Option<(Option<gst::ClockTime>, Duration)>,
+    ) {
         let now = element.current_running_time();
         gst_debug!(
             CAT,
             obj: element,
             "Now is {}, EOS {}, earliest pts is {}, packet_spacing {} and latency {}",
-            now,
+            now.display(),
             state.eos,
-            state.earliest_pts,
+            state.earliest_pts.display(),
             state.packet_spacing,
             latency
         );
         if state.eos {
             gst_debug!(CAT, obj: element, "EOS, not waiting");
+            // FIXME use Duration::ZERO when MSVC >= 1.53.2
             return (now, Some((now, Duration::from_nanos(0))));
         }
@@ -868,23 +872,25 @@ impl SrcHandler {
             return (now, None);
         }
-        let next_wakeup = state.earliest_pts + latency - state.packet_spacing - context_wait / 2;
+        let next_wakeup = state
+            .earliest_pts
+            .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2);
         let delay = next_wakeup
-            .saturating_sub(now)
-            .unwrap_or_else(gst::ClockTime::zero)
-            .nseconds()
-            .unwrap();
+            .zip(now)
+            .map_or(gst::ClockTime::ZERO, |(next_wakeup, now)| {
+                next_wakeup.saturating_sub(now)
+            });
         gst_debug!(
             CAT,
             obj: element,
             "Next wakeup at {} with delay {}",
-            next_wakeup,
+            next_wakeup.display(),
             delay
         );
-        (now, Some((next_wakeup, Duration::from_nanos(delay))))
+        (now, Some((next_wakeup, delay.into())))
     }
 }
@@ -954,8 +960,8 @@ impl PadSrcHandler for SrcHandler {
                 if ret {
                     let settings = jb.settings.lock().unwrap();
                     let (_, mut min_latency, _) = peer_query.result();
-                    min_latency += (settings.latency_ms as u64) * gst::SECOND;
-                    let max_latency = gst::CLOCK_TIME_NONE;
+                    min_latency += settings.latency;
+                    let max_latency = gst::ClockTime::NONE;
                     q.set(true, min_latency, max_latency);
                 }
@@ -999,7 +1005,7 @@ struct State {
     jbuf: glib::SendUniqueCell<RTPJitterBuffer>,
     last_res: Result<gst::FlowSuccess, gst::FlowError>,
-    position: gst::ClockTime,
+    position: Option<gst::ClockTime>,
     segment: gst::FormattedSegment<gst::ClockTime>,
     clock_rate: Option<u32>,
@@ -1011,14 +1017,14 @@ struct State {
     eos: bool,
     last_popped_seqnum: Option<u16>,
-    last_popped_pts: gst::ClockTime,
+    last_popped_pts: Option<gst::ClockTime>,
     stats: Stats,
-    earliest_pts: gst::ClockTime,
+    earliest_pts: Option<gst::ClockTime>,
     earliest_seqnum: Option<u16>,
-    wait_handle: Option<(gst::ClockTime, AbortHandle)>,
+    wait_handle: Option<(Option<gst::ClockTime>, AbortHandle)>,
 }
 impl Default for State {
@@ -1027,23 +1033,23 @@ impl Default for State {
             jbuf: glib::SendUniqueCell::new(RTPJitterBuffer::new()).unwrap(),
             last_res: Ok(gst::FlowSuccess::Ok),
-            position: gst::CLOCK_TIME_NONE,
+            position: None,
             segment: gst::FormattedSegment::<gst::ClockTime>::new(),
             clock_rate: None,
-            packet_spacing: gst::ClockTime::zero(),
+            packet_spacing: gst::ClockTime::ZERO,
             equidistant: 0,
             discont: true,
             eos: false,
             last_popped_seqnum: None,
-            last_popped_pts: gst::CLOCK_TIME_NONE,
+            last_popped_pts: None,
             stats: Stats::default(),
-            earliest_pts: gst::CLOCK_TIME_NONE,
+            earliest_pts: None,
             earliest_seqnum: None,
             wait_handle: None,
@@ -1093,10 +1099,7 @@ impl TaskImpl for JitterBufferTask {
         let jb = JitterBuffer::from_instance(&self.element);
         let (latency, context_wait) = {
             let settings = jb.settings.lock().unwrap();
-            (
-                settings.latency_ms as u64 * gst::MSECOND,
-                settings.context_wait as u64 * gst::MSECOND,
-            )
+            (settings.latency, settings.context_wait)
         };
         loop {
@@ -1110,6 +1113,7 @@ impl TaskImpl for JitterBufferTask {
             );
             let (delay_fut, abort_handle) = match next_wakeup {
+                // FIXME use Duration::ZERO when MSVC >= 1.53.2
                 Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
                 _ => {
                     let (delay_fut, abort_handle) = abortable(async move {
@@ -1123,8 +1127,7 @@ impl TaskImpl for JitterBufferTask {
                         };
                     });
-                    let next_wakeup =
-                        next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
+                    let next_wakeup = next_wakeup.and_then(|w| w.0);
                     (Some(delay_fut), Some((next_wakeup, abort_handle)))
                 }
             };
@@ -1158,12 +1161,15 @@ impl TaskImpl for JitterBufferTask {
                 CAT,
                 obj: &self.element,
                 "Woke up at {}, earliest_pts {}",
-                now,
-                state.earliest_pts
+                now.display(),
+                state.earliest_pts.display()
             );
             if let Some((next_wakeup, _)) = next_wakeup {
-                if next_wakeup > now {
+                if next_wakeup
+                    .zip(now)
+                    .map_or(false, |(next_wakeup, now)| next_wakeup > now)
+                {
                     // Reschedule and wait a bit longer in the next iteration
                     return Ok(());
                 }
@@ -1198,8 +1204,8 @@ impl TaskImpl for JitterBufferTask {
                 latency,
                 context_wait,
             );
-            if let Some((next_wakeup, _)) = next_wakeup {
-                if next_wakeup > now {
+            if let Some((Some(next_wakeup), _)) = next_wakeup {
+                if now.map_or(false, |now| next_wakeup > now) {
                     // Reschedule and wait a bit longer in the next iteration
                     return Ok(());
                 }
@@ -1281,7 +1287,7 @@ impl JitterBuffer {
         let context = {
             let settings = self.settings.lock().unwrap();
-            Context::acquire(&settings.context, settings.context_wait).unwrap()
+            Context::acquire(&settings.context, settings.context_wait.into()).unwrap()
         };
         self.task
@@ -1367,7 +1373,7 @@ impl ObjectImpl for JitterBuffer {
                     "Throttle poll loop to run at most once every this many ms",
                     0,
                     1000,
-                    DEFAULT_CONTEXT_WAIT,
+                    DEFAULT_CONTEXT_WAIT.mseconds() as u32,
                     glib::ParamFlags::READWRITE,
                 ),
                 glib::ParamSpec::new_uint(
@@ -1376,7 +1382,7 @@ impl ObjectImpl for JitterBuffer {
                     "Amount of ms to buffer",
                     0,
                     std::u32::MAX,
-                    DEFAULT_LATENCY_MS,
+                    DEFAULT_LATENCY.mseconds() as u32,
                     glib::ParamFlags::READWRITE,
                 ),
                 glib::ParamSpec::new_boolean(
@@ -1446,14 +1452,16 @@ impl ObjectImpl for JitterBuffer {
     ) {
         match pspec.name() {
             "latency" => {
-                let latency_ms = {
+                let latency = {
                     let mut settings = self.settings.lock().unwrap();
-                    settings.latency_ms = value.get().expect("type checked upstream");
-                    settings.latency_ms as u64
+                    settings.latency = gst::ClockTime::from_mseconds(
+                        value.get::<u32>().expect("type checked upstream").into(),
+                    );
+                    settings.latency
                 };
                 let state = self.state.lock().unwrap();
-                state.jbuf.borrow().set_delay(latency_ms * gst::MSECOND);
+                state.jbuf.borrow().set_delay(latency);
                 let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
             }
@@ -1478,7 +1486,9 @@ impl ObjectImpl for JitterBuffer {
             }
             "context-wait" => {
                 let mut settings = self.settings.lock().unwrap();
-                settings.context_wait = value.get().expect("type checked upstream");
+                settings.context_wait = gst::ClockTime::from_mseconds(
+                    value.get::<u32>().expect("type checked upstream").into(),
+                );
             }
             _ => unimplemented!(),
         }
@@ -1488,7 +1498,7 @@ impl ObjectImpl for JitterBuffer {
         match pspec.name() {
             "latency" => {
                 let settings = self.settings.lock().unwrap();
-                settings.latency_ms.to_value()
+                settings.latency.mseconds().to_value()
             }
             "do-lost" => {
                 let settings = self.settings.lock().unwrap();
@@ -1520,7 +1530,7 @@ impl ObjectImpl for JitterBuffer {
             }
             "context-wait" => {
                 let settings = self.settings.lock().unwrap();
-                settings.context_wait.to_value()
+                (settings.context_wait.mseconds() as u32).to_value()
             }
             _ => unimplemented!(),
         }

View file

@@ -72,8 +72,8 @@ unsafe impl Send for RTPJitterBufferItem {}
 impl RTPJitterBufferItem {
     pub fn new(
         buffer: gst::Buffer,
-        dts: gst::ClockTime,
-        pts: gst::ClockTime,
+        dts: impl Into<Option<gst::ClockTime>>,
+        pts: impl Into<Option<gst::ClockTime>>,
         seqnum: Option<u16>,
         rtptime: u32,
     ) -> RTPJitterBufferItem {
@@ -89,8 +89,8 @@ impl RTPJitterBufferItem {
                 next: ptr::null_mut(),
                 prev: ptr::null_mut(),
                 r#type: 0,
-                dts: dts.into_glib(),
-                pts: pts.into_glib(),
+                dts: dts.into().into_glib(),
+                pts: pts.into().into_glib(),
                 seqnum: seqnum.map(|s| s as u32).unwrap_or(std::u32::MAX),
                 count: 1,
                 rtptime,
@@ -113,24 +113,24 @@ impl RTPJitterBufferItem {
         }
     }
-    pub fn dts(&self) -> gst::ClockTime {
+    pub fn dts(&self) -> Option<gst::ClockTime> {
         unsafe {
             let item = self.0.as_ref().expect("Invalid wrapper");
             if item.as_ref().dts == gst::ffi::GST_CLOCK_TIME_NONE {
-                gst::CLOCK_TIME_NONE
+                None
             } else {
-                gst::ClockTime(Some(item.as_ref().dts))
+                Some(gst::ClockTime::from_nseconds(item.as_ref().dts))
             }
         }
     }
-    pub fn pts(&self) -> gst::ClockTime {
+    pub fn pts(&self) -> Option<gst::ClockTime> {
         unsafe {
             let item = self.0.as_ref().expect("Invalid wrapper");
             if item.as_ref().pts == gst::ffi::GST_CLOCK_TIME_NONE {
-                gst::CLOCK_TIME_NONE
+                None
             } else {
-                gst::ClockTime(Some(item.as_ref().pts))
+                Some(gst::ClockTime::from_nseconds(item.as_ref().pts))
             }
         }
     }
@@ -235,7 +235,10 @@ impl RTPJitterBuffer {
     #[allow(dead_code)]
     pub fn delay(&self) -> gst::ClockTime {
-        unsafe { from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0)) }
+        unsafe {
+            try_from_glib(ffi::rtp_jitter_buffer_get_delay(self.to_glib_none().0))
+                .expect("undefined delay")
+        }
     }
     pub fn set_delay(&self, delay: gst::ClockTime) {
@@ -253,29 +256,23 @@ impl RTPJitterBuffer {
     pub fn calculate_pts(
         &self,
-        dts: gst::ClockTime,
+        dts: impl Into<Option<gst::ClockTime>>,
         estimated_dts: bool,
         rtptime: u32,
-        base_time: gst::ClockTime,
+        base_time: impl Into<Option<gst::ClockTime>>,
         gap: i32,
         is_rtx: bool,
-    ) -> gst::ClockTime {
+    ) -> Option<gst::ClockTime> {
         unsafe {
-            let pts = ffi::rtp_jitter_buffer_calculate_pts(
+            from_glib(ffi::rtp_jitter_buffer_calculate_pts(
                 self.to_glib_none().0,
-                dts.into_glib(),
+                dts.into().into_glib(),
                 estimated_dts.into_glib(),
                 rtptime,
-                base_time.into_glib(),
+                base_time.into().into_glib(),
                 gap,
                 is_rtx.into_glib(),
-            );
-            if pts == gst::ffi::GST_CLOCK_TIME_NONE {
-                gst::CLOCK_TIME_NONE
-            } else {
-                pts.into()
-            }
+            ))
         }
     }
@@ -297,7 +294,7 @@ impl RTPJitterBuffer {
         }
     }
-    pub fn find_earliest(&self) -> (gst::ClockTime, Option<u16>) {
+    pub fn find_earliest(&self) -> (Option<gst::ClockTime>, Option<u16>) {
         unsafe {
             let mut pts = mem::MaybeUninit::uninit();
             let mut seqnum = mem::MaybeUninit::uninit();
@@ -307,7 +304,7 @@ impl RTPJitterBuffer {
                 pts.as_mut_ptr(),
                 seqnum.as_mut_ptr(),
             );
-            let pts = pts.assume_init();
+            let pts = from_glib(pts.assume_init());
             let seqnum = seqnum.assume_init();
             let seqnum = if seqnum == std::u32::MAX {
@@ -316,11 +313,7 @@ impl RTPJitterBuffer {
                 Some(seqnum as u16)
             };
-            if pts == gst::ffi::GST_CLOCK_TIME_NONE {
-                (gst::CLOCK_TIME_NONE, seqnum)
-            } else {
-                (pts.into(), seqnum)
-            }
+            (pts, seqnum)
         }
     }
@@ -340,11 +333,11 @@ impl RTPJitterBuffer {
         }
     }
-    pub fn peek(&self) -> (gst::ClockTime, Option<u16>) {
+    pub fn peek(&self) -> (Option<gst::ClockTime>, Option<u16>) {
         unsafe {
             let item = ffi::rtp_jitter_buffer_peek(self.to_glib_none().0);
             if item.is_null() {
-                (gst::CLOCK_TIME_NONE, None)
+                (None, None)
             } else {
                 let seqnum = (*item).seqnum;
                 let seqnum = if seqnum == std::u32::MAX {
@@ -352,7 +345,7 @@ impl RTPJitterBuffer {
                 } else {
                     Some(seqnum as u16)
                 };
-                ((*item).pts.into(), seqnum)
+                (from_glib((*item).pts), seqnum)
             }
         }
     }

View file

@@ -30,6 +30,7 @@ use std::collections::{HashMap, VecDeque};
 use std::sync::Mutex as StdMutex;
 use std::sync::MutexGuard as StdMutexGuard;
 use std::sync::{Arc, Weak};
+use std::time::Duration;
 use std::{u32, u64};
 use crate::runtime::prelude::*;
@@ -50,9 +51,10 @@ const DEFAULT_PROXY_CONTEXT: &str = "";
 const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
 const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
-const DEFAULT_MAX_SIZE_TIME: u64 = gst::SECOND_VAL;
+const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND;
 const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
 #[derive(Debug, Clone)]
 struct SettingsSink {
@@ -71,9 +73,9 @@ impl Default for SettingsSink {
 struct SettingsSrc {
     max_size_buffers: u32,
     max_size_bytes: u32,
-    max_size_time: u64,
+    max_size_time: gst::ClockTime,
     context: String,
-    context_wait: u32,
+    context_wait: Duration,
     proxy_context: String,
 }
@@ -810,7 +812,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
         gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
         let ret = match query.view_mut() {
             QueryView::Latency(ref mut q) => {
-                q.set(true, 0.into(), gst::CLOCK_TIME_NONE);
+                q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
                 true
             }
             QueryView::Scheduling(ref mut q) => {
@@ -1031,7 +1033,7 @@ impl ProxySrc {
             } else {
                 Some(settings.max_size_bytes)
             },
-            if settings.max_size_time == 0 {
+            if settings.max_size_time.is_zero() {
                 None
             } else {
                 Some(settings.max_size_time)
@@ -1141,7 +1143,7 @@ impl ObjectImpl for ProxySrc {
                     "Throttle poll loop to run at most once every this many ms",
                     0,
                     1000,
-                    DEFAULT_CONTEXT_WAIT,
+                    DEFAULT_CONTEXT_WAIT.as_millis() as u32,
                     glib::ParamFlags::READWRITE,
                 ),
                 glib::ParamSpec::new_string(
@@ -1175,7 +1177,7 @@ impl ObjectImpl for ProxySrc {
                     "Maximum number of nanoseconds to queue (0=unlimited)",
                     0,
                     u64::MAX - 1,
-                    DEFAULT_MAX_SIZE_TIME,
+                    DEFAULT_MAX_SIZE_TIME.nseconds(),
                     glib::ParamFlags::READWRITE,
                 ),
             ]
@@ -1200,7 +1202,8 @@ impl ObjectImpl for ProxySrc {
                 settings.max_size_bytes = value.get().expect("type checked upstream");
             }
             "max-size-time" => {
-                settings.max_size_time = value.get().expect("type checked upstream");
+                settings.max_size_time =
+                    gst::ClockTime::from_nseconds(value.get().expect("type checked upstream"));
             }
             "context" => {
                 settings.context = value
@@ -1209,7 +1212,9 @@ impl ObjectImpl for ProxySrc {
                     .unwrap_or_else(|| "".into());
             }
             "context-wait" => {
-                settings.context_wait = value.get().expect("type checked upstream");
+                settings.context_wait = Duration::from_millis(
+                    value.get::<u32>().expect("type checked upstream").into(),
+                );
             }
             "proxy-context" => {
                 settings.proxy_context = value
@@ -1226,9 +1231,9 @@ impl ObjectImpl for ProxySrc {
         match pspec.name() {
             "max-size-buffers" => settings.max_size_buffers.to_value(),
             "max-size-bytes" => settings.max_size_bytes.to_value(),
-            "max-size-time" => settings.max_size_time.to_value(),
+            "max-size-time" => settings.max_size_time.nseconds().to_value(),
             "context" => settings.context.to_value(),
-            "context-wait" => settings.context_wait.to_value(),
+            "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
             "proxy-context" => settings.proxy_context.to_value(),
             _ => unimplemented!(),
         }

View file

@@ -28,6 +28,7 @@ use once_cell::sync::Lazy;
 use std::collections::VecDeque;
 use std::sync::Mutex as StdMutex;
+use std::time::Duration;
 use std::{u32, u64};
 use crate::runtime::prelude::*;
@@ -37,17 +38,18 @@ use crate::dataqueue::{DataQueue, DataQueueItem};
 const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
 const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
-const DEFAULT_MAX_SIZE_TIME: u64 = gst::SECOND_VAL;
+const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND;
 const DEFAULT_CONTEXT: &str = "";
-const DEFAULT_CONTEXT_WAIT: u32 = 0;
+// FIXME use Duration::ZERO when MSVC >= 1.53.2
+const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
 #[derive(Debug, Clone)]
 struct Settings {
     max_size_buffers: u32,
     max_size_bytes: u32,
-    max_size_time: u64,
+    max_size_time: gst::ClockTime,
     context: String,
-    context_wait: u32,
+    context_wait: Duration,
 }
 impl Default for Settings {
@@ -628,7 +630,7 @@ impl Queue {
             } else {
                 Some(settings.max_size_bytes)
             },
-            if settings.max_size_time == 0 {
+            if settings.max_size_time.is_zero() {
                 None
             } else {
                 Some(settings.max_size_time)
@@ -729,7 +731,7 @@ impl ObjectImpl for Queue {
                     "Throttle poll loop to run at most once every this many ms",
                     0,
                     1000,
-                    DEFAULT_CONTEXT_WAIT,
+                    DEFAULT_CONTEXT_WAIT.as_millis() as u32,
                     glib::ParamFlags::READWRITE,
                 ),
                 glib::ParamSpec::new_uint(
@@ -756,7 +758,7 @@ impl ObjectImpl for Queue {
                     "Maximum number of nanoseconds to queue (0=unlimited)",
                     0,
                     u64::MAX - 1,
-                    DEFAULT_MAX_SIZE_TIME,
+                    DEFAULT_MAX_SIZE_TIME.nseconds(),
                     glib::ParamFlags::READWRITE,
                 ),
             ]
@@ -781,7 +783,8 @@ impl ObjectImpl for Queue {
                 settings.max_size_bytes = value.get().expect("type checked upstream");
             }
             "max-size-time" => {
-                settings.max_size_time = value.get().expect("type checked upstream");
+                settings.max_size_time =
+                    gst::ClockTime::from_nseconds(value.get().expect("type checked upstream"));
             }
             "context" => {
                 settings.context = value
@@ -790,7 +793,9 @@ impl ObjectImpl for Queue {
                     .unwrap_or_else(|| "".into());
             }
             "context-wait" => {
-                settings.context_wait = value.get().expect("type checked upstream");
+                settings.context_wait = Duration::from_millis(
+                    value.get::<u32>().expect("type checked upstream").into(),
+                );
             }
             _ => unimplemented!(),
         }
@@ -801,9 +806,9 @@ impl ObjectImpl for Queue {
         match pspec.name() {
             "max-size-buffers" => settings.max_size_buffers.to_value(),
             "max-size-bytes" => settings.max_size_bytes.to_value(),
-            "max-size-time" => settings.max_size_time.to_value(),
+            "max-size-time" => settings.max_size_time.nseconds().to_value(),
             "context" => settings.context.to_value(),
-            "context-wait" => settings.context_wait.to_value(),
+            "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
             _ => unimplemented!(),
         }
     }

View file

@@ -166,7 +166,7 @@ struct ContextThread {
 }
 impl ContextThread {
-    fn start(name: &str, wait: u32) -> Context {
+    fn start(name: &str, wait: Duration) -> Context {
         let context_thread = ContextThread { name: name.into() };
         let (context_sender, context_receiver) = sync_mpsc::channel();
         let join = thread::spawn(move || {
@@ -187,14 +187,14 @@ impl ContextThread {
         context
     }
-    fn spawn(&self, wait: u32, context_sender: sync_mpsc::Sender<Context>) {
+    fn spawn(&self, wait: Duration, context_sender: sync_mpsc::Sender<Context>) {
         gst_debug!(RUNTIME_CAT, "Started context thread '{}'", self.name);
         let mut runtime = tokio::runtime::Builder::new()
             .basic_scheduler()
             .thread_name(self.name.clone())
             .enable_all()
-            .max_throttling(Duration::from_millis(wait as u64))
+            .max_throttling(wait)
             .build()
             .expect("Couldn't build the runtime");
@@ -406,7 +406,7 @@ impl PartialEq for Context {
 impl Eq for Context {}
 impl Context {
-    pub fn acquire(context_name: &str, wait: u32) -> Result<Self, io::Error> {
+    pub fn acquire(context_name: &str, wait: Duration) -> Result<Self, io::Error> {
         assert_ne!(context_name, "DUMMY");
         let mut contexts = CONTEXTS.lock().unwrap();
@@ -693,16 +693,16 @@ mod tests {
     type Item = i32;
-    const SLEEP_DURATION_MS: u32 = 2;
-    const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64);
-    const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS as u64 * 10);
+    const SLEEP_DURATION_MS: u64 = 2;
+    const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS);
+    const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS * 10);
     #[tokio::test]
     async fn drain_sub_tasks() {
         // Setup
         gst::init().unwrap();
-        let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION_MS).unwrap();
+        let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION).unwrap();
         let join_handle = context.spawn(async move {
             let (sender, mut receiver) = mpsc::channel(1);
@@ -755,7 +755,7 @@ mod tests {
     async fn block_on_within_tokio() {
         gst::init().unwrap();
-        let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION_MS).unwrap();
+        let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION).unwrap();
         let bytes_sent = crate::runtime::executor::block_on(context.spawn(async {
             let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
@@ -781,7 +781,7 @@ mod tests {
     fn block_on_from_sync() {
         gst::init().unwrap();
-        let context = Context::acquire("block_on_from_sync", SLEEP_DURATION_MS).unwrap();
+        let context = Context::acquire("block_on_from_sync", SLEEP_DURATION).unwrap();
         let bytes_sent = crate::runtime::executor::block_on(context.spawn(async {
             let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5001);
@@ -807,7 +807,7 @@ mod tests {
     fn block_on_from_context() {
         gst::init().unwrap();
-        let context = Context::acquire("block_on_from_context", SLEEP_DURATION_MS).unwrap();
+        let context = Context::acquire("block_on_from_context", SLEEP_DURATION).unwrap();
         let join_handle = context.spawn(async {
             crate::runtime::executor::block_on(async {
                 crate::runtime::time::delay_for(DELAY).await;
@@ -821,7 +821,7 @@ mod tests {
     async fn enter_context_from_tokio() {
         gst::init().unwrap();
-        let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION_MS).unwrap();
+        let context = Context::acquire("enter_context_from_tokio", SLEEP_DURATION).unwrap();
         let mut socket = context
             .enter(|| {
                 let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5002);
@@ -849,7 +849,7 @@ mod tests {
     fn enter_context_from_sync() {
         gst::init().unwrap();
-        let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION_MS).unwrap();
+        let context = Context::acquire("enter_context_from_sync", SLEEP_DURATION).unwrap();
         let mut socket = context
             .enter(|| {
                 let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5003);

View file

@@ -1252,7 +1252,7 @@ mod tests {
             }
         }
-        let context = Context::acquire("task_iterate", 2).unwrap();
+        let context = Context::acquire("task_iterate", Duration::from_millis(2)).unwrap();
         let task = Task::default();
@@ -1519,7 +1519,7 @@
            }
        }
-        let context = Context::acquire("prepare_error", 2).unwrap();
+        let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@@ -1604,7 +1604,7 @@
            }
        }
-        let context = Context::acquire("prepare_start_ok", 2).unwrap();
+        let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@@ -1612,7 +1612,9 @@
        task.prepare(TaskPrepareTest { prepare_receiver }, context.clone())
            .unwrap();
-        let start_ctx = Context::acquire("prepare_start_ok_requester", 0).unwrap();
+        // FIXME use Duration::ZERO when MSVC >= 1.53.2
+        let start_ctx =
+            Context::acquire("prepare_start_ok_requester", Duration::from_nanos(0)).unwrap();
        let task_clone = task.clone();
        let (ready_sender, ready_receiver) = oneshot::channel();
        let start_handle = start_ctx.spawn(async move {
@@ -1720,7 +1722,7 @@
            }
        }
-        let context = Context::acquire("prepare_start_error", 2).unwrap();
+        let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@@ -1735,7 +1737,9 @@
        )
        .unwrap();
-        let start_ctx = Context::acquire("prepare_start_error_requester", 0).unwrap();
+        // FIXME use Duration::ZERO when MSVC >= 1.53.2
+        let start_ctx =
+            Context::acquire("prepare_start_error_requester", Duration::from_nanos(0)).unwrap();
        let task_clone = task.clone();
        let (ready_sender, ready_receiver) = oneshot::channel();
        let start_handle = start_ctx.spawn(async move {
@@ -1808,7 +1812,7 @@
            }
        }
-        let context = Context::acquire("pause_start", 2).unwrap();
+        let context = Context::acquire("pause_start", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@@ -1901,7 +1905,7 @@
            }
        }
-        let context = Context::acquire("successive_pause_start", 2).unwrap();
+        let context = Context::acquire("successive_pause_start", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@@ -1962,7 +1966,7 @@
            }
        }
-        let context = Context::acquire("flush_regular_sync", 2).unwrap();
+        let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@@ -2049,7 +2053,8 @@
            }
        }
-        let context = Context::acquire("flush_regular_different_context", 2).unwrap();
+        let context =
+            Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@@ -2067,7 +2072,11 @@
        gst_debug!(RUNTIME_CAT, "flush_regular_different_context: start");
        task.start().unwrap();
-        let oob_context = Context::acquire("flush_regular_different_context_oob", 2).unwrap();
+        let oob_context = Context::acquire(
+            "flush_regular_different_context_oob",
+            Duration::from_millis(2),
+        )
+        .unwrap();
        let task_clone = task.clone();
        let flush_handle = oob_context.spawn(async move {
@@ -2134,7 +2143,8 @@
            }
        }
-        let context = Context::acquire("flush_regular_same_context", 2).unwrap();
+        let context =
+            Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();
        let task = Task::default();
@ -2218,7 +2228,7 @@ mod tests {
} }
} }
let context = Context::acquire("flush_from_loop", 2).unwrap(); let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
let task = Task::default(); let task = Task::default();
@ -2291,7 +2301,7 @@ mod tests {
} }
} }
let context = Context::acquire("pause_from_loop", 2).unwrap(); let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
let task = Task::default(); let task = Task::default();
@ -2357,7 +2367,7 @@ mod tests {
} }
} }
let context = Context::acquire("trigger_from_action", 2).unwrap(); let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
let task = Task::default(); let task = Task::default();
@ -2427,7 +2437,7 @@ mod tests {
} }
} }
let context = Context::acquire("pause_flush_start", 2).unwrap(); let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();
let task = Task::default(); let task = Task::default();
@ -2538,7 +2548,7 @@ mod tests {
} }
} }
let context = Context::acquire("pause_flushing_start", 2).unwrap(); let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();
let task = Task::default(); let task = Task::default();
@ -2629,7 +2639,7 @@ mod tests {
} }
} }
let context = Context::acquire("flush_concurrent_start", 2).unwrap(); let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();
let task = Task::default(); let task = Task::default();
@ -2644,7 +2654,8 @@ mod tests {
) )
.unwrap(); .unwrap();
let oob_context = Context::acquire("flush_concurrent_start_oob", 2).unwrap(); let oob_context =
Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
let task_clone = task.clone(); let task_clone = task.clone();
task.pause().unwrap(); task.pause().unwrap();
@ -2746,7 +2757,7 @@ mod tests {
} }
} }
let context = Context::acquire("start_timer", 2).unwrap(); let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
let task = Task::default(); let task = Task::default();
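All of the test hunks above apply the same substitution: Context::acquire now takes the throttling wait as a std::time::Duration instead of a bare u32 of milliseconds, with Duration::from_nanos(0) standing in for Duration::ZERO until the minimum supported toolchain provides it (see the FIXME). A minimal sketch of the new call shape; acquire_throttled is a hypothetical helper, and gstthreadshare::runtime::Context is the crate's public path as used by its integration tests:

    use std::time::Duration;

    use gstthreadshare::runtime::Context;

    // Hypothetical helper mirroring the tests above: the wait is a Duration,
    // so milliseconds are converted explicitly at the call site.
    fn acquire_throttled(name: &str, wait_ms: u64) -> Context {
        Context::acquire(name, Duration::from_millis(wait_ms))
            .expect("Failed to acquire Context")
    }

    // e.g. the 2 ms throttling used throughout these tests:
    // let context = acquire_throttled("start_timer", 2);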

@ -143,19 +143,25 @@ impl<T: SocketRead> Socket<T> {
Ok((len, saddr)) => { Ok((len, saddr)) => {
let dts = if T::DO_TIMESTAMP { let dts = if T::DO_TIMESTAMP {
let time = self.clock.as_ref().unwrap().time(); let time = self.clock.as_ref().unwrap().time();
let running_time = time - self.base_time.unwrap(); let running_time = time
.zip(self.base_time)
// TODO Do we want None if time < base_time
// or do we expect Some?
.and_then(|(time, base_time)| time.checked_sub(base_time));
// FIXME maybe we should check if running_time.is_none
// so as to display another message
gst_debug!( gst_debug!(
SOCKET_CAT, SOCKET_CAT,
obj: &self.element, obj: &self.element,
"Read {} bytes at {} (clock {})", "Read {} bytes at {} (clock {})",
len, len,
running_time, running_time.display(),
time time.display(),
); );
running_time running_time
} else { } else {
gst_debug!(SOCKET_CAT, obj: &self.element, "Read {} bytes", len); gst_debug!(SOCKET_CAT, obj: &self.element, "Read {} bytes", len);
gst::CLOCK_TIME_NONE gst::ClockTime::NONE
}; };
let mut buffer = self.mapped_buffer.take().unwrap().into_buffer(); let mut buffer = self.mapped_buffer.take().unwrap().into_buffer();
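Both the clock time and the base time are Option<gst::ClockTime> in the new design, so the running time above is built with Option::zip plus ClockTime::checked_sub rather than unwrap() and a bare subtraction, and the optional result is logged through .display(). A minimal sketch of just that computation; the free function is illustrative, not part of the element:

    // A running time only exists when both inputs are known and the
    // subtraction does not underflow.
    fn running_time(
        time: Option<gst::ClockTime>,
        base_time: Option<gst::ClockTime>,
    ) -> Option<gst::ClockTime> {
        time.zip(base_time)
            .and_then(|(time, base_time)| time.checked_sub(base_time))
    }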

@ -31,6 +31,7 @@ use std::io;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex as StdMutex; use std::sync::Mutex as StdMutex;
use std::time::Duration;
use std::u16; use std::u16;
use std::u32; use std::u32;
@ -47,7 +48,8 @@ const DEFAULT_PORT: i32 = 4953;
const DEFAULT_CAPS: Option<gst::Caps> = None; const DEFAULT_CAPS: Option<gst::Caps> = None;
const DEFAULT_BLOCKSIZE: u32 = 4096; const DEFAULT_BLOCKSIZE: u32 = 4096;
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0; // FIXME use Duration::ZERO when MSVC >= 1.53.2
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
@ -56,7 +58,7 @@ struct Settings {
caps: Option<gst::Caps>, caps: Option<gst::Caps>,
blocksize: u32, blocksize: u32,
context: String, context: String,
context_wait: u32, context_wait: Duration,
} }
impl Default for Settings { impl Default for Settings {
@ -224,7 +226,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let ret = match query.view_mut() { let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => { QueryView::Latency(ref mut q) => {
q.set(false, 0.into(), gst::CLOCK_TIME_NONE); q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
true true
} }
QueryView::Scheduling(ref mut q) => { QueryView::Scheduling(ref mut q) => {
@ -580,7 +582,7 @@ impl ObjectImpl for TcpClientSrc {
"Throttle poll loop to run at most once every this many ms", "Throttle poll loop to run at most once every this many ms",
0, 0,
1000, 1000,
DEFAULT_CONTEXT_WAIT, DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
glib::ParamSpec::new_string( glib::ParamSpec::new_string(
@ -649,7 +651,9 @@ impl ObjectImpl for TcpClientSrc {
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
"context-wait" => { "context-wait" => {
settings.context_wait = value.get().expect("type checked upstream"); settings.context_wait = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -663,7 +667,7 @@ impl ObjectImpl for TcpClientSrc {
"caps" => settings.caps.to_value(), "caps" => settings.caps.to_value(),
"blocksize" => settings.blocksize.to_value(), "blocksize" => settings.blocksize.to_value(),
"context" => settings.context.to_value(), "context" => settings.context.to_value(),
"context-wait" => settings.context_wait.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
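The context-wait handling above recurs in every element touched by this patch: the GObject property stays a u32 in milliseconds, while the settings struct now stores a std::time::Duration, converting at the property boundary. A rough sketch of that plumbing alone; the helper names are illustrative:

    use std::time::Duration;

    // Spelled from_nanos(0) in the patch because Duration::ZERO is not yet
    // available as a const on the supported toolchain (see the FIXME above).
    const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);

    // set_property("context-wait", ...): u32 milliseconds -> Duration
    fn context_wait_from_property(ms: u32) -> Duration {
        Duration::from_millis(ms.into())
    }

    // property getter and ParamSpec default: Duration -> u32 milliseconds
    fn context_wait_to_property(wait: Duration) -> u32 {
        wait.as_millis() as u32
    }

The same conversion appears on the test side later in this commit, e.g. set_property("context-wait", &(THROTTLING_DURATION.as_millis() as u32)).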

@ -62,7 +62,8 @@ const DEFAULT_TTL_MC: u32 = 1;
const DEFAULT_QOS_DSCP: i32 = -1; const DEFAULT_QOS_DSCP: i32 = -1;
const DEFAULT_CLIENTS: &str = ""; const DEFAULT_CLIENTS: &str = "";
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0; // FIXME use Duration::ZERO when MSVC >= 1.53.2
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
@ -81,7 +82,7 @@ struct Settings {
ttl_mc: u32, ttl_mc: u32,
qos_dscp: i32, qos_dscp: i32,
context: String, context: String,
context_wait: u32, context_wait: Duration,
} }
impl Default for Settings { impl Default for Settings {
@ -125,7 +126,7 @@ enum TaskItem {
struct UdpSinkPadHandlerInner { struct UdpSinkPadHandlerInner {
sync: bool, sync: bool,
segment: Option<gst::Segment>, segment: Option<gst::Segment>,
latency: gst::ClockTime, latency: Option<gst::ClockTime>,
socket: Arc<Mutex<Option<tokio::net::UdpSocket>>>, socket: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
socket_v6: Arc<Mutex<Option<tokio::net::UdpSocket>>>, socket_v6: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
#[allow(clippy::rc_buffer)] #[allow(clippy::rc_buffer)]
@ -141,7 +142,7 @@ impl UdpSinkPadHandlerInner {
UdpSinkPadHandlerInner { UdpSinkPadHandlerInner {
sync: DEFAULT_SYNC, sync: DEFAULT_SYNC,
segment: None, segment: None,
latency: gst::CLOCK_TIME_NONE, latency: None,
socket: Arc::new(Mutex::new(None)), socket: Arc::new(Mutex::new(None)),
socket_v6: Arc::new(Mutex::new(None)), socket_v6: Arc::new(Mutex::new(None)),
clients: Arc::new(vec![SocketAddr::new( clients: Arc::new(vec![SocketAddr::new(
@ -217,7 +218,7 @@ impl UdpSinkPadHandler {
} }
fn set_latency(&self, latency: gst::ClockTime) { fn set_latency(&self, latency: gst::ClockTime) {
self.0.write().unwrap().latency = latency; self.0.write().unwrap().latency = Some(latency);
} }
fn prepare(&self) { fn prepare(&self) {
@ -405,15 +406,17 @@ impl UdpSinkPadHandler {
) = { ) = {
let mut inner = self.0.write().unwrap(); let mut inner = self.0.write().unwrap();
let do_sync = inner.sync; let do_sync = inner.sync;
let mut rtime: gst::ClockTime = 0.into(); let mut rtime = gst::ClockTime::NONE;
if let Some(segment) = &inner.segment { if let Some(segment) = &inner.segment {
if let Some(segment) = segment.downcast_ref::<gst::format::Time>() { rtime = segment
rtime = segment.to_running_time(buffer.pts()); .downcast_ref::<gst::format::Time>()
if inner.latency.is_some() { .and_then(|segment| {
rtime += inner.latency; segment
} .to_running_time(buffer.pts())
} .zip(inner.latency)
.map(|(rtime, latency)| rtime + latency)
});
} }
let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]); let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]);
@ -519,14 +522,20 @@ impl UdpSinkPadHandler {
} }
/* Wait until specified time */ /* Wait until specified time */
async fn sync(&self, element: &super::UdpSink, running_time: gst::ClockTime) { async fn sync(
&self,
element: &super::UdpSink,
running_time: impl Into<Option<gst::ClockTime>>,
) {
let now = element.current_running_time(); let now = element.current_running_time();
if let Some(delay) = running_time match running_time
.saturating_sub(now) .into()
.and_then(|delay| delay.nseconds()) .zip(now)
.and_then(|(running_time, now)| running_time.checked_sub(now))
{ {
runtime::time::delay_for(Duration::from_nanos(delay)).await; Some(delay) => runtime::time::delay_for(delay.into()).await,
None => runtime::executor::yield_now().await,
} }
} }
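The reworked sync signature above illustrates another recurring pattern: a parameter that used to be a plain gst::ClockTime (with CLOCK_TIME_NONE as a magic value) becomes impl Into<Option<gst::ClockTime>>, so callers can pass either a concrete time or None without extra wrapping. A small illustrative sketch, not taken from the element:

    fn log_running_time(running_time: impl Into<Option<gst::ClockTime>>) {
        match running_time.into() {
            // ClockTime implements Display, so a known time prints directly.
            Some(rt) => println!("running time: {}", rt),
            None => println!("running time: unknown"),
        }
    }

    // Both call shapes compile:
    // log_running_time(2 * gst::ClockTime::SECOND);
    // log_running_time(gst::ClockTime::NONE);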
@ -980,7 +989,7 @@ impl ObjectImpl for UdpSink {
"Throttle poll loop to run at most once every this many ms", "Throttle poll loop to run at most once every this many ms",
0, 0,
1000, 1000,
DEFAULT_CONTEXT_WAIT, DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
glib::ParamSpec::new_boolean( glib::ParamSpec::new_boolean(
@ -1257,7 +1266,9 @@ impl ObjectImpl for UdpSink {
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
"context-wait" => { "context-wait" => {
settings.context_wait = value.get().expect("type checked upstream"); settings.context_wait = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -1309,7 +1320,7 @@ impl ObjectImpl for UdpSink {
clients.join(",").to_value() clients.join(",").to_value()
} }
"context" => settings.context.to_value(), "context" => settings.context.to_value(),
"context-wait" => settings.context_wait.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
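Two computations in this sink change shape with Option<gst::ClockTime>: the render time only exists when both the buffer's running time and the configured latency are known, and the pre-send delay only exists when both the target and the current running time are known and the target lies in the future. A sketch of both, mirroring the combinators used above; the function names are made up:

    use std::time::Duration;

    fn render_time(
        running_time: Option<gst::ClockTime>,
        latency: Option<gst::ClockTime>,
    ) -> Option<gst::ClockTime> {
        // Both values must be present for the sum to exist.
        running_time
            .zip(latency)
            .map(|(rtime, latency)| rtime + latency)
    }

    fn wait_delay(
        target: Option<gst::ClockTime>,
        now: Option<gst::ClockTime>,
    ) -> Option<Duration> {
        target
            .zip(now)
            // checked_sub yields None when the target is already in the past.
            .and_then(|(target, now)| target.checked_sub(now))
            .map(|delay| Duration::from_nanos(delay.nseconds()))
    }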

@ -32,6 +32,7 @@ use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex as StdMutex; use std::sync::Mutex as StdMutex;
use std::time::Duration;
use std::u16; use std::u16;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
@ -47,7 +48,8 @@ const DEFAULT_MTU: u32 = 1492;
const DEFAULT_SOCKET: Option<GioSocketWrapper> = None; const DEFAULT_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None; const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0; // FIXME use Duration::ZERO when MSVC >= 1.53.2
const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0);
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -60,7 +62,7 @@ struct Settings {
socket: Option<GioSocketWrapper>, socket: Option<GioSocketWrapper>,
used_socket: Option<GioSocketWrapper>, used_socket: Option<GioSocketWrapper>,
context: String, context: String,
context_wait: u32, context_wait: Duration,
retrieve_sender_address: bool, retrieve_sender_address: bool,
} }
@ -237,7 +239,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
let ret = match query.view_mut() { let ret = match query.view_mut() {
QueryView::Latency(ref mut q) => { QueryView::Latency(ref mut q) => {
q.set(true, 0.into(), gst::CLOCK_TIME_NONE); q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
true true
} }
QueryView::Scheduling(ref mut q) => { QueryView::Scheduling(ref mut q) => {
@ -301,7 +303,7 @@ impl TaskImpl for UdpSrcTask {
async move { async move {
gst_log!(CAT, obj: &self.element, "Starting task"); gst_log!(CAT, obj: &self.element, "Starting task");
self.socket self.socket
.set_clock(self.element.clock(), Some(self.element.base_time())); .set_clock(self.element.clock(), self.element.base_time());
gst_log!(CAT, obj: &self.element, "Task started"); gst_log!(CAT, obj: &self.element, "Task started");
Ok(()) Ok(())
} }
@ -721,7 +723,7 @@ impl ObjectImpl for UdpSrc {
"Throttle poll loop to run at most once every this many ms", "Throttle poll loop to run at most once every this many ms",
0, 0,
1000, 1000,
DEFAULT_CONTEXT_WAIT, DEFAULT_CONTEXT_WAIT.as_millis() as u32,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
glib::ParamSpec::new_string( glib::ParamSpec::new_string(
@ -836,7 +838,9 @@ impl ObjectImpl for UdpSrc {
.unwrap_or_else(|| "".into()); .unwrap_or_else(|| "".into());
} }
"context-wait" => { "context-wait" => {
settings.context_wait = value.get().expect("type checked upstream"); settings.context_wait = Duration::from_millis(
value.get::<u32>().expect("type checked upstream").into(),
);
} }
"retrieve-sender-address" => { "retrieve-sender-address" => {
settings.retrieve_sender_address = value.get().expect("type checked upstream"); settings.retrieve_sender_address = value.get().expect("type checked upstream");
@ -864,7 +868,7 @@ impl ObjectImpl for UdpSrc {
.map(GioSocketWrapper::as_socket) .map(GioSocketWrapper::as_socket)
.to_value(), .to_value(),
"context" => settings.context.to_value(), "context" => settings.context.to_value(),
"context-wait" => settings.context_wait.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"retrieve-sender-address" => settings.retrieve_sender_address.to_value(), "retrieve-sender-address" => settings.retrieve_sender_address.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
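The latency answers above replace 0.into() and gst::CLOCK_TIME_NONE with the new constants: gst::ClockTime::ZERO is an actual zero duration, while gst::ClockTime::NONE is an Option<ClockTime> meaning "no value", used here as the unbounded maximum. Keeping zero and "none" distinct is the point of the whole migration; a tiny sketch with a hypothetical helper:

    // Zero and "no value" are separate notions now.
    fn describe_latency(latency: Option<gst::ClockTime>) -> &'static str {
        match latency {
            Some(l) if l == gst::ClockTime::ZERO => "zero latency",
            Some(_) => "finite latency",
            None => "unknown or unbounded",
        }
    }

    // In the Latency query handlers above this becomes:
    //     q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
    // i.e. live, zero minimum, no maximum.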

@ -34,6 +34,7 @@ use once_cell::sync::Lazy;
use std::boxed::Box; use std::boxed::Box;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex as StdMutex; use std::sync::Mutex as StdMutex;
use std::time::Duration;
use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{ use gstthreadshare::runtime::{
@ -41,7 +42,7 @@ use gstthreadshare::runtime::{
}; };
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: u32 = 2; const THROTTLING_DURATION: Duration = Duration::from_millis(2);
fn init() { fn init() {
use std::sync::Once; use std::sync::Once;
@ -968,7 +969,7 @@ fn src_tsqueue_sink_nominal() {
.set_property("context", &format!("{}_queue", name)) .set_property("context", &format!("{}_queue", name))
.unwrap(); .unwrap();
ts_queue ts_queue
.set_property("context-wait", &THROTTLING_DURATION) .set_property("context-wait", &(THROTTLING_DURATION.as_millis() as u32))
.unwrap(); .unwrap();
let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None); let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None);
@ -1007,7 +1008,7 @@ fn src_tsproxy_sink_nominal() {
.set_property("context", &format!("{}_context", name)) .set_property("context", &format!("{}_context", name))
.unwrap(); .unwrap();
ts_proxy_src ts_proxy_src
.set_property("context-wait", &THROTTLING_DURATION) .set_property("context-wait", &(THROTTLING_DURATION.as_millis() as u32))
.unwrap(); .unwrap();
let (pipeline, src_element, _sink_element, receiver) = let (pipeline, src_element, _sink_element, receiver) =

@ -76,7 +76,7 @@ fn test_push() {
let mut eos = false; let mut eos = false;
let bus = pipeline.bus().unwrap(); let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(5 * gst::SECOND) { while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) {
use gst::MessageView; use gst::MessageView;
match msg.view() { match msg.view() {
MessageView::Eos(..) => { MessageView::Eos(..) => {
@ -123,8 +123,8 @@ fn test_from_pipeline_to_pipeline() {
pipe_1.set_state(gst::State::Paused).unwrap(); pipe_1.set_state(gst::State::Paused).unwrap();
pipe_2.set_state(gst::State::Paused).unwrap(); pipe_2.set_state(gst::State::Paused).unwrap();
let _ = pipe_1.state(gst::CLOCK_TIME_NONE); let _ = pipe_1.state(gst::ClockTime::NONE);
let _ = pipe_2.state(gst::CLOCK_TIME_NONE); let _ = pipe_2.state(gst::ClockTime::NONE);
pipe_1.set_state(gst::State::Null).unwrap(); pipe_1.set_state(gst::State::Null).unwrap();
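The test changes in this and the following files are the same two substitutions: gst::SECOND becomes the gst::ClockTime::SECOND constant (still multipliable by an integer) and gst::CLOCK_TIME_NONE becomes gst::ClockTime::NONE, the Option<ClockTime> that here means "block indefinitely". A compact sketch of the pattern as a hypothetical helper, not one of the actual tests:

    use gst::prelude::*;

    fn drain_bus_then_settle(pipeline: &gst::Pipeline) {
        let bus = pipeline.bus().unwrap();
        // Finite timeout: 5 * ClockTime::SECOND yields a ClockTime.
        while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) {
            if let gst::MessageView::Eos(..) = msg.view() {
                break;
            }
        }
        // No timeout: ClockTime::NONE waits until the state change settles.
        let _ = pipeline.state(gst::ClockTime::NONE);
    }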

@ -71,7 +71,7 @@ fn test_push() {
let mut eos = false; let mut eos = false;
let bus = pipeline.bus().unwrap(); let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(5 * gst::SECOND) { while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) {
use gst::MessageView; use gst::MessageView;
match msg.view() { match msg.view() {
MessageView::Eos(..) => { MessageView::Eos(..) => {

@ -95,7 +95,7 @@ fn test_push() {
let mut eos = false; let mut eos = false;
let bus = pipeline.bus().unwrap(); let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(5 * gst::SECOND) { while let Some(msg) = bus.timed_pop(5 * gst::ClockTime::SECOND) {
use gst::MessageView; use gst::MessageView;
match msg.view() { match msg.view() {
MessageView::Eos(..) => { MessageView::Eos(..) => {