diff --git a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs index bf781b6b..c8c0bc13 100644 --- a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs +++ b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs @@ -42,7 +42,7 @@ fn create_pipeline() -> (gst::Pipeline, gst::Pad, gst::Element, gtk::Widget) { let fallbackswitch = gst::ElementFactory::make("fallbackswitch", None).unwrap(); fallbackswitch - .set_property("timeout", &gst::SECOND) + .set_property("timeout", &gst::ClockTime::SECOND) .unwrap(); let decodebin = gst::ElementFactory::make("decodebin", None).unwrap(); @@ -134,7 +134,7 @@ fn create_ui(app: >k::Application) { let position = video_sink .query_position::() - .unwrap_or_else(|| 0.into()); + .unwrap_or(gst::ClockTime::ZERO); position_label.set_text(&format!("Position: {:.1}", position)); glib::Continue(true) diff --git a/utils/fallbackswitch/src/base/aggregator.rs b/utils/fallbackswitch/src/base/aggregator.rs index 5b057903..e17d0f6c 100644 --- a/utils/fallbackswitch/src/base/aggregator.rs +++ b/utils/fallbackswitch/src/base/aggregator.rs @@ -1,19 +1,15 @@ -// Copyright (C) 2018 Sebastian Dröge -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. +// Take a look at the license at the top of the repository in the LICENSE file. use super::ffi; use super::Aggregator; + use glib::signal::{connect_raw, SignalHandlerId}; use glib::translate::*; use glib::IsA; use glib::Value; use gst::glib; use gst::prelude::*; + use std::boxed::Box as Box_; use std::mem; use std::ptr; @@ -26,7 +22,8 @@ pub trait AggregatorExtManual: 'static { fn set_min_upstream_latency(&self, min_upstream_latency: gst::ClockTime); - fn connect_property_min_upstream_latency_notify( + #[doc(alias = "min-upstream-latency")] + fn connect_min_upstream_latency_notify( &self, f: F, ) -> SignalHandlerId; @@ -47,13 +44,12 @@ impl> AggregatorExtManual for O { } fn finish_buffer(&self, buffer: gst::Buffer) -> Result { - let ret: gst::FlowReturn = unsafe { - from_glib(ffi::gst_aggregator_finish_buffer( + unsafe { + try_from_glib(ffi::gst_aggregator_finish_buffer( self.as_ref().to_glib_none().0, buffer.into_ptr(), )) - }; - ret.into_result() + } } fn min_upstream_latency(&self) -> gst::ClockTime { @@ -80,7 +76,7 @@ impl> AggregatorExtManual for O { } } - fn connect_property_min_upstream_latency_notify( + fn connect_min_upstream_latency_notify( &self, f: F, ) -> SignalHandlerId { @@ -89,8 +85,8 @@ impl> AggregatorExtManual for O { connect_raw( self.as_ptr() as *mut _, b"notify::min-upstream-latency\0".as_ptr() as *const _, - Some(mem::transmute( - notify_min_upstream_latency_trampoline:: as usize, + Some(mem::transmute::<_, unsafe extern "C" fn()>( + notify_min_upstream_latency_trampoline:: as *const (), )), Box_::into_raw(f), ) diff --git a/utils/fallbackswitch/src/base/aggregator_pad.rs b/utils/fallbackswitch/src/base/aggregator_pad.rs index a9ca17c9..794ba1cd 100644 --- a/utils/fallbackswitch/src/base/aggregator_pad.rs +++ b/utils/fallbackswitch/src/base/aggregator_pad.rs @@ -1,18 +1,14 @@ -// Copyright (C) 2018 Sebastian Dröge -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. +// Take a look at the license at the top of the repository in the LICENSE file. use super::ffi; use super::AggregatorPad; + use glib::object::IsA; use glib::translate::*; use gst::glib; pub trait AggregatorPadExtManual: 'static { + #[doc(alias = "get_segment")] fn segment(&self) -> gst::Segment; } diff --git a/utils/fallbackswitch/src/base/auto/aggregator.rs b/utils/fallbackswitch/src/base/auto/aggregator.rs index 66f6b9ad..05227fe8 100644 --- a/utils/fallbackswitch/src/base/auto/aggregator.rs +++ b/utils/fallbackswitch/src/base/auto/aggregator.rs @@ -4,13 +4,14 @@ // DO NOT EDIT use super::super::ffi; -use glib::object::Cast; -use glib::object::IsA; + use glib::signal::connect_raw; use glib::signal::SignalHandlerId; use glib::translate::*; -use glib::StaticType; + use gst::glib; +use gst::prelude::*; + use std::boxed::Box as Box_; use std::mem::transmute; @@ -29,38 +30,45 @@ pub const NONE_AGGREGATOR: Option<&Aggregator> = None; pub trait AggregatorExt: 'static { //#[doc(alias = "gst_aggregator_get_allocator")] + //#[doc(alias = "get_allocator")] //fn allocator(&self, allocator: /*Ignored*/Option, params: /*Ignored*/gst::AllocationParams); #[doc(alias = "gst_aggregator_get_buffer_pool")] + #[doc(alias = "get_buffer_pool")] fn buffer_pool(&self) -> Option; #[doc(alias = "gst_aggregator_get_latency")] - fn latency(&self) -> gst::ClockTime; + #[doc(alias = "get_latency")] + fn latency(&self) -> Option; #[doc(alias = "gst_aggregator_negotiate")] fn negotiate(&self) -> bool; #[doc(alias = "gst_aggregator_set_latency")] - fn set_latency(&self, min_latency: gst::ClockTime, max_latency: gst::ClockTime); + fn set_latency( + &self, + min_latency: gst::ClockTime, + max_latency: impl Into>, + ); #[doc(alias = "gst_aggregator_set_src_caps")] fn set_src_caps(&self, caps: &gst::Caps); #[doc(alias = "gst_aggregator_simple_get_next_time")] - fn simple_get_next_time(&self) -> gst::ClockTime; + fn simple_get_next_time(&self) -> Option; - #[doc(alias = "get_property_start_time")] + #[doc(alias = "start-time")] fn start_time(&self) -> u64; - #[doc(alias = "set_property_start_time")] + #[doc(alias = "start-time")] fn set_start_time(&self, start_time: u64); - fn connect_property_latency_notify( - &self, - f: F, - ) -> SignalHandlerId; + #[doc(alias = "latency")] + fn connect_latency_notify(&self, f: F) + -> SignalHandlerId; - fn connect_property_start_time_notify( + #[doc(alias = "start-time")] + fn connect_start_time_notify( &self, f: F, ) -> SignalHandlerId; @@ -79,7 +87,7 @@ impl> AggregatorExt for O { } } - fn latency(&self) -> gst::ClockTime { + fn latency(&self) -> Option { unsafe { from_glib(ffi::gst_aggregator_get_latency( self.as_ref().to_glib_none().0, @@ -95,12 +103,16 @@ impl> AggregatorExt for O { } } - fn set_latency(&self, min_latency: gst::ClockTime, max_latency: gst::ClockTime) { + fn set_latency( + &self, + min_latency: gst::ClockTime, + max_latency: impl Into>, + ) { unsafe { ffi::gst_aggregator_set_latency( self.as_ref().to_glib_none().0, min_latency.into_glib(), - max_latency.into_glib(), + max_latency.into().into_glib(), ); } } @@ -111,7 +123,7 @@ impl> AggregatorExt for O { } } - fn simple_get_next_time(&self) -> gst::ClockTime { + fn simple_get_next_time(&self) -> Option { unsafe { from_glib(ffi::gst_aggregator_simple_get_next_time( self.as_ref().to_glib_none().0, @@ -138,22 +150,24 @@ impl> AggregatorExt for O { glib::gobject_ffi::g_object_set_property( self.to_glib_none().0 as *mut glib::gobject_ffi::GObject, b"start-time\0".as_ptr() as *const _, - glib::Value::from(&start_time).to_glib_none().0, + start_time.to_value().to_glib_none().0, ); } } - fn connect_property_latency_notify( + #[doc(alias = "latency")] + fn connect_latency_notify( &self, f: F, ) -> SignalHandlerId { - unsafe extern "C" fn notify_latency_trampoline( + unsafe extern "C" fn notify_latency_trampoline< + P: IsA, + F: Fn(&P) + Send + Sync + 'static, + >( this: *mut ffi::GstAggregator, _param_spec: glib::ffi::gpointer, f: glib::ffi::gpointer, - ) where - P: IsA, - { + ) { let f: &F = &*(f as *const F); f(&Aggregator::from_glib_borrow(this).unsafe_cast_ref()) } @@ -170,17 +184,19 @@ impl> AggregatorExt for O { } } - fn connect_property_start_time_notify( + #[doc(alias = "start-time")] + fn connect_start_time_notify( &self, f: F, ) -> SignalHandlerId { - unsafe extern "C" fn notify_start_time_trampoline( + unsafe extern "C" fn notify_start_time_trampoline< + P: IsA, + F: Fn(&P) + Send + Sync + 'static, + >( this: *mut ffi::GstAggregator, _param_spec: glib::ffi::gpointer, f: glib::ffi::gpointer, - ) where - P: IsA, - { + ) { let f: &F = &*(f as *const F); f(&Aggregator::from_glib_borrow(this).unsafe_cast_ref()) } diff --git a/utils/fallbackswitch/src/base/auto/aggregator_pad.rs b/utils/fallbackswitch/src/base/auto/aggregator_pad.rs index fe7204de..a4c0a564 100644 --- a/utils/fallbackswitch/src/base/auto/aggregator_pad.rs +++ b/utils/fallbackswitch/src/base/auto/aggregator_pad.rs @@ -3,13 +3,14 @@ // DO NOT EDIT use super::super::ffi; -use glib::object::Cast; -use glib::object::IsA; + use glib::signal::connect_raw; use glib::signal::SignalHandlerId; use glib::translate::*; -use glib::StaticType; + use gst::glib; +use gst::prelude::*; + use std::boxed::Box as Box_; use std::mem::transmute; @@ -42,10 +43,10 @@ pub trait AggregatorPadExt: 'static { #[doc(alias = "gst_aggregator_pad_pop_buffer")] fn pop_buffer(&self) -> Option; - #[doc(alias = "get_property_emit_signals")] + #[doc(alias = "emit-signals")] fn emits_signals(&self) -> bool; - #[doc(alias = "set_property_emit_signals")] + #[doc(alias = "emit-signals")] fn set_emit_signals(&self, emit_signals: bool); fn connect_buffer_consumed( @@ -53,7 +54,8 @@ pub trait AggregatorPadExt: 'static { f: F, ) -> SignalHandlerId; - fn connect_property_emit_signals_notify( + #[doc(alias = "emit-signals")] + fn connect_emit_signals_notify( &self, f: F, ) -> SignalHandlerId; @@ -119,25 +121,24 @@ impl> AggregatorPadExt for O { glib::gobject_ffi::g_object_set_property( self.to_glib_none().0 as *mut glib::gobject_ffi::GObject, b"emit-signals\0".as_ptr() as *const _, - glib::Value::from(&emit_signals).to_glib_none().0, + emit_signals.to_value().to_glib_none().0, ); } } + #[doc(alias = "buffer-consumed")] fn connect_buffer_consumed( &self, f: F, ) -> SignalHandlerId { unsafe extern "C" fn buffer_consumed_trampoline< - P, + P: IsA, F: Fn(&P, &gst::Buffer) + Send + Sync + 'static, >( this: *mut ffi::GstAggregatorPad, object: *mut gst::ffi::GstBuffer, f: glib::ffi::gpointer, - ) where - P: IsA, - { + ) { let f: &F = &*(f as *const F); f( &AggregatorPad::from_glib_borrow(this).unsafe_cast_ref(), @@ -157,17 +158,19 @@ impl> AggregatorPadExt for O { } } - fn connect_property_emit_signals_notify( + #[doc(alias = "emit-signals")] + fn connect_emit_signals_notify( &self, f: F, ) -> SignalHandlerId { - unsafe extern "C" fn notify_emit_signals_trampoline( + unsafe extern "C" fn notify_emit_signals_trampoline< + P: IsA, + F: Fn(&P) + Send + Sync + 'static, + >( this: *mut ffi::GstAggregatorPad, _param_spec: glib::ffi::gpointer, f: glib::ffi::gpointer, - ) where - P: IsA, - { + ) { let f: &F = &*(f as *const F); f(&AggregatorPad::from_glib_borrow(this).unsafe_cast_ref()) } diff --git a/utils/fallbackswitch/src/base/subclass/aggregator.rs b/utils/fallbackswitch/src/base/subclass/aggregator.rs index 8d694fb0..af54d401 100644 --- a/utils/fallbackswitch/src/base/subclass/aggregator.rs +++ b/utils/fallbackswitch/src/base/subclass/aggregator.rs @@ -1,10 +1,4 @@ -// Copyright (C) 2017-2019 Sebastian Dröge -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. +// Take a look at the license at the top of the repository in the LICENSE file. use super::super::ffi; @@ -109,7 +103,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { self.parent_stop(aggregator) } - fn next_time(&self, aggregator: &Self::Type) -> gst::ClockTime { + fn next_time(&self, aggregator: &Self::Type) -> Option { self.parent_next_time(aggregator) } @@ -213,7 +207,7 @@ pub trait AggregatorImplExt: ObjectSubclass { fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage>; - fn parent_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime; + fn parent_next_time(&self, aggregator: &Self::Type) -> Option; fn parent_create_new_pad( &self, @@ -243,18 +237,17 @@ pub trait AggregatorImplExt: ObjectSubclass { impl AggregatorImplExt for T { fn parent_flush(&self, aggregator: &Self::Type) -> Result { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; (*parent_class) .flush .map(|f| { - from_glib(f(aggregator + try_from_glib(f(aggregator .unsafe_cast_ref::() .to_glib_none() .0)) }) - .unwrap_or(gst::FlowReturn::Ok) - .into_result() + .unwrap_or(Ok(gst::FlowSuccess::Ok)) } } @@ -265,7 +258,7 @@ impl AggregatorImplExt for T { buffer: gst::Buffer, ) -> Option { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; match (*parent_class).clip { None => Some(buffer), @@ -284,16 +277,15 @@ impl AggregatorImplExt for T { buffer: gst::Buffer, ) -> Result { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .finish_buffer .expect("Missing parent function `finish_buffer`"); - gst::FlowReturn::from_glib(f( + try_from_glib(f( aggregator.unsafe_cast_ref::().to_glib_none().0, buffer.into_ptr(), )) - .into_result() } } @@ -304,7 +296,7 @@ impl AggregatorImplExt for T { event: gst::Event, ) -> bool { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .sink_event @@ -324,17 +316,16 @@ impl AggregatorImplExt for T { event: gst::Event, ) -> Result { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .sink_event_pre_queue .expect("Missing parent function `sink_event_pre_queue`"); - gst::FlowReturn::from_glib(f( + try_from_glib(f( aggregator.unsafe_cast_ref::().to_glib_none().0, aggregator_pad.to_glib_none().0, event.into_ptr(), )) - .into_result() } } @@ -365,7 +356,7 @@ impl AggregatorImplExt for T { query: &mut gst::QueryRef, ) -> bool { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .sink_query_pre_queue @@ -380,7 +371,7 @@ impl AggregatorImplExt for T { fn parent_src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .src_event @@ -394,7 +385,7 @@ impl AggregatorImplExt for T { fn parent_src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .src_query @@ -413,7 +404,7 @@ impl AggregatorImplExt for T { active: bool, ) -> Result<(), gst::LoggableError> { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; match (*parent_class).src_activate { None => Ok(()), @@ -436,22 +427,21 @@ impl AggregatorImplExt for T { timeout: bool, ) -> Result { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .aggregate .expect("Missing parent function `aggregate`"); - gst::FlowReturn::from_glib(f( + try_from_glib(f( aggregator.unsafe_cast_ref::().to_glib_none().0, timeout.into_glib(), )) - .into_result() } } fn parent_start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; (*parent_class) .start @@ -475,7 +465,7 @@ impl AggregatorImplExt for T { fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; (*parent_class) .stop @@ -497,9 +487,9 @@ impl AggregatorImplExt for T { } } - fn parent_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime { + fn parent_next_time(&self, aggregator: &Self::Type) -> Option { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; (*parent_class) .get_next_time @@ -509,7 +499,7 @@ impl AggregatorImplExt for T { .to_glib_none() .0)) }) - .unwrap_or(gst::CLOCK_TIME_NONE) + .unwrap_or(gst::ClockTime::NONE) } } @@ -521,7 +511,7 @@ impl AggregatorImplExt for T { caps: Option<&gst::Caps>, ) -> Option { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .create_new_pad @@ -541,7 +531,7 @@ impl AggregatorImplExt for T { caps: &gst::Caps, ) -> Result { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) .update_src_caps @@ -559,7 +549,7 @@ impl AggregatorImplExt for T { fn parent_fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; let f = (*parent_class) @@ -578,7 +568,7 @@ impl AggregatorImplExt for T { caps: &gst::Caps, ) -> Result<(), gst::LoggableError> { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; (*parent_class) .negotiated_src_caps @@ -598,7 +588,7 @@ impl AggregatorImplExt for T { fn parent_negotiate(&self, aggregator: &Self::Type) -> bool { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass; (*parent_class) .negotiate @@ -879,7 +869,7 @@ unsafe extern "C" fn aggregator_get_next_time( let imp = instance.impl_(); let wrap: Borrowed = from_glib_borrow(ptr); - gst::panic_to_error!(&wrap, &imp.panicked(), gst::CLOCK_TIME_NONE, { + gst::panic_to_error!(&wrap, &imp.panicked(), gst::ClockTime::NONE, { imp.next_time(wrap.unsafe_cast_ref()) }) .into_glib() diff --git a/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs b/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs index 3fb48710..1a24f30a 100644 --- a/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs +++ b/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs @@ -1,17 +1,10 @@ -// Copyright (C) 2018 Sebastian Dröge -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. +// Take a look at the license at the top of the repository in the LICENSE file. use super::super::ffi; use glib::translate::*; use gst::glib; use gst::prelude::*; - use gst::subclass::prelude::*; use super::super::Aggregator; @@ -58,12 +51,12 @@ impl AggregatorPadImplExt for T { aggregator: &Aggregator, ) -> Result { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorPadClass; (*parent_class) .flush .map(|f| { - from_glib(f( + try_from_glib(f( aggregator_pad .unsafe_cast_ref::() .to_glib_none() @@ -71,8 +64,7 @@ impl AggregatorPadImplExt for T { aggregator.to_glib_none().0, )) }) - .unwrap_or(gst::FlowReturn::Ok) - .into_result() + .unwrap_or(Ok(gst::FlowSuccess::Ok)) } } @@ -83,7 +75,7 @@ impl AggregatorPadImplExt for T { buffer: &gst::Buffer, ) -> bool { unsafe { - let data = T::type_data(); + let data = Self::type_data(); let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorPadClass; (*parent_class) .skip_buffer diff --git a/utils/fallbackswitch/src/base/utils.rs b/utils/fallbackswitch/src/base/utils.rs index d6b33da9..07e76871 100644 --- a/utils/fallbackswitch/src/base/utils.rs +++ b/utils/fallbackswitch/src/base/utils.rs @@ -1,10 +1,4 @@ -// Copyright (C) 2017 Sebastian Dröge -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. +// Take a look at the license at the top of the repository in the LICENSE file. use glib::translate::mut_override; use gst::glib; @@ -14,6 +8,7 @@ pub struct MutexGuard<'a>(&'a glib::ffi::GMutex); impl<'a> MutexGuard<'a> { #[allow(clippy::trivially_copy_pass_by_ref)] + #[doc(alias = "g_mutex_lock")] pub fn lock(mutex: &'a glib::ffi::GMutex) -> Self { unsafe { glib::ffi::g_mutex_lock(mut_override(mutex)); diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index 8cf51415..5469b02b 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -20,9 +20,10 @@ use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_error, gst_info, gst_warning}; +use std::convert::TryFrom; use std::mem; use std::sync::Mutex; -use std::time::{Duration, Instant}; +use std::time::Instant; use once_cell::sync::Lazy; @@ -72,11 +73,11 @@ struct Settings { uri: Option, source: Option, fallback_uri: Option, - timeout: u64, - restart_timeout: u64, - retry_timeout: u64, + timeout: gst::ClockTime, + restart_timeout: gst::ClockTime, + retry_timeout: gst::ClockTime, restart_on_eos: bool, - min_latency: u64, + min_latency: gst::ClockTime, buffer_duration: i64, } @@ -88,11 +89,11 @@ impl Default for Settings { uri: None, source: None, fallback_uri: None, - timeout: 5 * gst::SECOND_VAL, - restart_timeout: 5 * gst::SECOND_VAL, - retry_timeout: 60 * gst::SECOND_VAL, + timeout: 5 * gst::ClockTime::SECOND, + restart_timeout: 5 * gst::ClockTime::SECOND, + retry_timeout: 60 * gst::ClockTime::SECOND, restart_on_eos: false, - min_latency: 0, + min_latency: gst::ClockTime::ZERO, buffer_duration: -1, } } @@ -112,7 +113,7 @@ enum Source { struct Block { pad: gst::Pad, probe_id: gst::PadProbeId, - running_time: gst::ClockTime, + running_time: Option, } // Connects one source pad with fallbackswitch and the corresponding fallback input @@ -221,8 +222,8 @@ impl ObjectImpl for FallbackSrc { "Timeout", "Timeout for switching to the fallback URI", 0, - std::u64::MAX, - 5 * gst::SECOND_VAL, + std::u64::MAX - 1, + 5 * *gst::ClockTime::SECOND, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), glib::ParamSpec::new_uint64( @@ -230,8 +231,8 @@ impl ObjectImpl for FallbackSrc { "Timeout", "Timeout for restarting an active source", 0, - std::u64::MAX, - 5 * gst::SECOND_VAL, + std::u64::MAX - 1, + 5 * *gst::ClockTime::SECOND, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), glib::ParamSpec::new_uint64( @@ -239,8 +240,8 @@ impl ObjectImpl for FallbackSrc { "Retry Timeout", "Timeout for stopping after repeated failure", 0, - std::u64::MAX, - 60 * gst::SECOND_VAL, + std::u64::MAX - 1, + 60 * *gst::ClockTime::SECOND, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), glib::ParamSpec::new_boolean( @@ -265,7 +266,7 @@ impl ObjectImpl for FallbackSrc { this allows to configure a minimum latency that would be configured \ if initially the fallback is enabled", 0, - std::u64::MAX, + std::u64::MAX - 1, 0, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), @@ -274,7 +275,7 @@ impl ObjectImpl for FallbackSrc { "Buffer Duration", "Buffer duration when buffering streams (-1 default value)", -1, - std::i64::MAX, + std::i64::MAX - 1, -1, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), @@ -740,7 +741,7 @@ impl FallbackSrc { fn create_fallback_video_input( &self, _element: &super::FallbackSrc, - min_latency: u64, + min_latency: gst::ClockTime, fallback_uri: Option<&str>, ) -> gst::Element { VideoFallbackSource::new(fallback_uri, min_latency).upcast() @@ -770,8 +771,8 @@ impl FallbackSrc { fn create_stream( &self, element: &super::FallbackSrc, - timeout: u64, - min_latency: u64, + timeout: gst::ClockTime, + min_latency: gst::ClockTime, is_audio: bool, fallback_uri: Option<&str>, ) -> Stream { @@ -797,7 +798,7 @@ impl FallbackSrc { .set_properties(&[ ("max-size-buffers", &0u32), ("max-size-bytes", &0u32), - ("max-size-time", &gst::SECOND), + ("max-size-time", &gst::ClockTime::SECOND), ]) .unwrap(); @@ -815,9 +816,9 @@ impl FallbackSrc { let src = FallbackSrc::from_instance(&element); src.handle_switch_active_pad_change(&element); }); - switch.set_property("timeout", &timeout).unwrap(); + switch.set_property("timeout", &timeout.nseconds()).unwrap(); switch - .set_property("min-upstream-latency", &min_latency) + .set_property("min-upstream-latency", &min_latency.nseconds()) .unwrap(); gst::Element::link_pads(&fallback_input, Some("src"), &switch, Some("fallback_sink")) @@ -1067,7 +1068,7 @@ impl FallbackSrc { || (!state.source_is_live && transition == gst::StateChange::PausedToPlaying) { assert!(state.source_restart_timeout.is_none()); - self.schedule_source_restart_timeout(element, state, 0.into()); + self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO); } } } @@ -1230,7 +1231,7 @@ impl FallbackSrc { let pts = match info.data { Some(gst::PadProbeData::Buffer(ref buffer)) => buffer.pts(), Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { - gst::EventView::Gap(ref ev) => ev.get().0, + gst::EventView::Gap(ref ev) => Some(ev.get().0), _ => return gst::PadProbeReturn::Pass, }, _ => unreachable!(), @@ -1250,7 +1251,7 @@ impl FallbackSrc { Block { pad: stream.clocksync_queue_srcpad.clone(), probe_id, - running_time: gst::CLOCK_TIME_NONE, + running_time: gst::ClockTime::NONE, } } @@ -1258,7 +1259,7 @@ impl FallbackSrc { &self, element: &super::FallbackSrc, pad: &gst::Pad, - pts: gst::ClockTime, + pts: impl Into>, ) -> Result<(), gst::ErrorMessage> { let mut state_guard = self.state.lock().unwrap(); let state = match &mut *state_guard { @@ -1354,21 +1355,22 @@ impl FallbackSrc { gst::error_msg!(gst::CoreError::Clock, ["Have no time segment"]) })?; - let running_time = if pts < segment.start() { - segment.start() - } else if segment.stop().is_some() && pts >= segment.stop() { - segment.stop() + let pts = pts.into(); + let running_time = if let Some((_, start)) = + pts.zip(segment.start()).filter(|(pts, start)| pts < start) + { + Some(start) + } else if let Some((_, stop)) = pts.zip(segment.stop()).filter(|(pts, stop)| pts >= stop) { + Some(stop) } else { segment.to_running_time(pts) }; - assert!(running_time.is_some()); - gst_debug!( CAT, obj: element, "Have block running time {}", - running_time, + running_time.display(), ); block.running_time = running_time; @@ -1414,13 +1416,13 @@ impl FallbackSrc { let audio_running_time = state .audio_stream .as_ref() - .and_then(|s| s.source_srcpad_block.as_ref().map(|b| b.running_time)) - .unwrap_or(gst::CLOCK_TIME_NONE); + .and_then(|s| s.source_srcpad_block.as_ref()) + .and_then(|b| b.running_time); let video_running_time = state .video_stream .as_ref() - .and_then(|s| s.source_srcpad_block.as_ref().map(|b| b.running_time)) - .unwrap_or(gst::CLOCK_TIME_NONE); + .and_then(|s| s.source_srcpad_block.as_ref()) + .and_then(|b| b.running_time); let audio_srcpad = state .audio_stream @@ -1444,7 +1446,14 @@ impl FallbackSrc { // Also consider EOS, we'd never get a new running time after EOS so don't need to wait. // FIXME: All this surely can be simplified somehow - let current_running_time = element.current_running_time(); + // FIXME I guess this could be moved up + let current_running_time = match element.current_running_time() { + Some(current_running_time) => current_running_time, + None => { + gst_debug!(CAT, obj: element, "Waiting for current_running_time"); + return; + } + }; if have_audio && want_audio && have_video && want_video { if audio_running_time.is_none() @@ -1466,18 +1475,21 @@ impl FallbackSrc { return; } + let audio_running_time = audio_running_time.expect("checked above"); + let video_running_time = video_running_time.expect("checked above"); + let min_running_time = if audio_is_eos { video_running_time } else if video_is_eos { audio_running_time } else { - assert!(audio_running_time.is_some() && video_running_time.is_some()); - audio_running_time.min(video_running_time).unwrap() + audio_running_time.min(video_running_time) }; + let offset = if current_running_time > min_running_time { - (current_running_time - min_running_time).unwrap() as i64 + (current_running_time - min_running_time).nseconds() as i64 } else { - -((min_running_time - current_running_time).unwrap() as i64) + -((min_running_time - current_running_time).nseconds() as i64) }; gst_debug!( @@ -1514,15 +1526,18 @@ impl FallbackSrc { block.pad.remove_probe(block.probe_id); } } else if have_audio && want_audio { - if audio_running_time.is_none() { - gst_debug!(CAT, obj: element, "Waiting for audio pad to block"); - return; - } + let audio_running_time = match audio_running_time { + Some(audio_running_time) => audio_running_time, + None => { + gst_debug!(CAT, obj: element, "Waiting for audio pad to block"); + return; + } + }; let offset = if current_running_time > audio_running_time { - (current_running_time - audio_running_time).unwrap() as i64 + (current_running_time - audio_running_time).nseconds() as i64 } else { - -((audio_running_time - current_running_time).unwrap() as i64) + -((audio_running_time - current_running_time).nseconds() as i64) }; gst_debug!( @@ -1546,15 +1561,18 @@ impl FallbackSrc { block.pad.remove_probe(block.probe_id); } } else if have_video && want_video { - if video_running_time.is_none() { - gst_debug!(CAT, obj: element, "Waiting for video pad to block"); - return; - } + let video_running_time = match video_running_time { + Some(video_running_time) => video_running_time, + None => { + gst_debug!(CAT, obj: element, "Waiting for video pad to block"); + return; + } + }; let offset = if current_running_time > video_running_time { - (current_running_time - video_running_time).unwrap() as i64 + (current_running_time - video_running_time).nseconds() as i64 } else { - -((video_running_time - current_running_time).unwrap() as i64) + -((video_running_time - current_running_time).nseconds() as i64) }; gst_debug!( @@ -1911,8 +1929,7 @@ impl FallbackSrc { gst_debug!(CAT, obj: element, "Waiting for 1s before retrying"); let clock = gst::SystemClock::obtain(); - let wait_time = clock.time() + gst::SECOND; - assert!(wait_time.is_some()); + let wait_time = clock.time().unwrap() + gst::ClockTime::SECOND; assert!(state.source_pending_restart_timeout.is_none()); let timeout = clock.new_single_shot_id(wait_time); @@ -1998,7 +2015,11 @@ impl FallbackSrc { let mut state_guard = src.state.lock().unwrap(); let state = state_guard.as_mut().expect("no state"); assert!(state.source_restart_timeout.is_none()); - src.schedule_source_restart_timeout(element, state, 0.into()); + src.schedule_source_restart_timeout( + element, + state, + gst::ClockTime::ZERO, + ); } }); }) @@ -2024,9 +2045,7 @@ impl FallbackSrc { } let clock = gst::SystemClock::obtain(); - let wait_time = - clock.time() + gst::ClockTime::from_nseconds(state.settings.restart_timeout) - elapsed; - assert!(wait_time.is_some()); + let wait_time = clock.time().unwrap() + state.settings.restart_timeout - elapsed; gst_debug!( CAT, obj: element, @@ -2063,9 +2082,7 @@ impl FallbackSrc { // If we're not actively buffering right now let's restart the source if state .last_buffering_update - .map(|i| { - i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout) - }) + .map(|i| i.elapsed() >= state.settings.restart_timeout.into()) .unwrap_or(state.stats.buffering_percent == 100) { gst_debug!(CAT, obj: element, "Not buffering, restarting source"); @@ -2077,10 +2094,12 @@ impl FallbackSrc { gst_debug!(CAT, obj: element, "Buffering, restarting source later"); let elapsed = state .last_buffering_update - .map(|i| i.elapsed().as_nanos() as u64) - .unwrap_or(0); + .and_then(|last_buffering_update| { + gst::ClockTime::try_from(last_buffering_update.elapsed()).ok() + }) + .unwrap_or(gst::ClockTime::ZERO); - src.schedule_source_restart_timeout(element, state, elapsed.into()); + src.schedule_source_restart_timeout(element, state, elapsed); } } else { gst_debug!(CAT, obj: element, "Restarting source not needed anymore"); @@ -2150,7 +2169,7 @@ impl FallbackSrc { if self.have_fallback_activated(element, state) { gst_warning!(CAT, obj: element, "Switched to fallback stream"); if state.source_restart_timeout.is_none() { - self.schedule_source_restart_timeout(element, state, 0.into()); + self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO); } drop(state_guard); diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs index 1221fa3a..1ccb4fd9 100644 --- a/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs @@ -36,14 +36,14 @@ static CAT: Lazy = Lazy::new(|| { #[derive(Debug, Clone)] struct Settings { uri: Option, - min_latency: u64, + min_latency: gst::ClockTime, } impl Default for Settings { fn default() -> Self { Settings { uri: None, - min_latency: 0, + min_latency: gst::ClockTime::ZERO, } } } @@ -131,7 +131,7 @@ impl ObjectImpl for VideoFallbackSource { gst_info!( CAT, obj: obj, - "Changing Minimum Latency from {:?} to {:?}", + "Changing Minimum Latency from {} to {}", settings.min_latency, new_value, ); @@ -276,7 +276,7 @@ impl VideoFallbackSource { fn create_source( &self, element: &super::VideoFallbackSource, - min_latency: u64, + min_latency: gst::ClockTime, uri: Option<&str>, ) -> gst::Element { gst_debug!(CAT, obj: element, "Creating source with uri {:?}", uri); @@ -313,7 +313,7 @@ impl VideoFallbackSource { ("max-size-bytes", &0u32), ( "max-size-time", - &gst::ClockTime::max(5 * gst::SECOND, min_latency.into()).unwrap(), + &min_latency.max(5 * gst::ClockTime::SECOND).nseconds(), ), ]) .unwrap(); diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs index db51a3ea..7a83788d 100644 --- a/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs +++ b/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs @@ -30,7 +30,7 @@ unsafe impl Send for VideoFallbackSource {} unsafe impl Sync for VideoFallbackSource {} impl VideoFallbackSource { - pub fn new(uri: Option<&str>, min_latency: u64) -> VideoFallbackSource { - glib::Object::new(&[("uri", &uri), ("min-latency", &min_latency)]).unwrap() + pub fn new(uri: Option<&str>, min_latency: gst::ClockTime) -> VideoFallbackSource { + glib::Object::new(&[("uri", &uri), ("min-latency", &min_latency.nseconds())]).unwrap() } } diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs index 94b09f75..89ef28a0 100644 --- a/utils/fallbackswitch/src/fallbackswitch/imp.rs +++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs @@ -71,13 +71,13 @@ static CAT: Lazy = Lazy::new(|| { #[derive(Debug, Default)] struct PadOutputState { - last_sinkpad_time: gst::ClockTime, + last_sinkpad_time: Option, stream_health: StreamHealth, } #[derive(Debug)] struct OutputState { - last_output_time: gst::ClockTime, + last_output_time: Option, primary: PadOutputState, fallback: PadOutputState, } @@ -89,7 +89,7 @@ struct PadInputState { video_info: Option, } -const DEFAULT_TIMEOUT: u64 = 5 * gst::SECOND_VAL; +const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(5); const DEFAULT_AUTO_SWITCH: bool = true; const DEFAULT_STREAM_HEALTH: StreamHealth = StreamHealth::Inactive; @@ -108,7 +108,7 @@ impl Default for StreamHealth { impl Default for OutputState { fn default() -> Self { OutputState { - last_output_time: gst::CLOCK_TIME_NONE, + last_output_time: gst::ClockTime::NONE, primary: PadOutputState::default(), fallback: PadOutputState::default(), } @@ -118,7 +118,7 @@ impl Default for OutputState { impl Default for Settings { fn default() -> Self { Settings { - timeout: DEFAULT_TIMEOUT.into(), + timeout: DEFAULT_TIMEOUT, auto_switch: DEFAULT_AUTO_SWITCH, } } @@ -129,7 +129,7 @@ impl OutputState { &self, settings: &Settings, check_primary_pad: bool, - cur_running_time: gst::ClockTime, + cur_running_time: impl Into>, ) -> StreamHealth { let last_sinkpad_time = if check_primary_pad { self.primary.last_sinkpad_time @@ -137,11 +137,11 @@ impl OutputState { self.fallback.last_sinkpad_time }; - if last_sinkpad_time == gst::ClockTime::none() { + if last_sinkpad_time.is_none() { StreamHealth::Inactive - } else if cur_running_time != gst::ClockTime::none() - && cur_running_time < last_sinkpad_time + settings.timeout - { + } else if cur_running_time.into().map_or(false, |cur_running_time| { + cur_running_time < last_sinkpad_time.expect("checked above") + settings.timeout + }) { StreamHealth::Present } else { StreamHealth::Inactive @@ -153,7 +153,7 @@ impl OutputState { settings: &Settings, backup_pad: &Option<&gst_base::AggregatorPad>, preferred_is_primary: bool, - cur_running_time: gst::ClockTime, + cur_running_time: impl Into> + Copy, ) -> (bool, bool) { let preferred_health = self.health(settings, preferred_is_primary, cur_running_time); let backup_health = if backup_pad.is_some() { @@ -187,7 +187,7 @@ impl FallbackSwitch { &self, state: &mut OutputState, pad: &gst_base::AggregatorPad, - target_running_time: gst::ClockTime, + target_running_time: impl Into> + Copy, ) -> Result<(), gst::FlowError> { let segment = pad.segment(); @@ -201,13 +201,17 @@ impl FallbackSwitch { gst::FlowError::Error })?; - let mut running_time = gst::ClockTime::none(); + let mut running_time = gst::ClockTime::NONE; while let Some(buffer) = pad.peek_buffer() { let pts = buffer.dts_or_pts(); let new_running_time = segment.to_running_time(pts); - if pts.is_none() || new_running_time <= target_running_time { + if pts.is_none() + || new_running_time + .zip(target_running_time.into()) + .map_or(false, |(new, target)| new <= target) + { gst_debug!(CAT, obj: pad, "Dropping trailing buffer {:?}", buffer); pad.drop_buffer(); running_time = new_running_time; @@ -215,7 +219,7 @@ impl FallbackSwitch { break; } } - if running_time != gst::ClockTime::none() { + if running_time.is_some() { if pad == &self.primary_sinkpad { state.primary.last_sinkpad_time = running_time; } else { @@ -234,7 +238,7 @@ impl FallbackSwitch { mut buffer: gst::Buffer, preferred_pad: &gst_base::AggregatorPad, backup_pad: &Option<&gst_base::AggregatorPad>, - cur_running_time: gst::ClockTime, + cur_running_time: impl Into>, ) -> Result, gst::FlowError> { // If we got a buffer on the sinkpad just handle it gst_debug!( @@ -273,48 +277,46 @@ impl FallbackSwitch { state.fallback.last_sinkpad_time = running_time; } - let is_late = { - if cur_running_time != gst::ClockTime::none() { - let latency = agg.latency(); - if latency.is_some() { - let deadline = running_time + latency + 40 * gst::MSECOND; + let cur_running_time = cur_running_time.into(); + let (is_late, deadline) = cur_running_time + .zip(agg.latency()) + .zip(running_time) + .map_or( + (false, None), + |((cur_running_time, latency), running_time)| { + let dealine = running_time + latency + 40 * gst::ClockTime::MSECOND; + (cur_running_time > dealine, Some(dealine)) + }, + ); - if cur_running_time > deadline { - gst_debug!( - CAT, - obj: preferred_pad, - "Buffer is too late: {} > {}", - cur_running_time, - deadline - ); - true - } else { - false - } - } else { - false - } - } else { - false - } - }; - - if state.last_output_time.is_some() - && is_late - && state.last_output_time + settings.timeout <= running_time - { - /* This buffer arrived too late - we either already switched - * to the other pad or there's no point outputting this anyway */ + if is_late { gst_debug!( CAT, obj: preferred_pad, - "Buffer is too late and timeout reached: {} + {} <= {}", - state.last_output_time, - settings.timeout, - running_time, + "Buffer is too late: {} > {}", + cur_running_time.display(), + deadline.display(), ); - return Ok(None); + if state.last_output_time.zip(running_time).map_or( + false, + |(last_output_time, running_time)| { + last_output_time + settings.timeout <= running_time + }, + ) { + /* This buffer arrived too late - we either already switched + * to the other pad or there's no point outputting this anyway */ + gst_debug!( + CAT, + obj: preferred_pad, + "Buffer is too late and timeout reached: {} + {} <= {}", + state.last_output_time.display(), + settings.timeout, + running_time.display(), + ); + + return Ok(None); + } } let mut active_sinkpad = self.active_sinkpad.lock().unwrap(); @@ -416,14 +418,19 @@ impl FallbackSwitch { } // Get the next one if this one is before the timeout - if state.last_output_time + settings.timeout > running_time { + if state.last_output_time.zip(running_time).map_or( + false, + |(last_output_time, running_time)| { + last_output_time + settings.timeout > running_time + }, + ) { gst_debug!( CAT, obj: backup_pad, "Timeout not reached yet: {} + {} > {}", - state.last_output_time, + state.last_output_time.display(), settings.timeout, - running_time + running_time.display(), ); continue; @@ -433,9 +440,9 @@ impl FallbackSwitch { CAT, obj: backup_pad, "Timeout reached: {} + {} <= {}", - state.last_output_time, + state.last_output_time.display(), settings.timeout, - running_time + running_time.display(), ); let mut active_sinkpad = self.active_sinkpad.lock().unwrap(); @@ -529,9 +536,12 @@ impl FallbackSwitch { let base_time = agg.base_time(); let cur_running_time = if let Some(clock) = clock { - clock.time() - base_time + clock + .time() + .zip(base_time) + .and_then(|(time, base_time)| time.checked_sub(base_time)) } else { - gst::ClockTime::none() + gst::ClockTime::NONE }; /* See if there's a buffer on the preferred pad and output that */ @@ -677,8 +687,8 @@ impl ObjectImpl for FallbackSwitch { "Timeout", "Timeout in nanoseconds", 0, - std::u64::MAX, - DEFAULT_TIMEOUT, + std::u64::MAX - 1, + DEFAULT_TIMEOUT.nseconds() as u64, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), glib::ParamSpec::new_object( @@ -984,7 +994,7 @@ impl AggregatorImpl for FallbackSwitch { } } - fn next_time(&self, agg: &Self::Type) -> gst::ClockTime { + fn next_time(&self, agg: &Self::Type) -> Option { /* At each iteration, we have a preferred pad and a backup pad. If autoswitch is true, * the sinkpad is always preferred, otherwise it's the active sinkpad as set by the app. * The backup pad is the other one (may be None if there's no fallback pad yet). @@ -1020,10 +1030,10 @@ impl AggregatorImpl for FallbackSwitch { "Have buffer on sinkpad {}, immediate timeout", preferred_pad.name() ); - 0.into() + Some(gst::ClockTime::ZERO) } else if self.primary_sinkpad.is_eos() { gst_debug!(CAT, obj: agg, "Sinkpad is EOS, immediate timeout"); - 0.into() + Some(gst::ClockTime::ZERO) } else if let Some((buffer, backup_sinkpad)) = backup_pad .as_ref() .and_then(|p| p.peek_buffer().map(|buffer| (buffer, p))) @@ -1031,7 +1041,7 @@ impl AggregatorImpl for FallbackSwitch { if buffer.pts().is_none() { gst_error!(CAT, obj: agg, "Only buffers with PTS supported"); // Trigger aggregate immediately to error out immediately - return 0.into(); + return Some(gst::ClockTime::ZERO); } let segment = match backup_sinkpad.segment().downcast::() { @@ -1039,7 +1049,7 @@ impl AggregatorImpl for FallbackSwitch { Err(_) => { gst_error!(CAT, obj: agg, "Only TIME segments supported"); // Trigger aggregate immediately to error out immediately - return 0.into(); + return Some(gst::ClockTime::ZERO); } }; @@ -1049,12 +1059,12 @@ impl AggregatorImpl for FallbackSwitch { obj: agg, "Have buffer on {} pad, timeout at {}", backup_sinkpad.name(), - running_time + running_time.display(), ); running_time } else { gst_debug!(CAT, obj: agg, "No buffer available on either input"); - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE } } @@ -1096,25 +1106,21 @@ impl AggregatorImpl for FallbackSwitch { return Some(buffer); } - let duration = if buffer.duration().is_some() { - buffer.duration() + let duration = if let Some(duration) = buffer.duration() { + Some(duration) } else if let Some(ref audio_info) = pad_state.audio_info { - gst::SECOND - .mul_div_floor( - buffer.size() as u64, - audio_info.rate() as u64 * audio_info.bpf() as u64, - ) - .unwrap() + gst::ClockTime::SECOND.mul_div_floor( + buffer.size() as u64, + audio_info.rate() as u64 * audio_info.bpf() as u64, + ) } else if let Some(ref video_info) = pad_state.video_info { if *video_info.fps().numer() > 0 { - gst::SECOND - .mul_div_floor( - *video_info.fps().denom() as u64, - *video_info.fps().numer() as u64, - ) - .unwrap() + gst::ClockTime::SECOND.mul_div_floor( + *video_info.fps().denom() as u64, + *video_info.fps().numer() as u64, + ) } else { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE } } else { unreachable!() @@ -1125,8 +1131,8 @@ impl AggregatorImpl for FallbackSwitch { obj: agg_pad, "Clipping buffer {:?} with PTS {} and duration {}", buffer, - pts, - duration + pts.display(), + duration.display(), ); if let Some(ref audio_info) = pad_state.audio_info { gst_audio::audio_buffer_clip( @@ -1136,12 +1142,16 @@ impl AggregatorImpl for FallbackSwitch { audio_info.bpf(), ) } else if pad_state.video_info.is_some() { - segment.clip(pts, pts + duration).map(|(start, stop)| { + let stop = pts.zip(duration).map(|(pts, duration)| pts + duration); + segment.clip(pts, stop).map(|(start, stop)| { { let buffer = buffer.make_mut(); buffer.set_pts(start); if duration.is_some() { - buffer.set_duration(stop - start); + buffer.set_duration( + stop.zip(start) + .and_then(|(stop, start)| stop.checked_sub(start)), + ); } } diff --git a/utils/fallbackswitch/tests/fallbackswitch.rs b/utils/fallbackswitch/tests/fallbackswitch.rs index 3d9d7580..b4039790 100644 --- a/utils/fallbackswitch/tests/fallbackswitch.rs +++ b/utils/fallbackswitch/tests/fallbackswitch.rs @@ -56,20 +56,20 @@ macro_rules! assert_buffer { fn test_no_fallback_no_drops() { let pipeline = setup_pipeline(None); - push_buffer(&pipeline, 0.into()); - set_time(&pipeline, 0.into()); + push_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 0.into()); + assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); - push_buffer(&pipeline, gst::SECOND); - set_time(&pipeline, gst::SECOND); + push_buffer(&pipeline, gst::ClockTime::SECOND); + set_time(&pipeline, gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, gst::SECOND); + assert_buffer!(buffer, Some(gst::ClockTime::SECOND)); - push_buffer(&pipeline, 2 * gst::SECOND); - set_time(&pipeline, 2 * gst::SECOND); + push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 2 * gst::SECOND); + assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); push_eos(&pipeline); wait_eos(&pipeline); @@ -90,23 +90,23 @@ fn test_no_drops_not_live() { fn test_no_drops(live: bool) { let pipeline = setup_pipeline(Some(live)); - push_buffer(&pipeline, 0.into()); - push_fallback_buffer(&pipeline, 0.into()); - set_time(&pipeline, 0.into()); + push_buffer(&pipeline, gst::ClockTime::ZERO); + push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 0.into()); + assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); - push_fallback_buffer(&pipeline, gst::SECOND); - push_buffer(&pipeline, gst::SECOND); - set_time(&pipeline, gst::SECOND); + push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); + push_buffer(&pipeline, gst::ClockTime::SECOND); + set_time(&pipeline, gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, gst::SECOND); + assert_buffer!(buffer, Some(gst::ClockTime::SECOND)); - push_buffer(&pipeline, 2 * gst::SECOND); - push_fallback_buffer(&pipeline, 2 * gst::SECOND); - set_time(&pipeline, 2 * gst::SECOND); + push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 2 * gst::SECOND); + assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); // EOS on the fallback should not be required push_eos(&pipeline); @@ -128,22 +128,22 @@ fn test_no_drops_but_no_fallback_frames_not_live() { fn test_no_drops_but_no_fallback_frames(live: bool) { let pipeline = setup_pipeline(Some(live)); - push_buffer(&pipeline, 0.into()); + push_buffer(&pipeline, gst::ClockTime::ZERO); // +10ms needed here because the immediate timeout will be always at running time 0, but // aggregator also adds the latency to it so we end up at 10ms instead. - set_time(&pipeline, 10 * gst::MSECOND); + set_time(&pipeline, 10 * gst::ClockTime::MSECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 0.into()); + assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); - push_buffer(&pipeline, gst::SECOND); - set_time(&pipeline, gst::SECOND); + push_buffer(&pipeline, gst::ClockTime::SECOND); + set_time(&pipeline, gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, gst::SECOND); + assert_buffer!(buffer, Some(gst::ClockTime::SECOND)); - push_buffer(&pipeline, 2 * gst::SECOND); - set_time(&pipeline, 2 * gst::SECOND); + push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 2 * gst::SECOND); + assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); // EOS on the fallback should not be required push_eos(&pipeline); @@ -165,23 +165,26 @@ fn test_short_drop_not_live() { fn test_short_drop(live: bool) { let pipeline = setup_pipeline(Some(live)); - push_buffer(&pipeline, 0.into()); - push_fallback_buffer(&pipeline, 0.into()); - set_time(&pipeline, 0.into()); + push_buffer(&pipeline, gst::ClockTime::ZERO); + push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 0.into()); + assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); // A timeout at 1s will get rid of the fallback buffer // but not output anything - push_fallback_buffer(&pipeline, gst::SECOND); + push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); // Time out the fallback buffer at +10ms - set_time(&pipeline, gst::SECOND + 10 * gst::MSECOND); + set_time( + &pipeline, + gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); - push_fallback_buffer(&pipeline, 2 * gst::SECOND); - push_buffer(&pipeline, 2 * gst::SECOND); - set_time(&pipeline, 2 * gst::SECOND); + push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 2 * gst::SECOND); + assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); push_eos(&pipeline); push_fallback_eos(&pipeline); @@ -204,33 +207,45 @@ fn test_long_drop_and_eos(live: bool) { let pipeline = setup_pipeline(Some(live)); // Produce the first frame - push_buffer(&pipeline, 0.into()); - push_fallback_buffer(&pipeline, 0.into()); - set_time(&pipeline, 0.into()); + push_buffer(&pipeline, gst::ClockTime::ZERO); + push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 0.into()); + assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); // Produce a second frame but only from the fallback source - push_fallback_buffer(&pipeline, gst::SECOND); - set_time(&pipeline, gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); + set_time( + &pipeline, + gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); // Produce a third frame but only from the fallback source - push_fallback_buffer(&pipeline, 2 * gst::SECOND); - set_time(&pipeline, 2 * gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); // Produce a fourth frame but only from the fallback source // This should be output now - push_fallback_buffer(&pipeline, 3 * gst::SECOND); - set_time(&pipeline, 3 * gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); let buffer = pull_buffer(&pipeline); - assert_fallback_buffer!(buffer, 3 * gst::SECOND); + assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND)); // Produce a fifth frame but only from the fallback source // This should be output now - push_fallback_buffer(&pipeline, 4 * gst::SECOND); - set_time(&pipeline, 4 * gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); let buffer = pull_buffer(&pipeline); - assert_fallback_buffer!(buffer, 4 * gst::SECOND); + assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND)); // Wait for EOS to arrive at appsink push_eos(&pipeline); @@ -254,54 +269,66 @@ fn test_long_drop_and_recover(live: bool) { let pipeline = setup_pipeline(Some(live)); // Produce the first frame - push_buffer(&pipeline, 0.into()); - push_fallback_buffer(&pipeline, 0.into()); - set_time(&pipeline, 0.into()); + push_buffer(&pipeline, gst::ClockTime::ZERO); + push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 0.into()); + assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); // Produce a second frame but only from the fallback source - push_fallback_buffer(&pipeline, gst::SECOND); - set_time(&pipeline, gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); + set_time( + &pipeline, + gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); // Produce a third frame but only from the fallback source - push_fallback_buffer(&pipeline, 2 * gst::SECOND); - set_time(&pipeline, 2 * gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); // Produce a fourth frame but only from the fallback source // This should be output now - push_fallback_buffer(&pipeline, 3 * gst::SECOND); - set_time(&pipeline, 3 * gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); let buffer = pull_buffer(&pipeline); - assert_fallback_buffer!(buffer, 3 * gst::SECOND); + assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND)); // Produce a fifth frame but only from the fallback source // This should be output now - push_fallback_buffer(&pipeline, 4 * gst::SECOND); - set_time(&pipeline, 4 * gst::SECOND + 10 * gst::MSECOND); + push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); let buffer = pull_buffer(&pipeline); - assert_fallback_buffer!(buffer, 4 * gst::SECOND); + assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND)); // Produce a sixth frame from the normal source - push_buffer(&pipeline, 5 * gst::SECOND); - push_fallback_buffer(&pipeline, 5 * gst::SECOND); - set_time(&pipeline, 5 * gst::SECOND); + push_buffer(&pipeline, 5 * gst::ClockTime::SECOND); + push_fallback_buffer(&pipeline, 5 * gst::ClockTime::SECOND); + set_time(&pipeline, 5 * gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 5 * gst::SECOND); + assert_buffer!(buffer, Some(5 * gst::ClockTime::SECOND)); // Produce a seventh frame from the normal source but no fallback. // This should still be output immediately - push_buffer(&pipeline, 6 * gst::SECOND); - set_time(&pipeline, 6 * gst::SECOND); + push_buffer(&pipeline, 6 * gst::ClockTime::SECOND); + set_time(&pipeline, 6 * gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 6 * gst::SECOND); + assert_buffer!(buffer, Some(6 * gst::ClockTime::SECOND)); // Produce a eight frame from the normal source - push_buffer(&pipeline, 7 * gst::SECOND); - push_fallback_buffer(&pipeline, 7 * gst::SECOND); - set_time(&pipeline, 7 * gst::SECOND); + push_buffer(&pipeline, 7 * gst::ClockTime::SECOND); + push_fallback_buffer(&pipeline, 7 * gst::ClockTime::SECOND); + set_time(&pipeline, 7 * gst::ClockTime::SECOND); let buffer = pull_buffer(&pipeline); - assert_buffer!(buffer, 7 * gst::SECOND); + assert_buffer!(buffer, Some(7 * gst::ClockTime::SECOND)); // Wait for EOS to arrive at appsink push_eos(&pipeline); @@ -330,15 +357,15 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { gst_debug!(TEST_CAT, "Setting up pipeline"); let clock = gst_check::TestClock::new(); - clock.set_time(0.into()); + clock.set_time(gst::ClockTime::ZERO); let pipeline = gst::Pipeline::new(None); // Running time 0 in our pipeline is going to be clock time 1s. All // clock ids before 1s are used for signalling to our clock advancing // thread. pipeline.use_clock(Some(&clock)); - pipeline.set_base_time(gst::SECOND); - pipeline.set_start_time(gst::CLOCK_TIME_NONE); + pipeline.set_base_time(gst::ClockTime::SECOND); + pipeline.set_start_time(gst::ClockTime::NONE); let src = gst::ElementFactory::make("appsrc", Some("src")) .unwrap() @@ -359,7 +386,9 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { .unwrap(); let switch = gst::ElementFactory::make("fallbackswitch", Some("switch")).unwrap(); - switch.set_property("timeout", &(3 * gst::SECOND)).unwrap(); + switch + .set_property("timeout", &(3 * gst::ClockTime::SECOND)) + .unwrap(); let sink = gst::ElementFactory::make("appsink", Some("sink")) .unwrap() @@ -411,7 +440,7 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { loop { while let Some(clock_id) = clock.peek_next_pending_id().and_then(|clock_id| { // Process if the clock ID is in the past or now - if clock.time() >= clock_id.time() { + if clock.time().map_or(false, |time| time >= clock_id.time()) { Some(clock_id) } else { None @@ -420,7 +449,7 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { gst_debug!(TEST_CAT, "Processing clock ID at {}", clock_id.time()); if let Some(clock_id) = clock.process_next_clock_id() { gst_debug!(TEST_CAT, "Processed clock ID at {}", clock_id.time()); - if clock_id.time() == 0.into() { + if clock_id.time().is_zero() { gst_debug!(TEST_CAT, "Stopping clock thread"); return; } @@ -431,7 +460,10 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { // at the top of the queue. We don't want to do a busy loop here. while clock.peek_next_pending_id().iter().any(|clock_id| { // Sleep if the clock ID is in the future - clock.time() < clock_id.time() + // FIXME probably can expect clock.time() + clock + .time() + .map_or(true, |clock_time| clock_time < clock_id.time()) }) { use std::{thread, time}; @@ -514,7 +546,7 @@ fn set_time(pipeline: &Pipeline, time: gst::ClockTime) { .unwrap(); gst_debug!(TEST_CAT, "Setting time to {}", time); - clock.set_time(gst::SECOND + time); + clock.set_time(gst::ClockTime::SECOND + time); } fn wait_eos(pipeline: &Pipeline) { @@ -545,7 +577,7 @@ fn stop_pipeline(mut pipeline: Pipeline) { .unwrap(); // Signal shutdown to the clock thread - let clock_id = clock.new_single_shot_id(0.into()); + let clock_id = clock.new_single_shot_id(gst::ClockTime::ZERO); let _ = clock_id.wait(); let switch = pipeline.by_name("switch").unwrap(); diff --git a/utils/togglerecord/Cargo.toml b/utils/togglerecord/Cargo.toml index 8adb3553..cf4906dc 100644 --- a/utils/togglerecord/Cargo.toml +++ b/utils/togglerecord/Cargo.toml @@ -14,7 +14,6 @@ gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org gtk = { git = "https://github.com/gtk-rs/gtk3-rs", optional = true } gio = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } parking_lot = "0.11" -more-asserts = "0.2" once_cell = "1.0" [dev-dependencies] diff --git a/utils/togglerecord/examples/gtk_recording.rs b/utils/togglerecord/examples/gtk_recording.rs index ee7fc076..3db29856 100644 --- a/utils/togglerecord/examples/gtk_recording.rs +++ b/utils/togglerecord/examples/gtk_recording.rs @@ -237,14 +237,14 @@ fn create_ui(app: >k::Application) { let position = video_sink .query_position::() - .unwrap_or_else(|| 0.into()); + .unwrap_or(gst::ClockTime::ZERO); position_label.set_text(&format!("Position: {:.1}", position)); let recording_duration = togglerecord .static_pad("src") .unwrap() .query_position::() - .unwrap_or_else(|| 0.into()); + .unwrap_or(gst::ClockTime::ZERO); recorded_duration_label.set_text(&format!("Recorded: {:.1}", recording_duration)); glib::Continue(true) diff --git a/utils/togglerecord/src/togglerecord/imp.rs b/utils/togglerecord/src/togglerecord/imp.rs index 1ba79102..858a4bc3 100644 --- a/utils/togglerecord/src/togglerecord/imp.rs +++ b/utils/togglerecord/src/togglerecord/imp.rs @@ -20,8 +20,6 @@ use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_log, gst_trace, gst_warning}; -use more_asserts::{assert_ge, assert_le, assert_lt}; - use once_cell::sync::Lazy; use parking_lot::{Condvar, Mutex}; use std::cmp; @@ -75,8 +73,8 @@ struct StreamState { out_segment: gst::FormattedSegment, segment_seqnum: gst::Seqnum, // Start/end running time of the current/last buffer - current_running_time: gst::ClockTime, - current_running_time_end: gst::ClockTime, + current_running_time: Option, + current_running_time_end: Option, eos: bool, flushing: bool, segment_pending: bool, @@ -92,8 +90,8 @@ impl Default for StreamState { in_segment: gst::FormattedSegment::new(), out_segment: gst::FormattedSegment::new(), segment_seqnum: gst::Seqnum::next(), - current_running_time: gst::CLOCK_TIME_NONE, - current_running_time_end: gst::CLOCK_TIME_NONE, + current_running_time: None, + current_running_time_end: None, eos: false, flushing: false, segment_pending: false, @@ -127,23 +125,23 @@ enum RecordingState { #[derive(Debug)] struct State { recording_state: RecordingState, - last_recording_start: gst::ClockTime, - last_recording_stop: gst::ClockTime, + last_recording_start: Option, + last_recording_stop: Option, // Accumulated duration of previous recording segments, // updated whenever going to Stopped recording_duration: gst::ClockTime, // Updated whenever going to Recording - running_time_offset: gst::ClockTime, + running_time_offset: i64, } impl Default for State { fn default() -> Self { Self { recording_state: RecordingState::Stopped, - last_recording_start: gst::CLOCK_TIME_NONE, - last_recording_stop: gst::CLOCK_TIME_NONE, - recording_duration: 0.into(), - running_time_offset: gst::CLOCK_TIME_NONE, + last_recording_start: None, + last_recording_stop: None, + recording_duration: gst::ClockTime::ZERO, + running_time_offset: 0, } } } @@ -157,9 +155,9 @@ enum HandleResult { } trait HandleData: Sized { - fn pts(&self) -> gst::ClockTime; - fn dts(&self) -> gst::ClockTime; - fn dts_or_pts(&self) -> gst::ClockTime { + fn pts(&self) -> Option; + fn dts(&self) -> Option; + fn dts_or_pts(&self) -> Option { let dts = self.dts(); if dts.is_some() { dts @@ -167,7 +165,7 @@ trait HandleData: Sized { self.pts() } } - fn duration(&self, state: &StreamState) -> gst::ClockTime; + fn duration(&self, state: &StreamState) -> Option; fn is_keyframe(&self) -> bool; fn can_clip(&self, state: &StreamState) -> bool; fn clip( @@ -177,16 +175,16 @@ trait HandleData: Sized { ) -> Option; } -impl HandleData for (gst::ClockTime, gst::ClockTime) { - fn pts(&self) -> gst::ClockTime { - self.0 +impl HandleData for (gst::ClockTime, Option) { + fn pts(&self) -> Option { + Some(self.0) } - fn dts(&self) -> gst::ClockTime { - self.0 + fn dts(&self) -> Option { + Some(self.0) } - fn duration(&self, _state: &StreamState) -> gst::ClockTime { + fn duration(&self, _state: &StreamState) -> Option { self.1 } @@ -203,55 +201,48 @@ impl HandleData for (gst::ClockTime, gst::ClockTime) { _state: &StreamState, segment: &gst::FormattedSegment, ) -> Option { - let stop = if self.1.is_some() { - self.0 + self.1 - } else { - self.0 - }; + let stop = self.0 + self.1.unwrap_or(gst::ClockTime::ZERO); - segment - .clip(self.0, stop) - .map(|(start, stop)| (start, stop - start)) + segment.clip(self.0, stop).map(|(start, stop)| { + let start = start.expect("provided a defined value"); + (start, stop.map(|stop| stop - start)) + }) } } impl HandleData for gst::Buffer { - fn pts(&self) -> gst::ClockTime { + fn pts(&self) -> Option { gst::BufferRef::pts(self) } - fn dts(&self) -> gst::ClockTime { + fn dts(&self) -> Option { gst::BufferRef::dts(self) } - fn duration(&self, state: &StreamState) -> gst::ClockTime { + fn duration(&self, state: &StreamState) -> Option { let duration = gst::BufferRef::duration(self); if duration.is_some() { duration } else if let Some(ref video_info) = state.video_info { if video_info.fps() != 0.into() { - gst::SECOND - .mul_div_floor( - *video_info.fps().denom() as u64, - *video_info.fps().numer() as u64, - ) - .unwrap_or(gst::CLOCK_TIME_NONE) + gst::ClockTime::SECOND.mul_div_floor( + *video_info.fps().denom() as u64, + *video_info.fps().numer() as u64, + ) } else { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE } } else if let Some(ref audio_info) = state.audio_info { if audio_info.bpf() == 0 || audio_info.rate() == 0 { - return gst::CLOCK_TIME_NONE; + return gst::ClockTime::NONE; } let size = self.size() as u64; let num_samples = size / audio_info.bpf() as u64; - gst::SECOND - .mul_div_floor(num_samples, audio_info.rate() as u64) - .unwrap_or(gst::CLOCK_TIME_NONE) + gst::ClockTime::SECOND.mul_div_floor(num_samples, audio_info.rate() as u64) } else { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE } } @@ -295,11 +286,7 @@ impl HandleData for gst::Buffer { let pts = HandleData::pts(&self); let duration = HandleData::duration(&self, state); - let stop = if duration.is_some() { - pts + duration - } else { - pts - }; + let stop = pts.map(|pts| pts + duration.unwrap_or(gst::ClockTime::ZERO)); if let Some(ref audio_info) = state.audio_info { gst_audio::audio_buffer_clip( @@ -313,7 +300,11 @@ impl HandleData for gst::Buffer { { let buffer = self.make_mut(); buffer.set_pts(start); - buffer.set_duration(stop - start); + buffer.set_duration( + stop.zip(start) + .and_then(|(stop, start)| stop.checked_sub(start)), + // FIXME we could expect here + ); } self @@ -354,48 +345,47 @@ impl ToggleRecord { ) -> Result, gst::FlowError> { let mut state = stream.state.lock(); - let mut dts_or_pts = data.dts_or_pts(); - let duration = data.duration(&state); - - if !dts_or_pts.is_some() { + let mut dts_or_pts = data.dts_or_pts().ok_or_else(|| { gst::element_error!( element, gst::StreamError::Format, ["Buffer without DTS or PTS"] ); - return Err(gst::FlowError::Error); - } + gst::FlowError::Error + })?; - let mut dts_or_pts_end = if duration.is_some() { - dts_or_pts + duration - } else { - dts_or_pts - }; + let mut dts_or_pts_end = dts_or_pts + data.duration(&state).unwrap_or(gst::ClockTime::ZERO); let data = match data.clip(&state, &state.in_segment) { + Some(data) => data, None => { gst_log!(CAT, obj: pad, "Dropping raw data outside segment"); return Ok(HandleResult::Drop); } - Some(data) => data, }; // This will only do anything for non-raw data - dts_or_pts = state.in_segment.start().max(dts_or_pts).unwrap(); - dts_or_pts_end = state.in_segment.start().max(dts_or_pts_end).unwrap(); - if state.in_segment.stop().is_some() { - dts_or_pts = state.in_segment.stop().min(dts_or_pts).unwrap(); - dts_or_pts_end = state.in_segment.stop().min(dts_or_pts_end).unwrap(); + // FIXME comment why we can unwrap + dts_or_pts = state.in_segment.start().unwrap().max(dts_or_pts); + dts_or_pts_end = state.in_segment.start().unwrap().max(dts_or_pts_end); + if let Some(stop) = state.in_segment.stop() { + dts_or_pts = stop.min(dts_or_pts); + dts_or_pts_end = stop.min(dts_or_pts_end); } let current_running_time = state.in_segment.to_running_time(dts_or_pts); let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); state.current_running_time = current_running_time - .max(state.current_running_time) - .unwrap_or(current_running_time); + .zip(state.current_running_time) + .map(|(cur_rt, state_rt)| cur_rt.max(state_rt)) + .or(current_running_time); state.current_running_time_end = current_running_time_end - .max(state.current_running_time_end) - .unwrap_or(current_running_time_end); + .zip(state.current_running_time_end) + .map(|(cur_rt_end, state_rt_end)| cur_rt_end.max(state_rt_end)) + .or(current_running_time_end); + + // FIXME we should probably return if either current_running_time or current_running_time_end + // are None at this point // Wake up everybody, we advanced a bit // Important: They will only be able to advance once we're done with this @@ -407,10 +397,10 @@ impl ToggleRecord { CAT, obj: pad, "Main stream current running time {}-{} (position: {}-{})", - current_running_time, - current_running_time_end, + current_running_time.display(), + current_running_time_end.display(), dts_or_pts, - dts_or_pts_end + dts_or_pts_end, ); let settings = *self.settings.lock(); @@ -458,13 +448,17 @@ impl ToggleRecord { // Remember the time when we stopped: now, i.e. right before the current buffer! rec_state.last_recording_stop = current_running_time; + let last_recording_duration = rec_state + .last_recording_stop + .zip(rec_state.last_recording_start) + .and_then(|(stop, start)| stop.checked_sub(start)); gst_debug!( CAT, obj: pad, "Stopping at {}, started at {}, current duration {}, previous accumulated recording duration {}", - rec_state.last_recording_stop, - rec_state.last_recording_start, - rec_state.last_recording_stop - rec_state.last_recording_start, + rec_state.last_recording_stop.display(), + rec_state.last_recording_start.display(), + last_recording_duration.display(), rec_state.recording_duration ); @@ -477,8 +471,9 @@ impl ToggleRecord { && !self.other_streams.lock().0.iter().all(|s| { let s = s.state.lock(); s.eos - || (s.current_running_time.is_some() - && s.current_running_time >= current_running_time) + || s.current_running_time + .zip(current_running_time) + .map_or(false, |(s_cur_rt, cur_rt)| s_cur_rt >= cur_rt) }) { gst_log!(CAT, obj: pad, "Waiting for other streams to stop"); @@ -492,17 +487,17 @@ impl ToggleRecord { let mut rec_state = self.state.lock(); rec_state.recording_state = RecordingState::Stopped; - let advance_by = rec_state.last_recording_stop - rec_state.last_recording_start; - rec_state.recording_duration += advance_by; - rec_state.last_recording_start = gst::CLOCK_TIME_NONE; - rec_state.last_recording_stop = gst::CLOCK_TIME_NONE; + rec_state.recording_duration += + last_recording_duration.unwrap_or(gst::ClockTime::ZERO); + rec_state.last_recording_start = None; + rec_state.last_recording_stop = None; gst_debug!( CAT, obj: pad, "Stopped at {}, recording duration {}", - current_running_time, - rec_state.recording_duration + current_running_time.display(), + rec_state.recording_duration.display(), ); // Then become Stopped and drop this buffer. We always stop right before @@ -538,12 +533,17 @@ impl ToggleRecord { // Remember the time when we started: now! rec_state.last_recording_start = current_running_time; - rec_state.running_time_offset = current_running_time - rec_state.recording_duration; + rec_state.running_time_offset = + current_running_time.map_or(0, |current_running_time| { + current_running_time + .saturating_sub(rec_state.recording_duration) + .nseconds() + }) as i64; gst_debug!( CAT, obj: pad, "Starting at {}, previous accumulated recording duration {}", - current_running_time, + current_running_time.display(), rec_state.recording_duration, ); @@ -564,8 +564,9 @@ impl ToggleRecord { && !self.other_streams.lock().0.iter().all(|s| { let s = s.state.lock(); s.eos - || (s.current_running_time.is_some() - && s.current_running_time >= current_running_time) + || s.current_running_time + .zip(current_running_time) + .map_or(false, |(s_cur_rt, cur_rt)| s_cur_rt >= cur_rt) }) { gst_log!(CAT, obj: pad, "Waiting for other streams to start"); @@ -583,7 +584,7 @@ impl ToggleRecord { CAT, obj: pad, "Started at {}, recording duration {}", - current_running_time, + current_running_time.display(), rec_state.recording_duration ); @@ -608,16 +609,12 @@ impl ToggleRecord { // Calculate end pts & current running time and make sure we stay in the segment let mut state = stream.state.lock(); - let mut pts = data.pts(); - let duration = data.duration(&state); - - if pts.is_none() { + let mut pts = data.pts().ok_or_else(|| { gst::element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]); - return Err(gst::FlowError::Error); - } + gst::FlowError::Error + })?; - let dts = data.dts(); - if dts.is_some() && pts.is_some() && dts != pts { + if data.dts().map_or(false, |dts| dts != pts) { gst::element_error!( element, gst::StreamError::Format, @@ -635,11 +632,7 @@ impl ToggleRecord { return Err(gst::FlowError::Error); } - let mut pts_end = if duration.is_some() { - pts + duration - } else { - pts - }; + let mut pts_end = pts + data.duration(&state).unwrap_or(gst::ClockTime::ZERO); let data = match data.clip(&state, &state.in_segment) { None => { @@ -650,28 +643,31 @@ impl ToggleRecord { }; // This will only do anything for non-raw data - pts = state.in_segment.start().max(pts).unwrap(); - pts_end = state.in_segment.start().max(pts_end).unwrap(); - if state.in_segment.stop().is_some() { - pts = state.in_segment.stop().min(pts).unwrap(); - pts_end = state.in_segment.stop().min(pts_end).unwrap(); + // FIXME comment why we can unwrap + pts = state.in_segment.start().unwrap().max(pts); + pts_end = state.in_segment.start().unwrap().max(pts_end); + if let Some(stop) = state.in_segment.stop() { + pts = stop.min(pts); + pts_end = stop.min(pts_end); } let current_running_time = state.in_segment.to_running_time(pts); let current_running_time_end = state.in_segment.to_running_time(pts_end); state.current_running_time = current_running_time - .max(state.current_running_time) - .unwrap_or(current_running_time); + .zip(state.current_running_time) + .map(|(cur_rt, state_rt)| cur_rt.max(state_rt)) + .or(current_running_time); state.current_running_time_end = current_running_time_end - .max(state.current_running_time_end) - .unwrap_or(current_running_time_end); + .zip(state.current_running_time_end) + .map(|(cur_rt_end, state_rt_end)| cur_rt_end.max(state_rt_end)) + .or(current_running_time_end); gst_log!( CAT, obj: pad, "Secondary stream current running time {}-{} (position: {}-{}", - current_running_time, - current_running_time_end, + current_running_time.display(), + current_running_time_end.display(), pts, pts_end ); @@ -697,13 +693,20 @@ impl ToggleRecord { while (main_state.current_running_time.is_none() || rec_state.recording_state != RecordingState::Starting && rec_state.recording_state != RecordingState::Stopping - && main_state.current_running_time_end < current_running_time_end + && main_state + .current_running_time_end + .zip(current_running_time_end) + .map_or(false, |(main_rt_end, cur_rt_end)| main_rt_end < cur_rt_end) || rec_state.recording_state == RecordingState::Starting - && (rec_state.last_recording_start.is_none() - || rec_state.last_recording_start <= current_running_time) + && rec_state + .last_recording_start + .map_or(true, |last_rec_start| { + current_running_time.map_or(false, |cur_rt| last_rec_start <= cur_rt) + }) || rec_state.recording_state == RecordingState::Stopping - && (rec_state.last_recording_stop.is_none() - || rec_state.last_recording_stop <= current_running_time)) + && rec_state.last_recording_stop.map_or(true, |last_rec_stop| { + current_running_time.map_or(false, |cur_rt| last_rec_stop <= cur_rt) + })) && !main_state.eos && !stream.state.lock().flushing { @@ -711,11 +714,11 @@ impl ToggleRecord { CAT, obj: pad, "Waiting at {}-{} in {:?} state, main stream at {}-{}", - current_running_time, - current_running_time_end, + current_running_time.display(), + current_running_time_end.display(), rec_state.recording_state, - main_state.current_running_time, - main_state.current_running_time_end + main_state.current_running_time.display(), + main_state.current_running_time_end.display(), ); drop(rec_state); @@ -742,9 +745,17 @@ impl ToggleRecord { &mut state, &mut rec_state, ))); - } else if data.can_clip(&*state) - && current_running_time < rec_state.last_recording_start - && current_running_time_end > rec_state.last_recording_start + } + + let last_recording_start = rec_state.last_recording_start.expect("recording started"); + + // FIXME it would help a lot if we could expect current_running_time + // and possibly current_running_time_end at some point. + + if data.can_clip(&*state) + && current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_start) + && current_running_time_end + .map_or(false, |cur_rt_end| cur_rt_end > last_recording_start) { // Otherwise if we're before the recording start but the end of the buffer is after // the start and we can clip, clip the buffer and pass it onwards. @@ -752,9 +763,9 @@ impl ToggleRecord { CAT, obj: pad, "Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})", - current_running_time, - rec_state.last_recording_start, - current_running_time_end + current_running_time.display(), + last_recording_start, + current_running_time_end.display(), ); let mut clip_start = state @@ -773,7 +784,7 @@ impl ToggleRecord { segment.set_start(clip_start); segment.set_stop(clip_stop); - gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,); + gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment); if let Some(data) = data.clip(&*state, &segment) { return Ok(HandleResult::Pass(data)); @@ -781,7 +792,7 @@ impl ToggleRecord { gst_warning!(CAT, obj: pad, "Complete buffer clipped!"); return Ok(HandleResult::Drop); } - } else if current_running_time < rec_state.last_recording_start { + } else if current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_start) { // Otherwise if the buffer starts before the recording start, drop it. This // means that we either can't clip, or that the end is also before the // recording start @@ -789,13 +800,19 @@ impl ToggleRecord { CAT, obj: pad, "Main stream EOS and we're not EOS yet (before recording start, {} < {})", - current_running_time, - rec_state.last_recording_start + current_running_time.display(), + last_recording_start, ); return Ok(HandleResult::Drop); } else if data.can_clip(&*state) - && current_running_time < rec_state.last_recording_stop - && current_running_time_end > rec_state.last_recording_stop + && current_running_time + .zip(rec_state.last_recording_stop) + .map_or(false, |(cur_rt, last_rec_stop)| cur_rt < last_rec_stop) + && current_running_time_end + .zip(rec_state.last_recording_stop) + .map_or(false, |(cur_rt_end, last_rec_stop)| { + cur_rt_end > last_rec_stop + }) { // Similarly if the end is after the recording stop but the start is before and we // can clip, clip the buffer and pass it through. @@ -803,9 +820,9 @@ impl ToggleRecord { CAT, obj: pad, "Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})", - current_running_time, - rec_state.last_recording_stop, - current_running_time_end + current_running_time.display(), + rec_state.last_recording_stop.display(), + current_running_time_end.display(), ); let mut clip_start = state @@ -837,7 +854,12 @@ impl ToggleRecord { &mut rec_state, ))); } - } else if current_running_time_end > rec_state.last_recording_stop { + } else if current_running_time_end + .zip(rec_state.last_recording_stop) + .map_or(false, |(cur_rt_end, last_rec_stop)| { + cur_rt_end > last_rec_stop + }) + { // Otherwise if the end of the buffer is after the recording stop, we're EOS // now. This means that we either couldn't clip or that the start is also after // the recording stop @@ -845,8 +867,8 @@ impl ToggleRecord { CAT, obj: pad, "Main stream EOS and we're EOS too (after recording end, {} > {})", - current_running_time_end, - rec_state.last_recording_stop + current_running_time_end.display(), + rec_state.last_recording_stop.display(), ); return Ok(HandleResult::Eos(self.check_and_update_eos( pad, @@ -857,16 +879,19 @@ impl ToggleRecord { } else { // In all other cases the buffer is fully between recording start and end and // can be passed through as is - assert_ge!(current_running_time, rec_state.last_recording_start); - assert_le!(current_running_time_end, rec_state.last_recording_stop); + assert!(current_running_time.map_or(false, |cur_rt| cur_rt >= last_recording_start)); + assert!(current_running_time_end + .zip(rec_state.last_recording_stop) + .map_or(false, |(cur_rt_end, last_rec_stop)| cur_rt_end + <= last_rec_stop)); gst_debug!( CAT, obj: pad, "Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})", - rec_state.last_recording_start, - current_running_time, - rec_state.last_recording_stop + last_recording_start, + current_running_time.display(), + rec_state.last_recording_stop.display(), ); return Ok(HandleResult::Pass(data)); } @@ -876,51 +901,61 @@ impl ToggleRecord { RecordingState::Recording => { // The end of our buffer must be before/at the end of the previous buffer of the main // stream - assert_le!( - current_running_time_end, - main_state.current_running_time_end - ); + assert!(current_running_time_end + .zip(main_state.current_running_time_end) + .map_or(false, |(cur_rt_end, main_cur_rt_end)| cur_rt_end + <= main_cur_rt_end)); // We're properly started, must have a start position and // be actually after that start position - assert!(rec_state.last_recording_start.is_some()); - assert_ge!(current_running_time, rec_state.last_recording_start); + assert!(current_running_time + .zip(rec_state.last_recording_start) + .map_or(false, |(cur_rt, last_rec_start)| cur_rt >= last_rec_start)); gst_log!(CAT, obj: pad, "Passing buffer (recording)"); Ok(HandleResult::Pass(data)) } RecordingState::Stopping => { + // If we have no start position yet, the main stream is waiting for a key-frame + let last_recording_stop = match rec_state.last_recording_stop { + Some(last_recording_stop) => last_recording_stop, + None => { + gst_log!( + CAT, + obj: pad, + "Passing buffer (stopping: waiting for keyframe)", + ); + return Ok(HandleResult::Pass(data)); + } + }; + // The start of our buffer must be before the last recording stop as // otherwise we would be in Stopped state already - assert_lt!(current_running_time, rec_state.last_recording_stop); + assert!(current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_stop)); + let current_running_time = current_running_time.expect("checked above"); - // If we have no start position yet, the main stream is waiting for a key-frame - if rec_state.last_recording_stop.is_none() { - gst_log!( - CAT, - obj: pad, - "Passing buffer (stopping: waiting for keyframe)", - ); - Ok(HandleResult::Pass(data)) - } else if current_running_time_end <= rec_state.last_recording_stop { + if current_running_time_end + .map_or(false, |cur_rt_end| cur_rt_end <= last_recording_stop) + { gst_log!( CAT, obj: pad, "Passing buffer (stopping: {} <= {})", - current_running_time_end, - rec_state.last_recording_stop + current_running_time_end.display(), + last_recording_stop, ); Ok(HandleResult::Pass(data)) } else if data.can_clip(&*state) - && current_running_time < rec_state.last_recording_stop - && current_running_time_end > rec_state.last_recording_stop + && current_running_time < last_recording_stop + && current_running_time_end + .map_or(false, |cur_rt_end| cur_rt_end > last_recording_stop) { gst_log!( CAT, obj: pad, "Passing buffer (stopping: {} < {} < {})", current_running_time, - rec_state.last_recording_stop, - current_running_time_end, + last_recording_stop, + current_running_time_end.display(), ); let mut clip_stop = state @@ -945,8 +980,8 @@ impl ToggleRecord { CAT, obj: pad, "Dropping buffer (stopping: {} > {})", - current_running_time_end, - rec_state.last_recording_stop + current_running_time_end.display(), + rec_state.last_recording_stop.display(), ); Ok(HandleResult::Drop) } @@ -954,10 +989,10 @@ impl ToggleRecord { RecordingState::Stopped => { // The end of our buffer must be before/at the end of the previous buffer of the main // stream - assert_le!( - current_running_time_end, - main_state.current_running_time_end - ); + assert!(current_running_time_end + .zip(main_state.current_running_time_end) + .map_or(false, |(cur_rt_end, state_rt_end)| cur_rt_end + <= state_rt_end)); // We're properly stopped gst_log!(CAT, obj: pad, "Dropping buffer (stopped)"); @@ -965,38 +1000,44 @@ impl ToggleRecord { } RecordingState::Starting => { // If we have no start position yet, the main stream is waiting for a key-frame - if rec_state.last_recording_start.is_none() { - gst_log!( - CAT, - obj: pad, - "Dropping buffer (starting: waiting for keyframe)", - ); - return Ok(HandleResult::Drop); - } + let last_recording_start = match rec_state.last_recording_start { + Some(last_recording_start) => last_recording_start, + None => { + gst_log!( + CAT, + obj: pad, + "Dropping buffer (starting: waiting for keyframe)", + ); + return Ok(HandleResult::Drop); + } + }; // The start of our buffer must be before the last recording start as // otherwise we would be in Recording state already - assert_lt!(current_running_time, rec_state.last_recording_start); - if current_running_time >= rec_state.last_recording_start { + assert!(current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_start)); + let current_running_time = current_running_time.expect("checked_above"); + + if current_running_time >= last_recording_start { gst_log!( CAT, obj: pad, "Passing buffer (starting: {} >= {})", current_running_time, - rec_state.last_recording_start + last_recording_start, ); Ok(HandleResult::Pass(data)) } else if data.can_clip(&*state) - && current_running_time < rec_state.last_recording_start - && current_running_time_end > rec_state.last_recording_start + && current_running_time < last_recording_start + && current_running_time_end + .map_or(false, |cur_rt_end| cur_rt_end > last_recording_start) { gst_log!( CAT, obj: pad, "Passing buffer (starting: {} < {} < {})", current_running_time, - rec_state.last_recording_start, - current_running_time_end, + last_recording_start, + current_running_time_end.display(), ); let mut clip_start = state @@ -1008,7 +1049,7 @@ impl ToggleRecord { let mut segment = state.in_segment.clone(); segment.set_start(clip_start); - gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,); + gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment); if let Some(data) = data.clip(&*state, &segment) { Ok(HandleResult::Pass(data)) @@ -1022,7 +1063,7 @@ impl ToggleRecord { obj: pad, "Dropping buffer (starting: {} < {})", current_running_time, - rec_state.last_recording_start + last_recording_start, ); Ok(HandleResult::Drop) } @@ -1151,10 +1192,9 @@ impl ToggleRecord { // recording_duration state.out_segment = state.in_segment.clone(); - let offset = rec_state.running_time_offset.unwrap_or(0); state .out_segment - .offset_running_time(-(offset as i64)) + .offset_running_time(-rec_state.running_time_offset) .expect("Adjusting record duration"); events.push( gst::event::Segment::builder(&state.out_segment) @@ -1187,8 +1227,8 @@ impl ToggleRecord { CAT, obj: pad, "Pushing buffer with running time {}: {:?}", - out_running_time, - buffer + out_running_time.display(), + buffer, ); stream.srcpad.push(buffer) } @@ -1238,8 +1278,8 @@ impl ToggleRecord { state.flushing = false; state.segment_pending = true; state.discont_pending = true; - state.current_running_time = gst::CLOCK_TIME_NONE; - state.current_running_time_end = gst::CLOCK_TIME_NONE; + state.current_running_time = None; + state.current_running_time_end = None; } EventView::Caps(c) => { let mut state = stream.state.lock(); @@ -1288,8 +1328,8 @@ impl ToggleRecord { state.in_segment = segment; state.segment_seqnum = event.seqnum(); state.segment_pending = true; - state.current_running_time = gst::CLOCK_TIME_NONE; - state.current_running_time_end = gst::CLOCK_TIME_NONE; + state.current_running_time = None; + state.current_running_time_end = None; gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment); @@ -1305,14 +1345,19 @@ impl ToggleRecord { }; forward = match handle_result { - Ok(HandleResult::Pass((new_pts, new_duration))) if new_pts.is_some() => { - if new_pts != pts || new_duration != duration { - event = gst::event::Gap::new(new_pts, new_duration); + Ok(HandleResult::Pass((new_pts, new_duration))) => { + if new_pts != pts + || new_duration + .zip(duration) + .map_or(false, |(new_duration, duration)| new_duration != duration) + { + event = gst::event::Gap::builder(new_pts) + .duration(new_duration) + .build(); } true } - Ok(_) => false, - Err(_) => false, + _ => false, }; } EventView::Eos(..) => { @@ -1449,11 +1494,10 @@ impl ToggleRecord { let forward = !matches!(event.view(), EventView::Seek(..)); let rec_state = self.state.lock(); - let running_time_offset = rec_state.running_time_offset.unwrap_or(0) as i64; let offset = event.running_time_offset(); event .make_mut() - .set_running_time_offset(offset + running_time_offset); + .set_running_time_offset(offset + rec_state.running_time_offset); drop(rec_state); if forward { @@ -1530,20 +1574,26 @@ impl ToggleRecord { if rec_state.recording_state == RecordingState::Recording || rec_state.recording_state == RecordingState::Stopping { - gst_debug!( - CAT, - obj: pad, - "Returning position {} = {} - ({} + {})", - recording_duration - + (state.current_running_time_end - rec_state.last_recording_start), - recording_duration, - state.current_running_time_end, - rec_state.last_recording_start - ); - recording_duration += - state.current_running_time_end - rec_state.last_recording_start; + if let Some(delta) = state + .current_running_time_end + .zip(rec_state.last_recording_start) + .and_then(|(cur_rt_end, last_rec_start)| { + cur_rt_end.checked_sub(last_rec_start) + }) + { + gst_debug!( + CAT, + obj: pad, + "Returning position {} = {} - ({} + {})", + recording_duration + delta, + recording_duration, + state.current_running_time_end.display(), + rec_state.last_recording_start.display(), + ); + recording_duration += delta; + } } else { - gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration,); + gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration); } q.set(recording_duration); true @@ -1559,20 +1609,26 @@ impl ToggleRecord { if rec_state.recording_state == RecordingState::Recording || rec_state.recording_state == RecordingState::Stopping { - gst_debug!( - CAT, - obj: pad, - "Returning duration {} = {} - ({} + {})", - recording_duration - + (state.current_running_time_end - rec_state.last_recording_start), - recording_duration, - state.current_running_time_end, - rec_state.last_recording_start - ); - recording_duration += - state.current_running_time_end - rec_state.last_recording_start; + if let Some(delta) = state + .current_running_time_end + .zip(rec_state.last_recording_start) + .and_then(|(cur_rt_end, last_rec_start)| { + cur_rt_end.checked_sub(last_rec_start) + }) + { + gst_debug!( + CAT, + obj: pad, + "Returning duration {} = {} - ({} + {})", + recording_duration + delta, + recording_duration, + state.current_running_time_end.display(), + rec_state.last_recording_start.display(), + ); + recording_duration += delta; + } } else { - gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration,); + gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration); } q.set(recording_duration); true diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs index 98db0554..999fe49a 100644 --- a/utils/togglerecord/tests/tests.rs +++ b/utils/togglerecord/tests/tests.rs @@ -153,8 +153,8 @@ fn setup_sender_receiver( let mut buffer = buffer.clone(); { let buffer = buffer.make_mut(); - buffer.set_pts(offset + i * 20 * gst::MSECOND); - buffer.set_duration(20 * gst::MSECOND); + buffer.set_pts(offset + i * 20 * gst::ClockTime::MSECOND); + buffer.set_duration(20 * gst::ClockTime::MSECOND); } let _ = sinkpad.chain(buffer); i += 1; @@ -166,8 +166,11 @@ fn setup_sender_receiver( buffer .get_mut() .unwrap() - .set_pts(offset + i * 20 * gst::MSECOND); - buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND); + .set_pts(offset + i * 20 * gst::ClockTime::MSECOND); + buffer + .get_mut() + .unwrap() + .set_duration(20 * gst::ClockTime::MSECOND); buffer .get_mut() .unwrap() @@ -178,8 +181,10 @@ fn setup_sender_receiver( } SendData::Gaps(n) => { for _ in 0..n { - let event = - gst::event::Gap::new(offset + i * 20 * gst::MSECOND, 20 * gst::MSECOND); + let event = gst::event::Gap::new( + offset + i * 20 * gst::ClockTime::MSECOND, + 20 * gst::ClockTime::MSECOND, + ); let _ = sinkpad.send_event(event); i += 1; } @@ -204,7 +209,14 @@ fn recv_buffers( receiver_output: &mpsc::Receiver>, segment: &mut gst::FormattedSegment, wait_buffers: usize, -) -> (Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)>, bool) { +) -> ( + Vec<( + Option, + Option, + Option, + )>, + bool, +) { let mut res = Vec::new(); let mut n_buffers = 0; let mut saw_eos = false; @@ -228,7 +240,7 @@ fn recv_buffers( EventView::Gap(ref e) => { let (ts, duration) = e.get(); - res.push((segment.to_running_time(ts), ts, duration)); + res.push((segment.to_running_time(ts), Some(ts), duration)); n_buffers += 1; if wait_buffers > 0 && n_buffers == wait_buffers { return (res, saw_eos); @@ -281,7 +293,7 @@ fn test_one_stream_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, _, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -294,9 +306,9 @@ fn test_one_stream_open() { assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } thread.join().unwrap(); @@ -313,7 +325,7 @@ fn test_one_stream_gaps_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, _, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -327,9 +339,9 @@ fn test_one_stream_gaps_open() { assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } thread.join().unwrap(); @@ -346,7 +358,7 @@ fn test_one_stream_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, receiver_input_done, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -361,9 +373,9 @@ fn test_one_stream_close_open() { assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), (10 + index) * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } thread.join().unwrap(); @@ -380,7 +392,7 @@ fn test_one_stream_open_close() { pipeline.add(&togglerecord).unwrap(); let (sender_input, receiver_input_done, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -396,9 +408,9 @@ fn test_one_stream_open_close() { assert_eq!(buffers.len(), 10); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } thread.join().unwrap(); @@ -415,7 +427,7 @@ fn test_one_stream_open_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input, receiver_input_done, receiver_output, thread) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -434,15 +446,15 @@ fn test_one_stream_open_close_open() { assert_eq!(buffers.len(), 20); for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } thread.join().unwrap(); @@ -459,9 +471,9 @@ fn test_two_stream_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -480,9 +492,9 @@ fn test_two_stream_open() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); @@ -491,9 +503,9 @@ fn test_two_stream_open() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -512,9 +524,14 @@ fn test_two_stream_open_shift() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND); + setup_sender_receiver( + &pipeline, + &togglerecord, + "src_%u", + 5 * gst::ClockTime::MSECOND, + ); pipeline.set_state(gst::State::Playing).unwrap(); @@ -533,9 +550,9 @@ fn test_two_stream_open_shift() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); @@ -544,12 +561,18 @@ fn test_two_stream_open_shift() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND); - assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!( + running_time.unwrap(), + 5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND + ); + assert_eq!( + pts.unwrap(), + 5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND + ); if index == 9 { - assert_eq!(duration, 15 * gst::MSECOND); + assert_eq!(duration.unwrap(), 15 * gst::ClockTime::MSECOND); } else { - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } } assert_eq!(buffers_2.len(), 10); @@ -569,9 +592,9 @@ fn test_two_stream_open_shift_main() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::MSECOND); + setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::ClockTime::MSECOND); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -591,9 +614,12 @@ fn test_two_stream_open_shift_main() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!( + pts.unwrap(), + 5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND + ); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); @@ -604,17 +630,26 @@ fn test_two_stream_open_shift_main() { for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; if index == 0 { - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); - assert_eq!(duration, 15 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!( + pts.unwrap(), + 5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND + ); + assert_eq!(duration.unwrap(), 15 * gst::ClockTime::MSECOND); } else if index == 10 { - assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 5 * gst::MSECOND); + assert_eq!( + running_time.unwrap(), + index * 20 * gst::ClockTime::MSECOND - 5 * gst::ClockTime::MSECOND + ); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 5 * gst::ClockTime::MSECOND); } else { - assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!( + running_time.unwrap(), + index * 20 * gst::ClockTime::MSECOND - 5 * gst::ClockTime::MSECOND + ); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } } assert_eq!(buffers_2.len(), 11); @@ -634,9 +669,9 @@ fn test_two_stream_open_close() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -671,9 +706,9 @@ fn test_two_stream_open_close() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); @@ -682,9 +717,9 @@ fn test_two_stream_open_close() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -703,9 +738,9 @@ fn test_two_stream_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -740,9 +775,9 @@ fn test_two_stream_close_open() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), (10 + index) * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); @@ -751,9 +786,9 @@ fn test_two_stream_close_open() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), (10 + index) * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -772,9 +807,9 @@ fn test_two_stream_open_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -822,15 +857,15 @@ fn test_two_stream_open_close_open() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 20); @@ -839,15 +874,15 @@ fn test_two_stream_open_close_open() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 20); @@ -866,9 +901,9 @@ fn test_two_stream_open_close_open_gaps() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -922,15 +957,15 @@ fn test_two_stream_open_close_open_gaps() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 20); @@ -939,15 +974,15 @@ fn test_two_stream_open_close_open_gaps() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 20); @@ -966,9 +1001,9 @@ fn test_two_stream_close_open_close_delta() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1023,9 +1058,9 @@ fn test_two_stream_close_open_close_delta() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), (11 + index) * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); @@ -1034,9 +1069,9 @@ fn test_two_stream_close_open_close_delta() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), (11 + index) * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 10); @@ -1055,11 +1090,11 @@ fn test_three_stream_open_close_open() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1118,15 +1153,15 @@ fn test_three_stream_open_close_open() { let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 20); @@ -1135,15 +1170,15 @@ fn test_three_stream_open_close_open() { let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 20); @@ -1151,15 +1186,15 @@ fn test_three_stream_open_close_open() { let (buffers_3, _) = recv_buffers(&receiver_output_3, &mut segment_3, 0); for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { let pts_off = if index >= 10 { - 10 * 20 * gst::MSECOND + 10 * 20 * gst::ClockTime::MSECOND } else { - 0.into() + gst::ClockTime::ZERO }; let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_3.len(), 20); @@ -1179,9 +1214,9 @@ fn test_two_stream_main_eos() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1228,9 +1263,9 @@ fn test_two_stream_main_eos() { let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); assert_eq!(saw_eos, true); @@ -1240,9 +1275,9 @@ fn test_two_stream_main_eos() { let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 10); assert_eq!(saw_eos, true); @@ -1262,9 +1297,9 @@ fn test_two_stream_secondary_eos_first() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1303,9 +1338,9 @@ fn test_two_stream_secondary_eos_first() { let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); assert_eq!(saw_eos, true); @@ -1316,9 +1351,9 @@ fn test_two_stream_secondary_eos_first() { let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 9); assert_eq!(saw_eos, true); @@ -1338,11 +1373,11 @@ fn test_three_stream_main_eos() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1405,9 +1440,9 @@ fn test_three_stream_main_eos() { let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); assert_eq!(saw_eos, true); @@ -1417,9 +1452,9 @@ fn test_three_stream_main_eos() { let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 10); assert_eq!(saw_eos, true); @@ -1428,9 +1463,9 @@ fn test_three_stream_main_eos() { let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0); for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_3.len(), 10); assert_eq!(saw_eos, true); @@ -1451,11 +1486,11 @@ fn test_three_stream_main_and_second_eos() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1517,9 +1552,9 @@ fn test_three_stream_main_and_second_eos() { let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); assert_eq!(saw_eos, true); @@ -1529,9 +1564,9 @@ fn test_three_stream_main_and_second_eos() { let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 9); assert_eq!(saw_eos, true); @@ -1541,9 +1576,9 @@ fn test_three_stream_main_and_second_eos() { let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0); for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_3.len(), 10); assert_eq!(saw_eos, true); @@ -1564,11 +1599,11 @@ fn test_three_stream_secondary_eos_first() { pipeline.add(&togglerecord).unwrap(); let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = - setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO); let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = - setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO); pipeline.set_state(gst::State::Playing).unwrap(); @@ -1612,9 +1647,9 @@ fn test_three_stream_secondary_eos_first() { let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0); for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_1.len(), 10); assert_eq!(saw_eos, true); @@ -1624,9 +1659,9 @@ fn test_three_stream_secondary_eos_first() { let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0); for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_2.len(), 9); assert_eq!(saw_eos, true); @@ -1635,9 +1670,9 @@ fn test_three_stream_secondary_eos_first() { let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0); for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { let index = index as u64; - assert_eq!(running_time, index * 20 * gst::MSECOND); - assert_eq!(pts, index * 20 * gst::MSECOND); - assert_eq!(duration, 20 * gst::MSECOND); + assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND); + assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND); } assert_eq!(buffers_3.len(), 9); assert_eq!(saw_eos, true);