utils: migrate to new ClockTime design

This commit is contained in:
François Laignel 2021-05-28 18:35:28 +02:00
parent b8ad30610b
commit c2de0649a7
17 changed files with 933 additions and 794 deletions

View file

@ -42,7 +42,7 @@ fn create_pipeline() -> (gst::Pipeline, gst::Pad, gst::Element, gtk::Widget) {
let fallbackswitch = gst::ElementFactory::make("fallbackswitch", None).unwrap();
fallbackswitch
.set_property("timeout", &gst::SECOND)
.set_property("timeout", &gst::ClockTime::SECOND)
.unwrap();
let decodebin = gst::ElementFactory::make("decodebin", None).unwrap();
@ -134,7 +134,7 @@ fn create_ui(app: &gtk::Application) {
let position = video_sink
.query_position::<gst::ClockTime>()
.unwrap_or_else(|| 0.into());
.unwrap_or(gst::ClockTime::ZERO);
position_label.set_text(&format!("Position: {:.1}", position));
glib::Continue(true)

View file

@ -1,19 +1,15 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Take a look at the license at the top of the repository in the LICENSE file.
use super::ffi;
use super::Aggregator;
use glib::signal::{connect_raw, SignalHandlerId};
use glib::translate::*;
use glib::IsA;
use glib::Value;
use gst::glib;
use gst::prelude::*;
use std::boxed::Box as Box_;
use std::mem;
use std::ptr;
@ -26,7 +22,8 @@ pub trait AggregatorExtManual: 'static {
fn set_min_upstream_latency(&self, min_upstream_latency: gst::ClockTime);
fn connect_property_min_upstream_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(
#[doc(alias = "min-upstream-latency")]
fn connect_min_upstream_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId;
@ -47,13 +44,12 @@ impl<O: IsA<Aggregator>> AggregatorExtManual for O {
}
fn finish_buffer(&self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let ret: gst::FlowReturn = unsafe {
from_glib(ffi::gst_aggregator_finish_buffer(
unsafe {
try_from_glib(ffi::gst_aggregator_finish_buffer(
self.as_ref().to_glib_none().0,
buffer.into_ptr(),
))
};
ret.into_result()
}
}
fn min_upstream_latency(&self) -> gst::ClockTime {
@ -80,7 +76,7 @@ impl<O: IsA<Aggregator>> AggregatorExtManual for O {
}
}
fn connect_property_min_upstream_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(
fn connect_min_upstream_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId {
@ -89,8 +85,8 @@ impl<O: IsA<Aggregator>> AggregatorExtManual for O {
connect_raw(
self.as_ptr() as *mut _,
b"notify::min-upstream-latency\0".as_ptr() as *const _,
Some(mem::transmute(
notify_min_upstream_latency_trampoline::<Self, F> as usize,
Some(mem::transmute::<_, unsafe extern "C" fn()>(
notify_min_upstream_latency_trampoline::<Self, F> as *const (),
)),
Box_::into_raw(f),
)

View file

@ -1,18 +1,14 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Take a look at the license at the top of the repository in the LICENSE file.
use super::ffi;
use super::AggregatorPad;
use glib::object::IsA;
use glib::translate::*;
use gst::glib;
pub trait AggregatorPadExtManual: 'static {
#[doc(alias = "get_segment")]
fn segment(&self) -> gst::Segment;
}

View file

@ -4,13 +4,14 @@
// DO NOT EDIT
use super::super::ffi;
use glib::object::Cast;
use glib::object::IsA;
use glib::signal::connect_raw;
use glib::signal::SignalHandlerId;
use glib::translate::*;
use glib::StaticType;
use gst::glib;
use gst::prelude::*;
use std::boxed::Box as Box_;
use std::mem::transmute;
@ -29,38 +30,45 @@ pub const NONE_AGGREGATOR: Option<&Aggregator> = None;
pub trait AggregatorExt: 'static {
//#[doc(alias = "gst_aggregator_get_allocator")]
//#[doc(alias = "get_allocator")]
//fn allocator(&self, allocator: /*Ignored*/Option<gst::Allocator>, params: /*Ignored*/gst::AllocationParams);
#[doc(alias = "gst_aggregator_get_buffer_pool")]
#[doc(alias = "get_buffer_pool")]
fn buffer_pool(&self) -> Option<gst::BufferPool>;
#[doc(alias = "gst_aggregator_get_latency")]
fn latency(&self) -> gst::ClockTime;
#[doc(alias = "get_latency")]
fn latency(&self) -> Option<gst::ClockTime>;
#[doc(alias = "gst_aggregator_negotiate")]
fn negotiate(&self) -> bool;
#[doc(alias = "gst_aggregator_set_latency")]
fn set_latency(&self, min_latency: gst::ClockTime, max_latency: gst::ClockTime);
fn set_latency(
&self,
min_latency: gst::ClockTime,
max_latency: impl Into<Option<gst::ClockTime>>,
);
#[doc(alias = "gst_aggregator_set_src_caps")]
fn set_src_caps(&self, caps: &gst::Caps);
#[doc(alias = "gst_aggregator_simple_get_next_time")]
fn simple_get_next_time(&self) -> gst::ClockTime;
fn simple_get_next_time(&self) -> Option<gst::ClockTime>;
#[doc(alias = "get_property_start_time")]
#[doc(alias = "start-time")]
fn start_time(&self) -> u64;
#[doc(alias = "set_property_start_time")]
#[doc(alias = "start-time")]
fn set_start_time(&self, start_time: u64);
fn connect_property_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId;
#[doc(alias = "latency")]
fn connect_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(&self, f: F)
-> SignalHandlerId;
fn connect_property_start_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
#[doc(alias = "start-time")]
fn connect_start_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId;
@ -79,7 +87,7 @@ impl<O: IsA<Aggregator>> AggregatorExt for O {
}
}
fn latency(&self) -> gst::ClockTime {
fn latency(&self) -> Option<gst::ClockTime> {
unsafe {
from_glib(ffi::gst_aggregator_get_latency(
self.as_ref().to_glib_none().0,
@ -95,12 +103,16 @@ impl<O: IsA<Aggregator>> AggregatorExt for O {
}
}
fn set_latency(&self, min_latency: gst::ClockTime, max_latency: gst::ClockTime) {
fn set_latency(
&self,
min_latency: gst::ClockTime,
max_latency: impl Into<Option<gst::ClockTime>>,
) {
unsafe {
ffi::gst_aggregator_set_latency(
self.as_ref().to_glib_none().0,
min_latency.into_glib(),
max_latency.into_glib(),
max_latency.into().into_glib(),
);
}
}
@ -111,7 +123,7 @@ impl<O: IsA<Aggregator>> AggregatorExt for O {
}
}
fn simple_get_next_time(&self) -> gst::ClockTime {
fn simple_get_next_time(&self) -> Option<gst::ClockTime> {
unsafe {
from_glib(ffi::gst_aggregator_simple_get_next_time(
self.as_ref().to_glib_none().0,
@ -138,22 +150,24 @@ impl<O: IsA<Aggregator>> AggregatorExt for O {
glib::gobject_ffi::g_object_set_property(
self.to_glib_none().0 as *mut glib::gobject_ffi::GObject,
b"start-time\0".as_ptr() as *const _,
glib::Value::from(&start_time).to_glib_none().0,
start_time.to_value().to_glib_none().0,
);
}
}
fn connect_property_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(
#[doc(alias = "latency")]
fn connect_latency_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId {
unsafe extern "C" fn notify_latency_trampoline<P, F: Fn(&P) + Send + Sync + 'static>(
unsafe extern "C" fn notify_latency_trampoline<
P: IsA<Aggregator>,
F: Fn(&P) + Send + Sync + 'static,
>(
this: *mut ffi::GstAggregator,
_param_spec: glib::ffi::gpointer,
f: glib::ffi::gpointer,
) where
P: IsA<Aggregator>,
{
) {
let f: &F = &*(f as *const F);
f(&Aggregator::from_glib_borrow(this).unsafe_cast_ref())
}
@ -170,17 +184,19 @@ impl<O: IsA<Aggregator>> AggregatorExt for O {
}
}
fn connect_property_start_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
#[doc(alias = "start-time")]
fn connect_start_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId {
unsafe extern "C" fn notify_start_time_trampoline<P, F: Fn(&P) + Send + Sync + 'static>(
unsafe extern "C" fn notify_start_time_trampoline<
P: IsA<Aggregator>,
F: Fn(&P) + Send + Sync + 'static,
>(
this: *mut ffi::GstAggregator,
_param_spec: glib::ffi::gpointer,
f: glib::ffi::gpointer,
) where
P: IsA<Aggregator>,
{
) {
let f: &F = &*(f as *const F);
f(&Aggregator::from_glib_borrow(this).unsafe_cast_ref())
}

View file

@ -3,13 +3,14 @@
// DO NOT EDIT
use super::super::ffi;
use glib::object::Cast;
use glib::object::IsA;
use glib::signal::connect_raw;
use glib::signal::SignalHandlerId;
use glib::translate::*;
use glib::StaticType;
use gst::glib;
use gst::prelude::*;
use std::boxed::Box as Box_;
use std::mem::transmute;
@ -42,10 +43,10 @@ pub trait AggregatorPadExt: 'static {
#[doc(alias = "gst_aggregator_pad_pop_buffer")]
fn pop_buffer(&self) -> Option<gst::Buffer>;
#[doc(alias = "get_property_emit_signals")]
#[doc(alias = "emit-signals")]
fn emits_signals(&self) -> bool;
#[doc(alias = "set_property_emit_signals")]
#[doc(alias = "emit-signals")]
fn set_emit_signals(&self, emit_signals: bool);
fn connect_buffer_consumed<F: Fn(&Self, &gst::Buffer) + Send + Sync + 'static>(
@ -53,7 +54,8 @@ pub trait AggregatorPadExt: 'static {
f: F,
) -> SignalHandlerId;
fn connect_property_emit_signals_notify<F: Fn(&Self) + Send + Sync + 'static>(
#[doc(alias = "emit-signals")]
fn connect_emit_signals_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId;
@ -119,25 +121,24 @@ impl<O: IsA<AggregatorPad>> AggregatorPadExt for O {
glib::gobject_ffi::g_object_set_property(
self.to_glib_none().0 as *mut glib::gobject_ffi::GObject,
b"emit-signals\0".as_ptr() as *const _,
glib::Value::from(&emit_signals).to_glib_none().0,
emit_signals.to_value().to_glib_none().0,
);
}
}
#[doc(alias = "buffer-consumed")]
fn connect_buffer_consumed<F: Fn(&Self, &gst::Buffer) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId {
unsafe extern "C" fn buffer_consumed_trampoline<
P,
P: IsA<AggregatorPad>,
F: Fn(&P, &gst::Buffer) + Send + Sync + 'static,
>(
this: *mut ffi::GstAggregatorPad,
object: *mut gst::ffi::GstBuffer,
f: glib::ffi::gpointer,
) where
P: IsA<AggregatorPad>,
{
) {
let f: &F = &*(f as *const F);
f(
&AggregatorPad::from_glib_borrow(this).unsafe_cast_ref(),
@ -157,17 +158,19 @@ impl<O: IsA<AggregatorPad>> AggregatorPadExt for O {
}
}
fn connect_property_emit_signals_notify<F: Fn(&Self) + Send + Sync + 'static>(
#[doc(alias = "emit-signals")]
fn connect_emit_signals_notify<F: Fn(&Self) + Send + Sync + 'static>(
&self,
f: F,
) -> SignalHandlerId {
unsafe extern "C" fn notify_emit_signals_trampoline<P, F: Fn(&P) + Send + Sync + 'static>(
unsafe extern "C" fn notify_emit_signals_trampoline<
P: IsA<AggregatorPad>,
F: Fn(&P) + Send + Sync + 'static,
>(
this: *mut ffi::GstAggregatorPad,
_param_spec: glib::ffi::gpointer,
f: glib::ffi::gpointer,
) where
P: IsA<AggregatorPad>,
{
) {
let f: &F = &*(f as *const F);
f(&AggregatorPad::from_glib_borrow(this).unsafe_cast_ref())
}

View file

@ -1,10 +1,4 @@
// Copyright (C) 2017-2019 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Take a look at the license at the top of the repository in the LICENSE file.
use super::super::ffi;
@ -109,7 +103,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
self.parent_stop(aggregator)
}
fn next_time(&self, aggregator: &Self::Type) -> gst::ClockTime {
fn next_time(&self, aggregator: &Self::Type) -> Option<gst::ClockTime> {
self.parent_next_time(aggregator)
}
@ -213,7 +207,7 @@ pub trait AggregatorImplExt: ObjectSubclass {
fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage>;
fn parent_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime;
fn parent_next_time(&self, aggregator: &Self::Type) -> Option<gst::ClockTime>;
fn parent_create_new_pad(
&self,
@ -243,18 +237,17 @@ pub trait AggregatorImplExt: ObjectSubclass {
impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
(*parent_class)
.flush
.map(|f| {
from_glib(f(aggregator
try_from_glib(f(aggregator
.unsafe_cast_ref::<Aggregator>()
.to_glib_none()
.0))
})
.unwrap_or(gst::FlowReturn::Ok)
.into_result()
.unwrap_or(Ok(gst::FlowSuccess::Ok))
}
}
@ -265,7 +258,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
buffer: gst::Buffer,
) -> Option<gst::Buffer> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
match (*parent_class).clip {
None => Some(buffer),
@ -284,16 +277,15 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.finish_buffer
.expect("Missing parent function `finish_buffer`");
gst::FlowReturn::from_glib(f(
try_from_glib(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
buffer.into_ptr(),
))
.into_result()
}
}
@ -304,7 +296,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
event: gst::Event,
) -> bool {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.sink_event
@ -324,17 +316,16 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.sink_event_pre_queue
.expect("Missing parent function `sink_event_pre_queue`");
gst::FlowReturn::from_glib(f(
try_from_glib(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
aggregator_pad.to_glib_none().0,
event.into_ptr(),
))
.into_result()
}
}
@ -365,7 +356,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
query: &mut gst::QueryRef,
) -> bool {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.sink_query_pre_queue
@ -380,7 +371,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.src_event
@ -394,7 +385,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.src_query
@ -413,7 +404,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
active: bool,
) -> Result<(), gst::LoggableError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
match (*parent_class).src_activate {
None => Ok(()),
@ -436,22 +427,21 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.aggregate
.expect("Missing parent function `aggregate`");
gst::FlowReturn::from_glib(f(
try_from_glib(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
timeout.into_glib(),
))
.into_result()
}
}
fn parent_start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
(*parent_class)
.start
@ -475,7 +465,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
(*parent_class)
.stop
@ -497,9 +487,9 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
}
}
fn parent_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime {
fn parent_next_time(&self, aggregator: &Self::Type) -> Option<gst::ClockTime> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
(*parent_class)
.get_next_time
@ -509,7 +499,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
.to_glib_none()
.0))
})
.unwrap_or(gst::CLOCK_TIME_NONE)
.unwrap_or(gst::ClockTime::NONE)
}
}
@ -521,7 +511,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
caps: Option<&gst::Caps>,
) -> Option<AggregatorPad> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.create_new_pad
@ -541,7 +531,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
caps: &gst::Caps,
) -> Result<gst::Caps, gst::FlowError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
.update_src_caps
@ -559,7 +549,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
let f = (*parent_class)
@ -578,7 +568,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
(*parent_class)
.negotiated_src_caps
@ -598,7 +588,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_negotiate(&self, aggregator: &Self::Type) -> bool {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorClass;
(*parent_class)
.negotiate
@ -879,7 +869,7 @@ unsafe extern "C" fn aggregator_get_next_time<T: AggregatorImpl>(
let imp = instance.impl_();
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst::panic_to_error!(&wrap, &imp.panicked(), gst::CLOCK_TIME_NONE, {
gst::panic_to_error!(&wrap, &imp.panicked(), gst::ClockTime::NONE, {
imp.next_time(wrap.unsafe_cast_ref())
})
.into_glib()

View file

@ -1,17 +1,10 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Take a look at the license at the top of the repository in the LICENSE file.
use super::super::ffi;
use glib::translate::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use super::super::Aggregator;
@ -58,12 +51,12 @@ impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
aggregator: &Aggregator,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorPadClass;
(*parent_class)
.flush
.map(|f| {
from_glib(f(
try_from_glib(f(
aggregator_pad
.unsafe_cast_ref::<AggregatorPad>()
.to_glib_none()
@ -71,8 +64,7 @@ impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
aggregator.to_glib_none().0,
))
})
.unwrap_or(gst::FlowReturn::Ok)
.into_result()
.unwrap_or(Ok(gst::FlowSuccess::Ok))
}
}
@ -83,7 +75,7 @@ impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
buffer: &gst::Buffer,
) -> bool {
unsafe {
let data = T::type_data();
let data = Self::type_data();
let parent_class = data.as_ref().parent_class() as *mut ffi::GstAggregatorPadClass;
(*parent_class)
.skip_buffer

View file

@ -1,10 +1,4 @@
// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// Take a look at the license at the top of the repository in the LICENSE file.
use glib::translate::mut_override;
use gst::glib;
@ -14,6 +8,7 @@ pub struct MutexGuard<'a>(&'a glib::ffi::GMutex);
impl<'a> MutexGuard<'a> {
#[allow(clippy::trivially_copy_pass_by_ref)]
#[doc(alias = "g_mutex_lock")]
pub fn lock(mutex: &'a glib::ffi::GMutex) -> Self {
unsafe {
glib::ffi::g_mutex_lock(mut_override(mutex));

View file

@ -20,9 +20,10 @@ use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_info, gst_warning};
use std::convert::TryFrom;
use std::mem;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::time::Instant;
use once_cell::sync::Lazy;
@ -72,11 +73,11 @@ struct Settings {
uri: Option<String>,
source: Option<gst::Element>,
fallback_uri: Option<String>,
timeout: u64,
restart_timeout: u64,
retry_timeout: u64,
timeout: gst::ClockTime,
restart_timeout: gst::ClockTime,
retry_timeout: gst::ClockTime,
restart_on_eos: bool,
min_latency: u64,
min_latency: gst::ClockTime,
buffer_duration: i64,
}
@ -88,11 +89,11 @@ impl Default for Settings {
uri: None,
source: None,
fallback_uri: None,
timeout: 5 * gst::SECOND_VAL,
restart_timeout: 5 * gst::SECOND_VAL,
retry_timeout: 60 * gst::SECOND_VAL,
timeout: 5 * gst::ClockTime::SECOND,
restart_timeout: 5 * gst::ClockTime::SECOND,
retry_timeout: 60 * gst::ClockTime::SECOND,
restart_on_eos: false,
min_latency: 0,
min_latency: gst::ClockTime::ZERO,
buffer_duration: -1,
}
}
@ -112,7 +113,7 @@ enum Source {
struct Block {
pad: gst::Pad,
probe_id: gst::PadProbeId,
running_time: gst::ClockTime,
running_time: Option<gst::ClockTime>,
}
// Connects one source pad with fallbackswitch and the corresponding fallback input
@ -221,8 +222,8 @@ impl ObjectImpl for FallbackSrc {
"Timeout",
"Timeout for switching to the fallback URI",
0,
std::u64::MAX,
5 * gst::SECOND_VAL,
std::u64::MAX - 1,
5 * *gst::ClockTime::SECOND,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_uint64(
@ -230,8 +231,8 @@ impl ObjectImpl for FallbackSrc {
"Timeout",
"Timeout for restarting an active source",
0,
std::u64::MAX,
5 * gst::SECOND_VAL,
std::u64::MAX - 1,
5 * *gst::ClockTime::SECOND,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_uint64(
@ -239,8 +240,8 @@ impl ObjectImpl for FallbackSrc {
"Retry Timeout",
"Timeout for stopping after repeated failure",
0,
std::u64::MAX,
60 * gst::SECOND_VAL,
std::u64::MAX - 1,
60 * *gst::ClockTime::SECOND,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_boolean(
@ -265,7 +266,7 @@ impl ObjectImpl for FallbackSrc {
this allows to configure a minimum latency that would be configured \
if initially the fallback is enabled",
0,
std::u64::MAX,
std::u64::MAX - 1,
0,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
@ -274,7 +275,7 @@ impl ObjectImpl for FallbackSrc {
"Buffer Duration",
"Buffer duration when buffering streams (-1 default value)",
-1,
std::i64::MAX,
std::i64::MAX - 1,
-1,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
@ -740,7 +741,7 @@ impl FallbackSrc {
fn create_fallback_video_input(
&self,
_element: &super::FallbackSrc,
min_latency: u64,
min_latency: gst::ClockTime,
fallback_uri: Option<&str>,
) -> gst::Element {
VideoFallbackSource::new(fallback_uri, min_latency).upcast()
@ -770,8 +771,8 @@ impl FallbackSrc {
fn create_stream(
&self,
element: &super::FallbackSrc,
timeout: u64,
min_latency: u64,
timeout: gst::ClockTime,
min_latency: gst::ClockTime,
is_audio: bool,
fallback_uri: Option<&str>,
) -> Stream {
@ -797,7 +798,7 @@ impl FallbackSrc {
.set_properties(&[
("max-size-buffers", &0u32),
("max-size-bytes", &0u32),
("max-size-time", &gst::SECOND),
("max-size-time", &gst::ClockTime::SECOND),
])
.unwrap();
@ -815,9 +816,9 @@ impl FallbackSrc {
let src = FallbackSrc::from_instance(&element);
src.handle_switch_active_pad_change(&element);
});
switch.set_property("timeout", &timeout).unwrap();
switch.set_property("timeout", &timeout.nseconds()).unwrap();
switch
.set_property("min-upstream-latency", &min_latency)
.set_property("min-upstream-latency", &min_latency.nseconds())
.unwrap();
gst::Element::link_pads(&fallback_input, Some("src"), &switch, Some("fallback_sink"))
@ -1067,7 +1068,7 @@ impl FallbackSrc {
|| (!state.source_is_live && transition == gst::StateChange::PausedToPlaying)
{
assert!(state.source_restart_timeout.is_none());
self.schedule_source_restart_timeout(element, state, 0.into());
self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
}
}
}
@ -1230,7 +1231,7 @@ impl FallbackSrc {
let pts = match info.data {
Some(gst::PadProbeData::Buffer(ref buffer)) => buffer.pts(),
Some(gst::PadProbeData::Event(ref ev)) => match ev.view() {
gst::EventView::Gap(ref ev) => ev.get().0,
gst::EventView::Gap(ref ev) => Some(ev.get().0),
_ => return gst::PadProbeReturn::Pass,
},
_ => unreachable!(),
@ -1250,7 +1251,7 @@ impl FallbackSrc {
Block {
pad: stream.clocksync_queue_srcpad.clone(),
probe_id,
running_time: gst::CLOCK_TIME_NONE,
running_time: gst::ClockTime::NONE,
}
}
@ -1258,7 +1259,7 @@ impl FallbackSrc {
&self,
element: &super::FallbackSrc,
pad: &gst::Pad,
pts: gst::ClockTime,
pts: impl Into<Option<gst::ClockTime>>,
) -> Result<(), gst::ErrorMessage> {
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
@ -1354,21 +1355,22 @@ impl FallbackSrc {
gst::error_msg!(gst::CoreError::Clock, ["Have no time segment"])
})?;
let running_time = if pts < segment.start() {
segment.start()
} else if segment.stop().is_some() && pts >= segment.stop() {
segment.stop()
let pts = pts.into();
let running_time = if let Some((_, start)) =
pts.zip(segment.start()).filter(|(pts, start)| pts < start)
{
Some(start)
} else if let Some((_, stop)) = pts.zip(segment.stop()).filter(|(pts, stop)| pts >= stop) {
Some(stop)
} else {
segment.to_running_time(pts)
};
assert!(running_time.is_some());
gst_debug!(
CAT,
obj: element,
"Have block running time {}",
running_time,
running_time.display(),
);
block.running_time = running_time;
@ -1414,13 +1416,13 @@ impl FallbackSrc {
let audio_running_time = state
.audio_stream
.as_ref()
.and_then(|s| s.source_srcpad_block.as_ref().map(|b| b.running_time))
.unwrap_or(gst::CLOCK_TIME_NONE);
.and_then(|s| s.source_srcpad_block.as_ref())
.and_then(|b| b.running_time);
let video_running_time = state
.video_stream
.as_ref()
.and_then(|s| s.source_srcpad_block.as_ref().map(|b| b.running_time))
.unwrap_or(gst::CLOCK_TIME_NONE);
.and_then(|s| s.source_srcpad_block.as_ref())
.and_then(|b| b.running_time);
let audio_srcpad = state
.audio_stream
@ -1444,7 +1446,14 @@ impl FallbackSrc {
// Also consider EOS, we'd never get a new running time after EOS so don't need to wait.
// FIXME: All this surely can be simplified somehow
let current_running_time = element.current_running_time();
// FIXME I guess this could be moved up
let current_running_time = match element.current_running_time() {
Some(current_running_time) => current_running_time,
None => {
gst_debug!(CAT, obj: element, "Waiting for current_running_time");
return;
}
};
if have_audio && want_audio && have_video && want_video {
if audio_running_time.is_none()
@ -1466,18 +1475,21 @@ impl FallbackSrc {
return;
}
let audio_running_time = audio_running_time.expect("checked above");
let video_running_time = video_running_time.expect("checked above");
let min_running_time = if audio_is_eos {
video_running_time
} else if video_is_eos {
audio_running_time
} else {
assert!(audio_running_time.is_some() && video_running_time.is_some());
audio_running_time.min(video_running_time).unwrap()
audio_running_time.min(video_running_time)
};
let offset = if current_running_time > min_running_time {
(current_running_time - min_running_time).unwrap() as i64
(current_running_time - min_running_time).nseconds() as i64
} else {
-((min_running_time - current_running_time).unwrap() as i64)
-((min_running_time - current_running_time).nseconds() as i64)
};
gst_debug!(
@ -1514,15 +1526,18 @@ impl FallbackSrc {
block.pad.remove_probe(block.probe_id);
}
} else if have_audio && want_audio {
if audio_running_time.is_none() {
gst_debug!(CAT, obj: element, "Waiting for audio pad to block");
return;
}
let audio_running_time = match audio_running_time {
Some(audio_running_time) => audio_running_time,
None => {
gst_debug!(CAT, obj: element, "Waiting for audio pad to block");
return;
}
};
let offset = if current_running_time > audio_running_time {
(current_running_time - audio_running_time).unwrap() as i64
(current_running_time - audio_running_time).nseconds() as i64
} else {
-((audio_running_time - current_running_time).unwrap() as i64)
-((audio_running_time - current_running_time).nseconds() as i64)
};
gst_debug!(
@ -1546,15 +1561,18 @@ impl FallbackSrc {
block.pad.remove_probe(block.probe_id);
}
} else if have_video && want_video {
if video_running_time.is_none() {
gst_debug!(CAT, obj: element, "Waiting for video pad to block");
return;
}
let video_running_time = match video_running_time {
Some(video_running_time) => video_running_time,
None => {
gst_debug!(CAT, obj: element, "Waiting for video pad to block");
return;
}
};
let offset = if current_running_time > video_running_time {
(current_running_time - video_running_time).unwrap() as i64
(current_running_time - video_running_time).nseconds() as i64
} else {
-((video_running_time - current_running_time).unwrap() as i64)
-((video_running_time - current_running_time).nseconds() as i64)
};
gst_debug!(
@ -1911,8 +1929,7 @@ impl FallbackSrc {
gst_debug!(CAT, obj: element, "Waiting for 1s before retrying");
let clock = gst::SystemClock::obtain();
let wait_time = clock.time() + gst::SECOND;
assert!(wait_time.is_some());
let wait_time = clock.time().unwrap() + gst::ClockTime::SECOND;
assert!(state.source_pending_restart_timeout.is_none());
let timeout = clock.new_single_shot_id(wait_time);
@ -1998,7 +2015,11 @@ impl FallbackSrc {
let mut state_guard = src.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state");
assert!(state.source_restart_timeout.is_none());
src.schedule_source_restart_timeout(element, state, 0.into());
src.schedule_source_restart_timeout(
element,
state,
gst::ClockTime::ZERO,
);
}
});
})
@ -2024,9 +2045,7 @@ impl FallbackSrc {
}
let clock = gst::SystemClock::obtain();
let wait_time =
clock.time() + gst::ClockTime::from_nseconds(state.settings.restart_timeout) - elapsed;
assert!(wait_time.is_some());
let wait_time = clock.time().unwrap() + state.settings.restart_timeout - elapsed;
gst_debug!(
CAT,
obj: element,
@ -2063,9 +2082,7 @@ impl FallbackSrc {
// If we're not actively buffering right now let's restart the source
if state
.last_buffering_update
.map(|i| {
i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout)
})
.map(|i| i.elapsed() >= state.settings.restart_timeout.into())
.unwrap_or(state.stats.buffering_percent == 100)
{
gst_debug!(CAT, obj: element, "Not buffering, restarting source");
@ -2077,10 +2094,12 @@ impl FallbackSrc {
gst_debug!(CAT, obj: element, "Buffering, restarting source later");
let elapsed = state
.last_buffering_update
.map(|i| i.elapsed().as_nanos() as u64)
.unwrap_or(0);
.and_then(|last_buffering_update| {
gst::ClockTime::try_from(last_buffering_update.elapsed()).ok()
})
.unwrap_or(gst::ClockTime::ZERO);
src.schedule_source_restart_timeout(element, state, elapsed.into());
src.schedule_source_restart_timeout(element, state, elapsed);
}
} else {
gst_debug!(CAT, obj: element, "Restarting source not needed anymore");
@ -2150,7 +2169,7 @@ impl FallbackSrc {
if self.have_fallback_activated(element, state) {
gst_warning!(CAT, obj: element, "Switched to fallback stream");
if state.source_restart_timeout.is_none() {
self.schedule_source_restart_timeout(element, state, 0.into());
self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
}
drop(state_guard);

View file

@ -36,14 +36,14 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
#[derive(Debug, Clone)]
struct Settings {
uri: Option<String>,
min_latency: u64,
min_latency: gst::ClockTime,
}
impl Default for Settings {
fn default() -> Self {
Settings {
uri: None,
min_latency: 0,
min_latency: gst::ClockTime::ZERO,
}
}
}
@ -131,7 +131,7 @@ impl ObjectImpl for VideoFallbackSource {
gst_info!(
CAT,
obj: obj,
"Changing Minimum Latency from {:?} to {:?}",
"Changing Minimum Latency from {} to {}",
settings.min_latency,
new_value,
);
@ -276,7 +276,7 @@ impl VideoFallbackSource {
fn create_source(
&self,
element: &super::VideoFallbackSource,
min_latency: u64,
min_latency: gst::ClockTime,
uri: Option<&str>,
) -> gst::Element {
gst_debug!(CAT, obj: element, "Creating source with uri {:?}", uri);
@ -313,7 +313,7 @@ impl VideoFallbackSource {
("max-size-bytes", &0u32),
(
"max-size-time",
&gst::ClockTime::max(5 * gst::SECOND, min_latency.into()).unwrap(),
&min_latency.max(5 * gst::ClockTime::SECOND).nseconds(),
),
])
.unwrap();

View file

@ -30,7 +30,7 @@ unsafe impl Send for VideoFallbackSource {}
unsafe impl Sync for VideoFallbackSource {}
impl VideoFallbackSource {
pub fn new(uri: Option<&str>, min_latency: u64) -> VideoFallbackSource {
glib::Object::new(&[("uri", &uri), ("min-latency", &min_latency)]).unwrap()
pub fn new(uri: Option<&str>, min_latency: gst::ClockTime) -> VideoFallbackSource {
glib::Object::new(&[("uri", &uri), ("min-latency", &min_latency.nseconds())]).unwrap()
}
}

View file

@ -71,13 +71,13 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
#[derive(Debug, Default)]
struct PadOutputState {
last_sinkpad_time: gst::ClockTime,
last_sinkpad_time: Option<gst::ClockTime>,
stream_health: StreamHealth,
}
#[derive(Debug)]
struct OutputState {
last_output_time: gst::ClockTime,
last_output_time: Option<gst::ClockTime>,
primary: PadOutputState,
fallback: PadOutputState,
}
@ -89,7 +89,7 @@ struct PadInputState {
video_info: Option<gst_video::VideoInfo>,
}
const DEFAULT_TIMEOUT: u64 = 5 * gst::SECOND_VAL;
const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(5);
const DEFAULT_AUTO_SWITCH: bool = true;
const DEFAULT_STREAM_HEALTH: StreamHealth = StreamHealth::Inactive;
@ -108,7 +108,7 @@ impl Default for StreamHealth {
impl Default for OutputState {
fn default() -> Self {
OutputState {
last_output_time: gst::CLOCK_TIME_NONE,
last_output_time: gst::ClockTime::NONE,
primary: PadOutputState::default(),
fallback: PadOutputState::default(),
}
@ -118,7 +118,7 @@ impl Default for OutputState {
impl Default for Settings {
fn default() -> Self {
Settings {
timeout: DEFAULT_TIMEOUT.into(),
timeout: DEFAULT_TIMEOUT,
auto_switch: DEFAULT_AUTO_SWITCH,
}
}
@ -129,7 +129,7 @@ impl OutputState {
&self,
settings: &Settings,
check_primary_pad: bool,
cur_running_time: gst::ClockTime,
cur_running_time: impl Into<Option<gst::ClockTime>>,
) -> StreamHealth {
let last_sinkpad_time = if check_primary_pad {
self.primary.last_sinkpad_time
@ -137,11 +137,11 @@ impl OutputState {
self.fallback.last_sinkpad_time
};
if last_sinkpad_time == gst::ClockTime::none() {
if last_sinkpad_time.is_none() {
StreamHealth::Inactive
} else if cur_running_time != gst::ClockTime::none()
&& cur_running_time < last_sinkpad_time + settings.timeout
{
} else if cur_running_time.into().map_or(false, |cur_running_time| {
cur_running_time < last_sinkpad_time.expect("checked above") + settings.timeout
}) {
StreamHealth::Present
} else {
StreamHealth::Inactive
@ -153,7 +153,7 @@ impl OutputState {
settings: &Settings,
backup_pad: &Option<&gst_base::AggregatorPad>,
preferred_is_primary: bool,
cur_running_time: gst::ClockTime,
cur_running_time: impl Into<Option<gst::ClockTime>> + Copy,
) -> (bool, bool) {
let preferred_health = self.health(settings, preferred_is_primary, cur_running_time);
let backup_health = if backup_pad.is_some() {
@ -187,7 +187,7 @@ impl FallbackSwitch {
&self,
state: &mut OutputState,
pad: &gst_base::AggregatorPad,
target_running_time: gst::ClockTime,
target_running_time: impl Into<Option<gst::ClockTime>> + Copy,
) -> Result<(), gst::FlowError> {
let segment = pad.segment();
@ -201,13 +201,17 @@ impl FallbackSwitch {
gst::FlowError::Error
})?;
let mut running_time = gst::ClockTime::none();
let mut running_time = gst::ClockTime::NONE;
while let Some(buffer) = pad.peek_buffer() {
let pts = buffer.dts_or_pts();
let new_running_time = segment.to_running_time(pts);
if pts.is_none() || new_running_time <= target_running_time {
if pts.is_none()
|| new_running_time
.zip(target_running_time.into())
.map_or(false, |(new, target)| new <= target)
{
gst_debug!(CAT, obj: pad, "Dropping trailing buffer {:?}", buffer);
pad.drop_buffer();
running_time = new_running_time;
@ -215,7 +219,7 @@ impl FallbackSwitch {
break;
}
}
if running_time != gst::ClockTime::none() {
if running_time.is_some() {
if pad == &self.primary_sinkpad {
state.primary.last_sinkpad_time = running_time;
} else {
@ -234,7 +238,7 @@ impl FallbackSwitch {
mut buffer: gst::Buffer,
preferred_pad: &gst_base::AggregatorPad,
backup_pad: &Option<&gst_base::AggregatorPad>,
cur_running_time: gst::ClockTime,
cur_running_time: impl Into<Option<gst::ClockTime>>,
) -> Result<Option<(gst::Buffer, gst::Caps, bool)>, gst::FlowError> {
// If we got a buffer on the sinkpad just handle it
gst_debug!(
@ -273,48 +277,46 @@ impl FallbackSwitch {
state.fallback.last_sinkpad_time = running_time;
}
let is_late = {
if cur_running_time != gst::ClockTime::none() {
let latency = agg.latency();
if latency.is_some() {
let deadline = running_time + latency + 40 * gst::MSECOND;
let cur_running_time = cur_running_time.into();
let (is_late, deadline) = cur_running_time
.zip(agg.latency())
.zip(running_time)
.map_or(
(false, None),
|((cur_running_time, latency), running_time)| {
let dealine = running_time + latency + 40 * gst::ClockTime::MSECOND;
(cur_running_time > dealine, Some(dealine))
},
);
if cur_running_time > deadline {
gst_debug!(
CAT,
obj: preferred_pad,
"Buffer is too late: {} > {}",
cur_running_time,
deadline
);
true
} else {
false
}
} else {
false
}
} else {
false
}
};
if state.last_output_time.is_some()
&& is_late
&& state.last_output_time + settings.timeout <= running_time
{
/* This buffer arrived too late - we either already switched
* to the other pad or there's no point outputting this anyway */
if is_late {
gst_debug!(
CAT,
obj: preferred_pad,
"Buffer is too late and timeout reached: {} + {} <= {}",
state.last_output_time,
settings.timeout,
running_time,
"Buffer is too late: {} > {}",
cur_running_time.display(),
deadline.display(),
);
return Ok(None);
if state.last_output_time.zip(running_time).map_or(
false,
|(last_output_time, running_time)| {
last_output_time + settings.timeout <= running_time
},
) {
/* This buffer arrived too late - we either already switched
* to the other pad or there's no point outputting this anyway */
gst_debug!(
CAT,
obj: preferred_pad,
"Buffer is too late and timeout reached: {} + {} <= {}",
state.last_output_time.display(),
settings.timeout,
running_time.display(),
);
return Ok(None);
}
}
let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
@ -416,14 +418,19 @@ impl FallbackSwitch {
}
// Get the next one if this one is before the timeout
if state.last_output_time + settings.timeout > running_time {
if state.last_output_time.zip(running_time).map_or(
false,
|(last_output_time, running_time)| {
last_output_time + settings.timeout > running_time
},
) {
gst_debug!(
CAT,
obj: backup_pad,
"Timeout not reached yet: {} + {} > {}",
state.last_output_time,
state.last_output_time.display(),
settings.timeout,
running_time
running_time.display(),
);
continue;
@ -433,9 +440,9 @@ impl FallbackSwitch {
CAT,
obj: backup_pad,
"Timeout reached: {} + {} <= {}",
state.last_output_time,
state.last_output_time.display(),
settings.timeout,
running_time
running_time.display(),
);
let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
@ -529,9 +536,12 @@ impl FallbackSwitch {
let base_time = agg.base_time();
let cur_running_time = if let Some(clock) = clock {
clock.time() - base_time
clock
.time()
.zip(base_time)
.and_then(|(time, base_time)| time.checked_sub(base_time))
} else {
gst::ClockTime::none()
gst::ClockTime::NONE
};
/* See if there's a buffer on the preferred pad and output that */
@ -677,8 +687,8 @@ impl ObjectImpl for FallbackSwitch {
"Timeout",
"Timeout in nanoseconds",
0,
std::u64::MAX,
DEFAULT_TIMEOUT,
std::u64::MAX - 1,
DEFAULT_TIMEOUT.nseconds() as u64,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_object(
@ -984,7 +994,7 @@ impl AggregatorImpl for FallbackSwitch {
}
}
fn next_time(&self, agg: &Self::Type) -> gst::ClockTime {
fn next_time(&self, agg: &Self::Type) -> Option<gst::ClockTime> {
/* At each iteration, we have a preferred pad and a backup pad. If autoswitch is true,
* the sinkpad is always preferred, otherwise it's the active sinkpad as set by the app.
* The backup pad is the other one (may be None if there's no fallback pad yet).
@ -1020,10 +1030,10 @@ impl AggregatorImpl for FallbackSwitch {
"Have buffer on sinkpad {}, immediate timeout",
preferred_pad.name()
);
0.into()
Some(gst::ClockTime::ZERO)
} else if self.primary_sinkpad.is_eos() {
gst_debug!(CAT, obj: agg, "Sinkpad is EOS, immediate timeout");
0.into()
Some(gst::ClockTime::ZERO)
} else if let Some((buffer, backup_sinkpad)) = backup_pad
.as_ref()
.and_then(|p| p.peek_buffer().map(|buffer| (buffer, p)))
@ -1031,7 +1041,7 @@ impl AggregatorImpl for FallbackSwitch {
if buffer.pts().is_none() {
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
// Trigger aggregate immediately to error out immediately
return 0.into();
return Some(gst::ClockTime::ZERO);
}
let segment = match backup_sinkpad.segment().downcast::<gst::ClockTime>() {
@ -1039,7 +1049,7 @@ impl AggregatorImpl for FallbackSwitch {
Err(_) => {
gst_error!(CAT, obj: agg, "Only TIME segments supported");
// Trigger aggregate immediately to error out immediately
return 0.into();
return Some(gst::ClockTime::ZERO);
}
};
@ -1049,12 +1059,12 @@ impl AggregatorImpl for FallbackSwitch {
obj: agg,
"Have buffer on {} pad, timeout at {}",
backup_sinkpad.name(),
running_time
running_time.display(),
);
running_time
} else {
gst_debug!(CAT, obj: agg, "No buffer available on either input");
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
}
}
@ -1096,25 +1106,21 @@ impl AggregatorImpl for FallbackSwitch {
return Some(buffer);
}
let duration = if buffer.duration().is_some() {
buffer.duration()
let duration = if let Some(duration) = buffer.duration() {
Some(duration)
} else if let Some(ref audio_info) = pad_state.audio_info {
gst::SECOND
.mul_div_floor(
buffer.size() as u64,
audio_info.rate() as u64 * audio_info.bpf() as u64,
)
.unwrap()
gst::ClockTime::SECOND.mul_div_floor(
buffer.size() as u64,
audio_info.rate() as u64 * audio_info.bpf() as u64,
)
} else if let Some(ref video_info) = pad_state.video_info {
if *video_info.fps().numer() > 0 {
gst::SECOND
.mul_div_floor(
*video_info.fps().denom() as u64,
*video_info.fps().numer() as u64,
)
.unwrap()
gst::ClockTime::SECOND.mul_div_floor(
*video_info.fps().denom() as u64,
*video_info.fps().numer() as u64,
)
} else {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
}
} else {
unreachable!()
@ -1125,8 +1131,8 @@ impl AggregatorImpl for FallbackSwitch {
obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}",
buffer,
pts,
duration
pts.display(),
duration.display(),
);
if let Some(ref audio_info) = pad_state.audio_info {
gst_audio::audio_buffer_clip(
@ -1136,12 +1142,16 @@ impl AggregatorImpl for FallbackSwitch {
audio_info.bpf(),
)
} else if pad_state.video_info.is_some() {
segment.clip(pts, pts + duration).map(|(start, stop)| {
let stop = pts.zip(duration).map(|(pts, duration)| pts + duration);
segment.clip(pts, stop).map(|(start, stop)| {
{
let buffer = buffer.make_mut();
buffer.set_pts(start);
if duration.is_some() {
buffer.set_duration(stop - start);
buffer.set_duration(
stop.zip(start)
.and_then(|(stop, start)| stop.checked_sub(start)),
);
}
}

View file

@ -56,20 +56,20 @@ macro_rules! assert_buffer {
fn test_no_fallback_no_drops() {
let pipeline = setup_pipeline(None);
push_buffer(&pipeline, 0.into());
set_time(&pipeline, 0.into());
push_buffer(&pipeline, gst::ClockTime::ZERO);
set_time(&pipeline, gst::ClockTime::ZERO);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 0.into());
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
push_buffer(&pipeline, gst::SECOND);
set_time(&pipeline, gst::SECOND);
push_buffer(&pipeline, gst::ClockTime::SECOND);
set_time(&pipeline, gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, gst::SECOND);
assert_buffer!(buffer, Some(gst::ClockTime::SECOND));
push_buffer(&pipeline, 2 * gst::SECOND);
set_time(&pipeline, 2 * gst::SECOND);
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
set_time(&pipeline, 2 * gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 2 * gst::SECOND);
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
push_eos(&pipeline);
wait_eos(&pipeline);
@ -90,23 +90,23 @@ fn test_no_drops_not_live() {
fn test_no_drops(live: bool) {
let pipeline = setup_pipeline(Some(live));
push_buffer(&pipeline, 0.into());
push_fallback_buffer(&pipeline, 0.into());
set_time(&pipeline, 0.into());
push_buffer(&pipeline, gst::ClockTime::ZERO);
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
set_time(&pipeline, gst::ClockTime::ZERO);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 0.into());
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
push_fallback_buffer(&pipeline, gst::SECOND);
push_buffer(&pipeline, gst::SECOND);
set_time(&pipeline, gst::SECOND);
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
push_buffer(&pipeline, gst::ClockTime::SECOND);
set_time(&pipeline, gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, gst::SECOND);
assert_buffer!(buffer, Some(gst::ClockTime::SECOND));
push_buffer(&pipeline, 2 * gst::SECOND);
push_fallback_buffer(&pipeline, 2 * gst::SECOND);
set_time(&pipeline, 2 * gst::SECOND);
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
set_time(&pipeline, 2 * gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 2 * gst::SECOND);
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
// EOS on the fallback should not be required
push_eos(&pipeline);
@ -128,22 +128,22 @@ fn test_no_drops_but_no_fallback_frames_not_live() {
fn test_no_drops_but_no_fallback_frames(live: bool) {
let pipeline = setup_pipeline(Some(live));
push_buffer(&pipeline, 0.into());
push_buffer(&pipeline, gst::ClockTime::ZERO);
// +10ms needed here because the immediate timeout will be always at running time 0, but
// aggregator also adds the latency to it so we end up at 10ms instead.
set_time(&pipeline, 10 * gst::MSECOND);
set_time(&pipeline, 10 * gst::ClockTime::MSECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 0.into());
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
push_buffer(&pipeline, gst::SECOND);
set_time(&pipeline, gst::SECOND);
push_buffer(&pipeline, gst::ClockTime::SECOND);
set_time(&pipeline, gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, gst::SECOND);
assert_buffer!(buffer, Some(gst::ClockTime::SECOND));
push_buffer(&pipeline, 2 * gst::SECOND);
set_time(&pipeline, 2 * gst::SECOND);
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
set_time(&pipeline, 2 * gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 2 * gst::SECOND);
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
// EOS on the fallback should not be required
push_eos(&pipeline);
@ -165,23 +165,26 @@ fn test_short_drop_not_live() {
fn test_short_drop(live: bool) {
let pipeline = setup_pipeline(Some(live));
push_buffer(&pipeline, 0.into());
push_fallback_buffer(&pipeline, 0.into());
set_time(&pipeline, 0.into());
push_buffer(&pipeline, gst::ClockTime::ZERO);
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
set_time(&pipeline, gst::ClockTime::ZERO);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 0.into());
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
// A timeout at 1s will get rid of the fallback buffer
// but not output anything
push_fallback_buffer(&pipeline, gst::SECOND);
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
// Time out the fallback buffer at +10ms
set_time(&pipeline, gst::SECOND + 10 * gst::MSECOND);
set_time(
&pipeline,
gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
push_fallback_buffer(&pipeline, 2 * gst::SECOND);
push_buffer(&pipeline, 2 * gst::SECOND);
set_time(&pipeline, 2 * gst::SECOND);
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
set_time(&pipeline, 2 * gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 2 * gst::SECOND);
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
push_eos(&pipeline);
push_fallback_eos(&pipeline);
@ -204,33 +207,45 @@ fn test_long_drop_and_eos(live: bool) {
let pipeline = setup_pipeline(Some(live));
// Produce the first frame
push_buffer(&pipeline, 0.into());
push_fallback_buffer(&pipeline, 0.into());
set_time(&pipeline, 0.into());
push_buffer(&pipeline, gst::ClockTime::ZERO);
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
set_time(&pipeline, gst::ClockTime::ZERO);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 0.into());
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
// Produce a second frame but only from the fallback source
push_fallback_buffer(&pipeline, gst::SECOND);
set_time(&pipeline, gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
set_time(
&pipeline,
gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
// Produce a third frame but only from the fallback source
push_fallback_buffer(&pipeline, 2 * gst::SECOND);
set_time(&pipeline, 2 * gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
set_time(
&pipeline,
2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
// Produce a fourth frame but only from the fallback source
// This should be output now
push_fallback_buffer(&pipeline, 3 * gst::SECOND);
set_time(&pipeline, 3 * gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND);
set_time(
&pipeline,
3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
let buffer = pull_buffer(&pipeline);
assert_fallback_buffer!(buffer, 3 * gst::SECOND);
assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND));
// Produce a fifth frame but only from the fallback source
// This should be output now
push_fallback_buffer(&pipeline, 4 * gst::SECOND);
set_time(&pipeline, 4 * gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND);
set_time(
&pipeline,
4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
let buffer = pull_buffer(&pipeline);
assert_fallback_buffer!(buffer, 4 * gst::SECOND);
assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND));
// Wait for EOS to arrive at appsink
push_eos(&pipeline);
@ -254,54 +269,66 @@ fn test_long_drop_and_recover(live: bool) {
let pipeline = setup_pipeline(Some(live));
// Produce the first frame
push_buffer(&pipeline, 0.into());
push_fallback_buffer(&pipeline, 0.into());
set_time(&pipeline, 0.into());
push_buffer(&pipeline, gst::ClockTime::ZERO);
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
set_time(&pipeline, gst::ClockTime::ZERO);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 0.into());
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
// Produce a second frame but only from the fallback source
push_fallback_buffer(&pipeline, gst::SECOND);
set_time(&pipeline, gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
set_time(
&pipeline,
gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
// Produce a third frame but only from the fallback source
push_fallback_buffer(&pipeline, 2 * gst::SECOND);
set_time(&pipeline, 2 * gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
set_time(
&pipeline,
2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
// Produce a fourth frame but only from the fallback source
// This should be output now
push_fallback_buffer(&pipeline, 3 * gst::SECOND);
set_time(&pipeline, 3 * gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND);
set_time(
&pipeline,
3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
let buffer = pull_buffer(&pipeline);
assert_fallback_buffer!(buffer, 3 * gst::SECOND);
assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND));
// Produce a fifth frame but only from the fallback source
// This should be output now
push_fallback_buffer(&pipeline, 4 * gst::SECOND);
set_time(&pipeline, 4 * gst::SECOND + 10 * gst::MSECOND);
push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND);
set_time(
&pipeline,
4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
);
let buffer = pull_buffer(&pipeline);
assert_fallback_buffer!(buffer, 4 * gst::SECOND);
assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND));
// Produce a sixth frame from the normal source
push_buffer(&pipeline, 5 * gst::SECOND);
push_fallback_buffer(&pipeline, 5 * gst::SECOND);
set_time(&pipeline, 5 * gst::SECOND);
push_buffer(&pipeline, 5 * gst::ClockTime::SECOND);
push_fallback_buffer(&pipeline, 5 * gst::ClockTime::SECOND);
set_time(&pipeline, 5 * gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 5 * gst::SECOND);
assert_buffer!(buffer, Some(5 * gst::ClockTime::SECOND));
// Produce a seventh frame from the normal source but no fallback.
// This should still be output immediately
push_buffer(&pipeline, 6 * gst::SECOND);
set_time(&pipeline, 6 * gst::SECOND);
push_buffer(&pipeline, 6 * gst::ClockTime::SECOND);
set_time(&pipeline, 6 * gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 6 * gst::SECOND);
assert_buffer!(buffer, Some(6 * gst::ClockTime::SECOND));
// Produce a eight frame from the normal source
push_buffer(&pipeline, 7 * gst::SECOND);
push_fallback_buffer(&pipeline, 7 * gst::SECOND);
set_time(&pipeline, 7 * gst::SECOND);
push_buffer(&pipeline, 7 * gst::ClockTime::SECOND);
push_fallback_buffer(&pipeline, 7 * gst::ClockTime::SECOND);
set_time(&pipeline, 7 * gst::ClockTime::SECOND);
let buffer = pull_buffer(&pipeline);
assert_buffer!(buffer, 7 * gst::SECOND);
assert_buffer!(buffer, Some(7 * gst::ClockTime::SECOND));
// Wait for EOS to arrive at appsink
push_eos(&pipeline);
@ -330,15 +357,15 @@ fn setup_pipeline(with_live_fallback: Option<bool>) -> Pipeline {
gst_debug!(TEST_CAT, "Setting up pipeline");
let clock = gst_check::TestClock::new();
clock.set_time(0.into());
clock.set_time(gst::ClockTime::ZERO);
let pipeline = gst::Pipeline::new(None);
// Running time 0 in our pipeline is going to be clock time 1s. All
// clock ids before 1s are used for signalling to our clock advancing
// thread.
pipeline.use_clock(Some(&clock));
pipeline.set_base_time(gst::SECOND);
pipeline.set_start_time(gst::CLOCK_TIME_NONE);
pipeline.set_base_time(gst::ClockTime::SECOND);
pipeline.set_start_time(gst::ClockTime::NONE);
let src = gst::ElementFactory::make("appsrc", Some("src"))
.unwrap()
@ -359,7 +386,9 @@ fn setup_pipeline(with_live_fallback: Option<bool>) -> Pipeline {
.unwrap();
let switch = gst::ElementFactory::make("fallbackswitch", Some("switch")).unwrap();
switch.set_property("timeout", &(3 * gst::SECOND)).unwrap();
switch
.set_property("timeout", &(3 * gst::ClockTime::SECOND))
.unwrap();
let sink = gst::ElementFactory::make("appsink", Some("sink"))
.unwrap()
@ -411,7 +440,7 @@ fn setup_pipeline(with_live_fallback: Option<bool>) -> Pipeline {
loop {
while let Some(clock_id) = clock.peek_next_pending_id().and_then(|clock_id| {
// Process if the clock ID is in the past or now
if clock.time() >= clock_id.time() {
if clock.time().map_or(false, |time| time >= clock_id.time()) {
Some(clock_id)
} else {
None
@ -420,7 +449,7 @@ fn setup_pipeline(with_live_fallback: Option<bool>) -> Pipeline {
gst_debug!(TEST_CAT, "Processing clock ID at {}", clock_id.time());
if let Some(clock_id) = clock.process_next_clock_id() {
gst_debug!(TEST_CAT, "Processed clock ID at {}", clock_id.time());
if clock_id.time() == 0.into() {
if clock_id.time().is_zero() {
gst_debug!(TEST_CAT, "Stopping clock thread");
return;
}
@ -431,7 +460,10 @@ fn setup_pipeline(with_live_fallback: Option<bool>) -> Pipeline {
// at the top of the queue. We don't want to do a busy loop here.
while clock.peek_next_pending_id().iter().any(|clock_id| {
// Sleep if the clock ID is in the future
clock.time() < clock_id.time()
// FIXME probably can expect clock.time()
clock
.time()
.map_or(true, |clock_time| clock_time < clock_id.time())
}) {
use std::{thread, time};
@ -514,7 +546,7 @@ fn set_time(pipeline: &Pipeline, time: gst::ClockTime) {
.unwrap();
gst_debug!(TEST_CAT, "Setting time to {}", time);
clock.set_time(gst::SECOND + time);
clock.set_time(gst::ClockTime::SECOND + time);
}
fn wait_eos(pipeline: &Pipeline) {
@ -545,7 +577,7 @@ fn stop_pipeline(mut pipeline: Pipeline) {
.unwrap();
// Signal shutdown to the clock thread
let clock_id = clock.new_single_shot_id(0.into());
let clock_id = clock.new_single_shot_id(gst::ClockTime::ZERO);
let _ = clock_id.wait();
let switch = pipeline.by_name("switch").unwrap();

View file

@ -14,7 +14,6 @@ gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org
gtk = { git = "https://github.com/gtk-rs/gtk3-rs", optional = true }
gio = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true }
parking_lot = "0.11"
more-asserts = "0.2"
once_cell = "1.0"
[dev-dependencies]

View file

@ -237,14 +237,14 @@ fn create_ui(app: &gtk::Application) {
let position = video_sink
.query_position::<gst::ClockTime>()
.unwrap_or_else(|| 0.into());
.unwrap_or(gst::ClockTime::ZERO);
position_label.set_text(&format!("Position: {:.1}", position));
let recording_duration = togglerecord
.static_pad("src")
.unwrap()
.query_position::<gst::ClockTime>()
.unwrap_or_else(|| 0.into());
.unwrap_or(gst::ClockTime::ZERO);
recorded_duration_label.set_text(&format!("Recorded: {:.1}", recording_duration));
glib::Continue(true)

View file

@ -20,8 +20,6 @@ use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_log, gst_trace, gst_warning};
use more_asserts::{assert_ge, assert_le, assert_lt};
use once_cell::sync::Lazy;
use parking_lot::{Condvar, Mutex};
use std::cmp;
@ -75,8 +73,8 @@ struct StreamState {
out_segment: gst::FormattedSegment<gst::ClockTime>,
segment_seqnum: gst::Seqnum,
// Start/end running time of the current/last buffer
current_running_time: gst::ClockTime,
current_running_time_end: gst::ClockTime,
current_running_time: Option<gst::ClockTime>,
current_running_time_end: Option<gst::ClockTime>,
eos: bool,
flushing: bool,
segment_pending: bool,
@ -92,8 +90,8 @@ impl Default for StreamState {
in_segment: gst::FormattedSegment::new(),
out_segment: gst::FormattedSegment::new(),
segment_seqnum: gst::Seqnum::next(),
current_running_time: gst::CLOCK_TIME_NONE,
current_running_time_end: gst::CLOCK_TIME_NONE,
current_running_time: None,
current_running_time_end: None,
eos: false,
flushing: false,
segment_pending: false,
@ -127,23 +125,23 @@ enum RecordingState {
#[derive(Debug)]
struct State {
recording_state: RecordingState,
last_recording_start: gst::ClockTime,
last_recording_stop: gst::ClockTime,
last_recording_start: Option<gst::ClockTime>,
last_recording_stop: Option<gst::ClockTime>,
// Accumulated duration of previous recording segments,
// updated whenever going to Stopped
recording_duration: gst::ClockTime,
// Updated whenever going to Recording
running_time_offset: gst::ClockTime,
running_time_offset: i64,
}
impl Default for State {
fn default() -> Self {
Self {
recording_state: RecordingState::Stopped,
last_recording_start: gst::CLOCK_TIME_NONE,
last_recording_stop: gst::CLOCK_TIME_NONE,
recording_duration: 0.into(),
running_time_offset: gst::CLOCK_TIME_NONE,
last_recording_start: None,
last_recording_stop: None,
recording_duration: gst::ClockTime::ZERO,
running_time_offset: 0,
}
}
}
@ -157,9 +155,9 @@ enum HandleResult<T> {
}
trait HandleData: Sized {
fn pts(&self) -> gst::ClockTime;
fn dts(&self) -> gst::ClockTime;
fn dts_or_pts(&self) -> gst::ClockTime {
fn pts(&self) -> Option<gst::ClockTime>;
fn dts(&self) -> Option<gst::ClockTime>;
fn dts_or_pts(&self) -> Option<gst::ClockTime> {
let dts = self.dts();
if dts.is_some() {
dts
@ -167,7 +165,7 @@ trait HandleData: Sized {
self.pts()
}
}
fn duration(&self, state: &StreamState) -> gst::ClockTime;
fn duration(&self, state: &StreamState) -> Option<gst::ClockTime>;
fn is_keyframe(&self) -> bool;
fn can_clip(&self, state: &StreamState) -> bool;
fn clip(
@ -177,16 +175,16 @@ trait HandleData: Sized {
) -> Option<Self>;
}
impl HandleData for (gst::ClockTime, gst::ClockTime) {
fn pts(&self) -> gst::ClockTime {
self.0
impl HandleData for (gst::ClockTime, Option<gst::ClockTime>) {
fn pts(&self) -> Option<gst::ClockTime> {
Some(self.0)
}
fn dts(&self) -> gst::ClockTime {
self.0
fn dts(&self) -> Option<gst::ClockTime> {
Some(self.0)
}
fn duration(&self, _state: &StreamState) -> gst::ClockTime {
fn duration(&self, _state: &StreamState) -> Option<gst::ClockTime> {
self.1
}
@ -203,55 +201,48 @@ impl HandleData for (gst::ClockTime, gst::ClockTime) {
_state: &StreamState,
segment: &gst::FormattedSegment<gst::ClockTime>,
) -> Option<Self> {
let stop = if self.1.is_some() {
self.0 + self.1
} else {
self.0
};
let stop = self.0 + self.1.unwrap_or(gst::ClockTime::ZERO);
segment
.clip(self.0, stop)
.map(|(start, stop)| (start, stop - start))
segment.clip(self.0, stop).map(|(start, stop)| {
let start = start.expect("provided a defined value");
(start, stop.map(|stop| stop - start))
})
}
}
impl HandleData for gst::Buffer {
fn pts(&self) -> gst::ClockTime {
fn pts(&self) -> Option<gst::ClockTime> {
gst::BufferRef::pts(self)
}
fn dts(&self) -> gst::ClockTime {
fn dts(&self) -> Option<gst::ClockTime> {
gst::BufferRef::dts(self)
}
fn duration(&self, state: &StreamState) -> gst::ClockTime {
fn duration(&self, state: &StreamState) -> Option<gst::ClockTime> {
let duration = gst::BufferRef::duration(self);
if duration.is_some() {
duration
} else if let Some(ref video_info) = state.video_info {
if video_info.fps() != 0.into() {
gst::SECOND
.mul_div_floor(
*video_info.fps().denom() as u64,
*video_info.fps().numer() as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE)
gst::ClockTime::SECOND.mul_div_floor(
*video_info.fps().denom() as u64,
*video_info.fps().numer() as u64,
)
} else {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
}
} else if let Some(ref audio_info) = state.audio_info {
if audio_info.bpf() == 0 || audio_info.rate() == 0 {
return gst::CLOCK_TIME_NONE;
return gst::ClockTime::NONE;
}
let size = self.size() as u64;
let num_samples = size / audio_info.bpf() as u64;
gst::SECOND
.mul_div_floor(num_samples, audio_info.rate() as u64)
.unwrap_or(gst::CLOCK_TIME_NONE)
gst::ClockTime::SECOND.mul_div_floor(num_samples, audio_info.rate() as u64)
} else {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
}
}
@ -295,11 +286,7 @@ impl HandleData for gst::Buffer {
let pts = HandleData::pts(&self);
let duration = HandleData::duration(&self, state);
let stop = if duration.is_some() {
pts + duration
} else {
pts
};
let stop = pts.map(|pts| pts + duration.unwrap_or(gst::ClockTime::ZERO));
if let Some(ref audio_info) = state.audio_info {
gst_audio::audio_buffer_clip(
@ -313,7 +300,11 @@ impl HandleData for gst::Buffer {
{
let buffer = self.make_mut();
buffer.set_pts(start);
buffer.set_duration(stop - start);
buffer.set_duration(
stop.zip(start)
.and_then(|(stop, start)| stop.checked_sub(start)),
// FIXME we could expect here
);
}
self
@ -354,48 +345,47 @@ impl ToggleRecord {
) -> Result<HandleResult<T>, gst::FlowError> {
let mut state = stream.state.lock();
let mut dts_or_pts = data.dts_or_pts();
let duration = data.duration(&state);
if !dts_or_pts.is_some() {
let mut dts_or_pts = data.dts_or_pts().ok_or_else(|| {
gst::element_error!(
element,
gst::StreamError::Format,
["Buffer without DTS or PTS"]
);
return Err(gst::FlowError::Error);
}
gst::FlowError::Error
})?;
let mut dts_or_pts_end = if duration.is_some() {
dts_or_pts + duration
} else {
dts_or_pts
};
let mut dts_or_pts_end = dts_or_pts + data.duration(&state).unwrap_or(gst::ClockTime::ZERO);
let data = match data.clip(&state, &state.in_segment) {
Some(data) => data,
None => {
gst_log!(CAT, obj: pad, "Dropping raw data outside segment");
return Ok(HandleResult::Drop);
}
Some(data) => data,
};
// This will only do anything for non-raw data
dts_or_pts = state.in_segment.start().max(dts_or_pts).unwrap();
dts_or_pts_end = state.in_segment.start().max(dts_or_pts_end).unwrap();
if state.in_segment.stop().is_some() {
dts_or_pts = state.in_segment.stop().min(dts_or_pts).unwrap();
dts_or_pts_end = state.in_segment.stop().min(dts_or_pts_end).unwrap();
// FIXME comment why we can unwrap
dts_or_pts = state.in_segment.start().unwrap().max(dts_or_pts);
dts_or_pts_end = state.in_segment.start().unwrap().max(dts_or_pts_end);
if let Some(stop) = state.in_segment.stop() {
dts_or_pts = stop.min(dts_or_pts);
dts_or_pts_end = stop.min(dts_or_pts_end);
}
let current_running_time = state.in_segment.to_running_time(dts_or_pts);
let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
state.current_running_time = current_running_time
.max(state.current_running_time)
.unwrap_or(current_running_time);
.zip(state.current_running_time)
.map(|(cur_rt, state_rt)| cur_rt.max(state_rt))
.or(current_running_time);
state.current_running_time_end = current_running_time_end
.max(state.current_running_time_end)
.unwrap_or(current_running_time_end);
.zip(state.current_running_time_end)
.map(|(cur_rt_end, state_rt_end)| cur_rt_end.max(state_rt_end))
.or(current_running_time_end);
// FIXME we should probably return if either current_running_time or current_running_time_end
// are None at this point
// Wake up everybody, we advanced a bit
// Important: They will only be able to advance once we're done with this
@ -407,10 +397,10 @@ impl ToggleRecord {
CAT,
obj: pad,
"Main stream current running time {}-{} (position: {}-{})",
current_running_time,
current_running_time_end,
current_running_time.display(),
current_running_time_end.display(),
dts_or_pts,
dts_or_pts_end
dts_or_pts_end,
);
let settings = *self.settings.lock();
@ -458,13 +448,17 @@ impl ToggleRecord {
// Remember the time when we stopped: now, i.e. right before the current buffer!
rec_state.last_recording_stop = current_running_time;
let last_recording_duration = rec_state
.last_recording_stop
.zip(rec_state.last_recording_start)
.and_then(|(stop, start)| stop.checked_sub(start));
gst_debug!(
CAT,
obj: pad,
"Stopping at {}, started at {}, current duration {}, previous accumulated recording duration {}",
rec_state.last_recording_stop,
rec_state.last_recording_start,
rec_state.last_recording_stop - rec_state.last_recording_start,
rec_state.last_recording_stop.display(),
rec_state.last_recording_start.display(),
last_recording_duration.display(),
rec_state.recording_duration
);
@ -477,8 +471,9 @@ impl ToggleRecord {
&& !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time)
|| s.current_running_time
.zip(current_running_time)
.map_or(false, |(s_cur_rt, cur_rt)| s_cur_rt >= cur_rt)
})
{
gst_log!(CAT, obj: pad, "Waiting for other streams to stop");
@ -492,17 +487,17 @@ impl ToggleRecord {
let mut rec_state = self.state.lock();
rec_state.recording_state = RecordingState::Stopped;
let advance_by = rec_state.last_recording_stop - rec_state.last_recording_start;
rec_state.recording_duration += advance_by;
rec_state.last_recording_start = gst::CLOCK_TIME_NONE;
rec_state.last_recording_stop = gst::CLOCK_TIME_NONE;
rec_state.recording_duration +=
last_recording_duration.unwrap_or(gst::ClockTime::ZERO);
rec_state.last_recording_start = None;
rec_state.last_recording_stop = None;
gst_debug!(
CAT,
obj: pad,
"Stopped at {}, recording duration {}",
current_running_time,
rec_state.recording_duration
current_running_time.display(),
rec_state.recording_duration.display(),
);
// Then become Stopped and drop this buffer. We always stop right before
@ -538,12 +533,17 @@ impl ToggleRecord {
// Remember the time when we started: now!
rec_state.last_recording_start = current_running_time;
rec_state.running_time_offset = current_running_time - rec_state.recording_duration;
rec_state.running_time_offset =
current_running_time.map_or(0, |current_running_time| {
current_running_time
.saturating_sub(rec_state.recording_duration)
.nseconds()
}) as i64;
gst_debug!(
CAT,
obj: pad,
"Starting at {}, previous accumulated recording duration {}",
current_running_time,
current_running_time.display(),
rec_state.recording_duration,
);
@ -564,8 +564,9 @@ impl ToggleRecord {
&& !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time)
|| s.current_running_time
.zip(current_running_time)
.map_or(false, |(s_cur_rt, cur_rt)| s_cur_rt >= cur_rt)
})
{
gst_log!(CAT, obj: pad, "Waiting for other streams to start");
@ -583,7 +584,7 @@ impl ToggleRecord {
CAT,
obj: pad,
"Started at {}, recording duration {}",
current_running_time,
current_running_time.display(),
rec_state.recording_duration
);
@ -608,16 +609,12 @@ impl ToggleRecord {
// Calculate end pts & current running time and make sure we stay in the segment
let mut state = stream.state.lock();
let mut pts = data.pts();
let duration = data.duration(&state);
if pts.is_none() {
let mut pts = data.pts().ok_or_else(|| {
gst::element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]);
return Err(gst::FlowError::Error);
}
gst::FlowError::Error
})?;
let dts = data.dts();
if dts.is_some() && pts.is_some() && dts != pts {
if data.dts().map_or(false, |dts| dts != pts) {
gst::element_error!(
element,
gst::StreamError::Format,
@ -635,11 +632,7 @@ impl ToggleRecord {
return Err(gst::FlowError::Error);
}
let mut pts_end = if duration.is_some() {
pts + duration
} else {
pts
};
let mut pts_end = pts + data.duration(&state).unwrap_or(gst::ClockTime::ZERO);
let data = match data.clip(&state, &state.in_segment) {
None => {
@ -650,28 +643,31 @@ impl ToggleRecord {
};
// This will only do anything for non-raw data
pts = state.in_segment.start().max(pts).unwrap();
pts_end = state.in_segment.start().max(pts_end).unwrap();
if state.in_segment.stop().is_some() {
pts = state.in_segment.stop().min(pts).unwrap();
pts_end = state.in_segment.stop().min(pts_end).unwrap();
// FIXME comment why we can unwrap
pts = state.in_segment.start().unwrap().max(pts);
pts_end = state.in_segment.start().unwrap().max(pts_end);
if let Some(stop) = state.in_segment.stop() {
pts = stop.min(pts);
pts_end = stop.min(pts_end);
}
let current_running_time = state.in_segment.to_running_time(pts);
let current_running_time_end = state.in_segment.to_running_time(pts_end);
state.current_running_time = current_running_time
.max(state.current_running_time)
.unwrap_or(current_running_time);
.zip(state.current_running_time)
.map(|(cur_rt, state_rt)| cur_rt.max(state_rt))
.or(current_running_time);
state.current_running_time_end = current_running_time_end
.max(state.current_running_time_end)
.unwrap_or(current_running_time_end);
.zip(state.current_running_time_end)
.map(|(cur_rt_end, state_rt_end)| cur_rt_end.max(state_rt_end))
.or(current_running_time_end);
gst_log!(
CAT,
obj: pad,
"Secondary stream current running time {}-{} (position: {}-{}",
current_running_time,
current_running_time_end,
current_running_time.display(),
current_running_time_end.display(),
pts,
pts_end
);
@ -697,13 +693,20 @@ impl ToggleRecord {
while (main_state.current_running_time.is_none()
|| rec_state.recording_state != RecordingState::Starting
&& rec_state.recording_state != RecordingState::Stopping
&& main_state.current_running_time_end < current_running_time_end
&& main_state
.current_running_time_end
.zip(current_running_time_end)
.map_or(false, |(main_rt_end, cur_rt_end)| main_rt_end < cur_rt_end)
|| rec_state.recording_state == RecordingState::Starting
&& (rec_state.last_recording_start.is_none()
|| rec_state.last_recording_start <= current_running_time)
&& rec_state
.last_recording_start
.map_or(true, |last_rec_start| {
current_running_time.map_or(false, |cur_rt| last_rec_start <= cur_rt)
})
|| rec_state.recording_state == RecordingState::Stopping
&& (rec_state.last_recording_stop.is_none()
|| rec_state.last_recording_stop <= current_running_time))
&& rec_state.last_recording_stop.map_or(true, |last_rec_stop| {
current_running_time.map_or(false, |cur_rt| last_rec_stop <= cur_rt)
}))
&& !main_state.eos
&& !stream.state.lock().flushing
{
@ -711,11 +714,11 @@ impl ToggleRecord {
CAT,
obj: pad,
"Waiting at {}-{} in {:?} state, main stream at {}-{}",
current_running_time,
current_running_time_end,
current_running_time.display(),
current_running_time_end.display(),
rec_state.recording_state,
main_state.current_running_time,
main_state.current_running_time_end
main_state.current_running_time.display(),
main_state.current_running_time_end.display(),
);
drop(rec_state);
@ -742,9 +745,17 @@ impl ToggleRecord {
&mut state,
&mut rec_state,
)));
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_start
&& current_running_time_end > rec_state.last_recording_start
}
let last_recording_start = rec_state.last_recording_start.expect("recording started");
// FIXME it would help a lot if we could expect current_running_time
// and possibly current_running_time_end at some point.
if data.can_clip(&*state)
&& current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_start)
&& current_running_time_end
.map_or(false, |cur_rt_end| cur_rt_end > last_recording_start)
{
// Otherwise if we're before the recording start but the end of the buffer is after
// the start and we can clip, clip the buffer and pass it onwards.
@ -752,9 +763,9 @@ impl ToggleRecord {
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})",
current_running_time,
rec_state.last_recording_start,
current_running_time_end
current_running_time.display(),
last_recording_start,
current_running_time_end.display(),
);
let mut clip_start = state
@ -773,7 +784,7 @@ impl ToggleRecord {
segment.set_start(clip_start);
segment.set_stop(clip_stop);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment);
if let Some(data) = data.clip(&*state, &segment) {
return Ok(HandleResult::Pass(data));
@ -781,7 +792,7 @@ impl ToggleRecord {
gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
return Ok(HandleResult::Drop);
}
} else if current_running_time < rec_state.last_recording_start {
} else if current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_start) {
// Otherwise if the buffer starts before the recording start, drop it. This
// means that we either can't clip, or that the end is also before the
// recording start
@ -789,13 +800,19 @@ impl ToggleRecord {
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (before recording start, {} < {})",
current_running_time,
rec_state.last_recording_start
current_running_time.display(),
last_recording_start,
);
return Ok(HandleResult::Drop);
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_stop
&& current_running_time_end > rec_state.last_recording_stop
&& current_running_time
.zip(rec_state.last_recording_stop)
.map_or(false, |(cur_rt, last_rec_stop)| cur_rt < last_rec_stop)
&& current_running_time_end
.zip(rec_state.last_recording_stop)
.map_or(false, |(cur_rt_end, last_rec_stop)| {
cur_rt_end > last_rec_stop
})
{
// Similarly if the end is after the recording stop but the start is before and we
// can clip, clip the buffer and pass it through.
@ -803,9 +820,9 @@ impl ToggleRecord {
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})",
current_running_time,
rec_state.last_recording_stop,
current_running_time_end
current_running_time.display(),
rec_state.last_recording_stop.display(),
current_running_time_end.display(),
);
let mut clip_start = state
@ -837,7 +854,12 @@ impl ToggleRecord {
&mut rec_state,
)));
}
} else if current_running_time_end > rec_state.last_recording_stop {
} else if current_running_time_end
.zip(rec_state.last_recording_stop)
.map_or(false, |(cur_rt_end, last_rec_stop)| {
cur_rt_end > last_rec_stop
})
{
// Otherwise if the end of the buffer is after the recording stop, we're EOS
// now. This means that we either couldn't clip or that the start is also after
// the recording stop
@ -845,8 +867,8 @@ impl ToggleRecord {
CAT,
obj: pad,
"Main stream EOS and we're EOS too (after recording end, {} > {})",
current_running_time_end,
rec_state.last_recording_stop
current_running_time_end.display(),
rec_state.last_recording_stop.display(),
);
return Ok(HandleResult::Eos(self.check_and_update_eos(
pad,
@ -857,16 +879,19 @@ impl ToggleRecord {
} else {
// In all other cases the buffer is fully between recording start and end and
// can be passed through as is
assert_ge!(current_running_time, rec_state.last_recording_start);
assert_le!(current_running_time_end, rec_state.last_recording_stop);
assert!(current_running_time.map_or(false, |cur_rt| cur_rt >= last_recording_start));
assert!(current_running_time_end
.zip(rec_state.last_recording_stop)
.map_or(false, |(cur_rt_end, last_rec_stop)| cur_rt_end
<= last_rec_stop));
gst_debug!(
CAT,
obj: pad,
"Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})",
rec_state.last_recording_start,
current_running_time,
rec_state.last_recording_stop
last_recording_start,
current_running_time.display(),
rec_state.last_recording_stop.display(),
);
return Ok(HandleResult::Pass(data));
}
@ -876,51 +901,61 @@ impl ToggleRecord {
RecordingState::Recording => {
// The end of our buffer must be before/at the end of the previous buffer of the main
// stream
assert_le!(
current_running_time_end,
main_state.current_running_time_end
);
assert!(current_running_time_end
.zip(main_state.current_running_time_end)
.map_or(false, |(cur_rt_end, main_cur_rt_end)| cur_rt_end
<= main_cur_rt_end));
// We're properly started, must have a start position and
// be actually after that start position
assert!(rec_state.last_recording_start.is_some());
assert_ge!(current_running_time, rec_state.last_recording_start);
assert!(current_running_time
.zip(rec_state.last_recording_start)
.map_or(false, |(cur_rt, last_rec_start)| cur_rt >= last_rec_start));
gst_log!(CAT, obj: pad, "Passing buffer (recording)");
Ok(HandleResult::Pass(data))
}
RecordingState::Stopping => {
// If we have no start position yet, the main stream is waiting for a key-frame
let last_recording_stop = match rec_state.last_recording_stop {
Some(last_recording_stop) => last_recording_stop,
None => {
gst_log!(
CAT,
obj: pad,
"Passing buffer (stopping: waiting for keyframe)",
);
return Ok(HandleResult::Pass(data));
}
};
// The start of our buffer must be before the last recording stop as
// otherwise we would be in Stopped state already
assert_lt!(current_running_time, rec_state.last_recording_stop);
assert!(current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_stop));
let current_running_time = current_running_time.expect("checked above");
// If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_stop.is_none() {
gst_log!(
CAT,
obj: pad,
"Passing buffer (stopping: waiting for keyframe)",
);
Ok(HandleResult::Pass(data))
} else if current_running_time_end <= rec_state.last_recording_stop {
if current_running_time_end
.map_or(false, |cur_rt_end| cur_rt_end <= last_recording_stop)
{
gst_log!(
CAT,
obj: pad,
"Passing buffer (stopping: {} <= {})",
current_running_time_end,
rec_state.last_recording_stop
current_running_time_end.display(),
last_recording_stop,
);
Ok(HandleResult::Pass(data))
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_stop
&& current_running_time_end > rec_state.last_recording_stop
&& current_running_time < last_recording_stop
&& current_running_time_end
.map_or(false, |cur_rt_end| cur_rt_end > last_recording_stop)
{
gst_log!(
CAT,
obj: pad,
"Passing buffer (stopping: {} < {} < {})",
current_running_time,
rec_state.last_recording_stop,
current_running_time_end,
last_recording_stop,
current_running_time_end.display(),
);
let mut clip_stop = state
@ -945,8 +980,8 @@ impl ToggleRecord {
CAT,
obj: pad,
"Dropping buffer (stopping: {} > {})",
current_running_time_end,
rec_state.last_recording_stop
current_running_time_end.display(),
rec_state.last_recording_stop.display(),
);
Ok(HandleResult::Drop)
}
@ -954,10 +989,10 @@ impl ToggleRecord {
RecordingState::Stopped => {
// The end of our buffer must be before/at the end of the previous buffer of the main
// stream
assert_le!(
current_running_time_end,
main_state.current_running_time_end
);
assert!(current_running_time_end
.zip(main_state.current_running_time_end)
.map_or(false, |(cur_rt_end, state_rt_end)| cur_rt_end
<= state_rt_end));
// We're properly stopped
gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
@ -965,38 +1000,44 @@ impl ToggleRecord {
}
RecordingState::Starting => {
// If we have no start position yet, the main stream is waiting for a key-frame
if rec_state.last_recording_start.is_none() {
gst_log!(
CAT,
obj: pad,
"Dropping buffer (starting: waiting for keyframe)",
);
return Ok(HandleResult::Drop);
}
let last_recording_start = match rec_state.last_recording_start {
Some(last_recording_start) => last_recording_start,
None => {
gst_log!(
CAT,
obj: pad,
"Dropping buffer (starting: waiting for keyframe)",
);
return Ok(HandleResult::Drop);
}
};
// The start of our buffer must be before the last recording start as
// otherwise we would be in Recording state already
assert_lt!(current_running_time, rec_state.last_recording_start);
if current_running_time >= rec_state.last_recording_start {
assert!(current_running_time.map_or(false, |cur_rt| cur_rt < last_recording_start));
let current_running_time = current_running_time.expect("checked_above");
if current_running_time >= last_recording_start {
gst_log!(
CAT,
obj: pad,
"Passing buffer (starting: {} >= {})",
current_running_time,
rec_state.last_recording_start
last_recording_start,
);
Ok(HandleResult::Pass(data))
} else if data.can_clip(&*state)
&& current_running_time < rec_state.last_recording_start
&& current_running_time_end > rec_state.last_recording_start
&& current_running_time < last_recording_start
&& current_running_time_end
.map_or(false, |cur_rt_end| cur_rt_end > last_recording_start)
{
gst_log!(
CAT,
obj: pad,
"Passing buffer (starting: {} < {} < {})",
current_running_time,
rec_state.last_recording_start,
current_running_time_end,
last_recording_start,
current_running_time_end.display(),
);
let mut clip_start = state
@ -1008,7 +1049,7 @@ impl ToggleRecord {
let mut segment = state.in_segment.clone();
segment.set_start(clip_start);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment);
if let Some(data) = data.clip(&*state, &segment) {
Ok(HandleResult::Pass(data))
@ -1022,7 +1063,7 @@ impl ToggleRecord {
obj: pad,
"Dropping buffer (starting: {} < {})",
current_running_time,
rec_state.last_recording_start
last_recording_start,
);
Ok(HandleResult::Drop)
}
@ -1151,10 +1192,9 @@ impl ToggleRecord {
// recording_duration
state.out_segment = state.in_segment.clone();
let offset = rec_state.running_time_offset.unwrap_or(0);
state
.out_segment
.offset_running_time(-(offset as i64))
.offset_running_time(-rec_state.running_time_offset)
.expect("Adjusting record duration");
events.push(
gst::event::Segment::builder(&state.out_segment)
@ -1187,8 +1227,8 @@ impl ToggleRecord {
CAT,
obj: pad,
"Pushing buffer with running time {}: {:?}",
out_running_time,
buffer
out_running_time.display(),
buffer,
);
stream.srcpad.push(buffer)
}
@ -1238,8 +1278,8 @@ impl ToggleRecord {
state.flushing = false;
state.segment_pending = true;
state.discont_pending = true;
state.current_running_time = gst::CLOCK_TIME_NONE;
state.current_running_time_end = gst::CLOCK_TIME_NONE;
state.current_running_time = None;
state.current_running_time_end = None;
}
EventView::Caps(c) => {
let mut state = stream.state.lock();
@ -1288,8 +1328,8 @@ impl ToggleRecord {
state.in_segment = segment;
state.segment_seqnum = event.seqnum();
state.segment_pending = true;
state.current_running_time = gst::CLOCK_TIME_NONE;
state.current_running_time_end = gst::CLOCK_TIME_NONE;
state.current_running_time = None;
state.current_running_time_end = None;
gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment);
@ -1305,14 +1345,19 @@ impl ToggleRecord {
};
forward = match handle_result {
Ok(HandleResult::Pass((new_pts, new_duration))) if new_pts.is_some() => {
if new_pts != pts || new_duration != duration {
event = gst::event::Gap::new(new_pts, new_duration);
Ok(HandleResult::Pass((new_pts, new_duration))) => {
if new_pts != pts
|| new_duration
.zip(duration)
.map_or(false, |(new_duration, duration)| new_duration != duration)
{
event = gst::event::Gap::builder(new_pts)
.duration(new_duration)
.build();
}
true
}
Ok(_) => false,
Err(_) => false,
_ => false,
};
}
EventView::Eos(..) => {
@ -1449,11 +1494,10 @@ impl ToggleRecord {
let forward = !matches!(event.view(), EventView::Seek(..));
let rec_state = self.state.lock();
let running_time_offset = rec_state.running_time_offset.unwrap_or(0) as i64;
let offset = event.running_time_offset();
event
.make_mut()
.set_running_time_offset(offset + running_time_offset);
.set_running_time_offset(offset + rec_state.running_time_offset);
drop(rec_state);
if forward {
@ -1530,20 +1574,26 @@ impl ToggleRecord {
if rec_state.recording_state == RecordingState::Recording
|| rec_state.recording_state == RecordingState::Stopping
{
gst_debug!(
CAT,
obj: pad,
"Returning position {} = {} - ({} + {})",
recording_duration
+ (state.current_running_time_end - rec_state.last_recording_start),
recording_duration,
state.current_running_time_end,
rec_state.last_recording_start
);
recording_duration +=
state.current_running_time_end - rec_state.last_recording_start;
if let Some(delta) = state
.current_running_time_end
.zip(rec_state.last_recording_start)
.and_then(|(cur_rt_end, last_rec_start)| {
cur_rt_end.checked_sub(last_rec_start)
})
{
gst_debug!(
CAT,
obj: pad,
"Returning position {} = {} - ({} + {})",
recording_duration + delta,
recording_duration,
state.current_running_time_end.display(),
rec_state.last_recording_start.display(),
);
recording_duration += delta;
}
} else {
gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration,);
gst_debug!(CAT, obj: pad, "Returning position {}", recording_duration);
}
q.set(recording_duration);
true
@ -1559,20 +1609,26 @@ impl ToggleRecord {
if rec_state.recording_state == RecordingState::Recording
|| rec_state.recording_state == RecordingState::Stopping
{
gst_debug!(
CAT,
obj: pad,
"Returning duration {} = {} - ({} + {})",
recording_duration
+ (state.current_running_time_end - rec_state.last_recording_start),
recording_duration,
state.current_running_time_end,
rec_state.last_recording_start
);
recording_duration +=
state.current_running_time_end - rec_state.last_recording_start;
if let Some(delta) = state
.current_running_time_end
.zip(rec_state.last_recording_start)
.and_then(|(cur_rt_end, last_rec_start)| {
cur_rt_end.checked_sub(last_rec_start)
})
{
gst_debug!(
CAT,
obj: pad,
"Returning duration {} = {} - ({} + {})",
recording_duration + delta,
recording_duration,
state.current_running_time_end.display(),
rec_state.last_recording_start.display(),
);
recording_duration += delta;
}
} else {
gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration,);
gst_debug!(CAT, obj: pad, "Returning duration {}", recording_duration);
}
q.set(recording_duration);
true

View file

@ -153,8 +153,8 @@ fn setup_sender_receiver(
let mut buffer = buffer.clone();
{
let buffer = buffer.make_mut();
buffer.set_pts(offset + i * 20 * gst::MSECOND);
buffer.set_duration(20 * gst::MSECOND);
buffer.set_pts(offset + i * 20 * gst::ClockTime::MSECOND);
buffer.set_duration(20 * gst::ClockTime::MSECOND);
}
let _ = sinkpad.chain(buffer);
i += 1;
@ -166,8 +166,11 @@ fn setup_sender_receiver(
buffer
.get_mut()
.unwrap()
.set_pts(offset + i * 20 * gst::MSECOND);
buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND);
.set_pts(offset + i * 20 * gst::ClockTime::MSECOND);
buffer
.get_mut()
.unwrap()
.set_duration(20 * gst::ClockTime::MSECOND);
buffer
.get_mut()
.unwrap()
@ -178,8 +181,10 @@ fn setup_sender_receiver(
}
SendData::Gaps(n) => {
for _ in 0..n {
let event =
gst::event::Gap::new(offset + i * 20 * gst::MSECOND, 20 * gst::MSECOND);
let event = gst::event::Gap::new(
offset + i * 20 * gst::ClockTime::MSECOND,
20 * gst::ClockTime::MSECOND,
);
let _ = sinkpad.send_event(event);
i += 1;
}
@ -204,7 +209,14 @@ fn recv_buffers(
receiver_output: &mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
segment: &mut gst::FormattedSegment<gst::ClockTime>,
wait_buffers: usize,
) -> (Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)>, bool) {
) -> (
Vec<(
Option<gst::ClockTime>,
Option<gst::ClockTime>,
Option<gst::ClockTime>,
)>,
bool,
) {
let mut res = Vec::new();
let mut n_buffers = 0;
let mut saw_eos = false;
@ -228,7 +240,7 @@ fn recv_buffers(
EventView::Gap(ref e) => {
let (ts, duration) = e.get();
res.push((segment.to_running_time(ts), ts, duration));
res.push((segment.to_running_time(ts), Some(ts), duration));
n_buffers += 1;
if wait_buffers > 0 && n_buffers == wait_buffers {
return (res, saw_eos);
@ -281,7 +293,7 @@ fn test_one_stream_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -294,9 +306,9 @@ fn test_one_stream_open() {
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
thread.join().unwrap();
@ -313,7 +325,7 @@ fn test_one_stream_gaps_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, _, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -327,9 +339,9 @@ fn test_one_stream_gaps_open() {
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
thread.join().unwrap();
@ -346,7 +358,7 @@ fn test_one_stream_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -361,9 +373,9 @@ fn test_one_stream_close_open() {
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), (10 + index) * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
thread.join().unwrap();
@ -380,7 +392,7 @@ fn test_one_stream_open_close() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -396,9 +408,9 @@ fn test_one_stream_open_close() {
assert_eq!(buffers.len(), 10);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
thread.join().unwrap();
@ -415,7 +427,7 @@ fn test_one_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input, receiver_input_done, receiver_output, thread) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -434,15 +446,15 @@ fn test_one_stream_open_close_open() {
assert_eq!(buffers.len(), 20);
for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
thread.join().unwrap();
@ -459,9 +471,9 @@ fn test_two_stream_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -480,9 +492,9 @@ fn test_two_stream_open() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
@ -491,9 +503,9 @@ fn test_two_stream_open() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -512,9 +524,14 @@ fn test_two_stream_open_shift() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND);
setup_sender_receiver(
&pipeline,
&togglerecord,
"src_%u",
5 * gst::ClockTime::MSECOND,
);
pipeline.set_state(gst::State::Playing).unwrap();
@ -533,9 +550,9 @@ fn test_two_stream_open_shift() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
@ -544,12 +561,18 @@ fn test_two_stream_open_shift() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(
running_time.unwrap(),
5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND
);
assert_eq!(
pts.unwrap(),
5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND
);
if index == 9 {
assert_eq!(duration, 15 * gst::MSECOND);
assert_eq!(duration.unwrap(), 15 * gst::ClockTime::MSECOND);
} else {
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
}
assert_eq!(buffers_2.len(), 10);
@ -569,9 +592,9 @@ fn test_two_stream_open_shift_main() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::MSECOND);
setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::ClockTime::MSECOND);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -591,9 +614,12 @@ fn test_two_stream_open_shift_main() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(
pts.unwrap(),
5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND
);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
@ -604,17 +630,26 @@ fn test_two_stream_open_shift_main() {
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
if index == 0 {
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
assert_eq!(duration, 15 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(
pts.unwrap(),
5 * gst::ClockTime::MSECOND + index * 20 * gst::ClockTime::MSECOND
);
assert_eq!(duration.unwrap(), 15 * gst::ClockTime::MSECOND);
} else if index == 10 {
assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 5 * gst::MSECOND);
assert_eq!(
running_time.unwrap(),
index * 20 * gst::ClockTime::MSECOND - 5 * gst::ClockTime::MSECOND
);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 5 * gst::ClockTime::MSECOND);
} else {
assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(
running_time.unwrap(),
index * 20 * gst::ClockTime::MSECOND - 5 * gst::ClockTime::MSECOND
);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
}
assert_eq!(buffers_2.len(), 11);
@ -634,9 +669,9 @@ fn test_two_stream_open_close() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -671,9 +706,9 @@ fn test_two_stream_open_close() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
@ -682,9 +717,9 @@ fn test_two_stream_open_close() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -703,9 +738,9 @@ fn test_two_stream_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -740,9 +775,9 @@ fn test_two_stream_close_open() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), (10 + index) * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
@ -751,9 +786,9 @@ fn test_two_stream_close_open() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), (10 + index) * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -772,9 +807,9 @@ fn test_two_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -822,15 +857,15 @@ fn test_two_stream_open_close_open() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
@ -839,15 +874,15 @@ fn test_two_stream_open_close_open() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
@ -866,9 +901,9 @@ fn test_two_stream_open_close_open_gaps() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -922,15 +957,15 @@ fn test_two_stream_open_close_open_gaps() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
@ -939,15 +974,15 @@ fn test_two_stream_open_close_open_gaps() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
@ -966,9 +1001,9 @@ fn test_two_stream_close_open_close_delta() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1023,9 +1058,9 @@ fn test_two_stream_close_open_close_delta() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), (11 + index) * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
@ -1034,9 +1069,9 @@ fn test_two_stream_close_open_close_delta() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), (11 + index) * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
@ -1055,11 +1090,11 @@ fn test_three_stream_open_close_open() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1118,15 +1153,15 @@ fn test_three_stream_open_close_open() {
let (buffers_1, _) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 20);
@ -1135,15 +1170,15 @@ fn test_three_stream_open_close_open() {
let (buffers_2, _) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 20);
@ -1151,15 +1186,15 @@ fn test_three_stream_open_close_open() {
let (buffers_3, _) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let pts_off = if index >= 10 {
10 * 20 * gst::MSECOND
10 * 20 * gst::ClockTime::MSECOND
} else {
0.into()
gst::ClockTime::ZERO
};
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), pts_off + index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_3.len(), 20);
@ -1179,9 +1214,9 @@ fn test_two_stream_main_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1228,9 +1263,9 @@ fn test_two_stream_main_eos() {
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
@ -1240,9 +1275,9 @@ fn test_two_stream_main_eos() {
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
assert_eq!(saw_eos, true);
@ -1262,9 +1297,9 @@ fn test_two_stream_secondary_eos_first() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1303,9 +1338,9 @@ fn test_two_stream_secondary_eos_first() {
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
@ -1316,9 +1351,9 @@ fn test_two_stream_secondary_eos_first() {
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(saw_eos, true);
@ -1338,11 +1373,11 @@ fn test_three_stream_main_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1405,9 +1440,9 @@ fn test_three_stream_main_eos() {
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
@ -1417,9 +1452,9 @@ fn test_three_stream_main_eos() {
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 10);
assert_eq!(saw_eos, true);
@ -1428,9 +1463,9 @@ fn test_three_stream_main_eos() {
let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_3.len(), 10);
assert_eq!(saw_eos, true);
@ -1451,11 +1486,11 @@ fn test_three_stream_main_and_second_eos() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1517,9 +1552,9 @@ fn test_three_stream_main_and_second_eos() {
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
@ -1529,9 +1564,9 @@ fn test_three_stream_main_and_second_eos() {
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(saw_eos, true);
@ -1541,9 +1576,9 @@ fn test_three_stream_main_and_second_eos() {
let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_3.len(), 10);
assert_eq!(saw_eos, true);
@ -1564,11 +1599,11 @@ fn test_three_stream_secondary_eos_first() {
pipeline.add(&togglerecord).unwrap();
let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src", gst::ClockTime::ZERO);
let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
setup_sender_receiver(&pipeline, &togglerecord, "src_%u", gst::ClockTime::ZERO);
pipeline.set_state(gst::State::Playing).unwrap();
@ -1612,9 +1647,9 @@ fn test_three_stream_secondary_eos_first() {
let (buffers_1, saw_eos) = recv_buffers(&receiver_output_1, &mut segment_1, 0);
for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_1.len(), 10);
assert_eq!(saw_eos, true);
@ -1624,9 +1659,9 @@ fn test_three_stream_secondary_eos_first() {
let (buffers_2, saw_eos) = recv_buffers(&receiver_output_2, &mut segment_2, 0);
for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_2.len(), 9);
assert_eq!(saw_eos, true);
@ -1635,9 +1670,9 @@ fn test_three_stream_secondary_eos_first() {
let (buffers_3, saw_eos) = recv_buffers(&receiver_output_3, &mut segment_3, 0);
for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
let index = index as u64;
assert_eq!(running_time, index * 20 * gst::MSECOND);
assert_eq!(pts, index * 20 * gst::MSECOND);
assert_eq!(duration, 20 * gst::MSECOND);
assert_eq!(running_time.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(pts.unwrap(), index * 20 * gst::ClockTime::MSECOND);
assert_eq!(duration.unwrap(), 20 * gst::ClockTime::MSECOND);
}
assert_eq!(buffers_3.len(), 9);
assert_eq!(saw_eos, true);