From b021a8bf100bbd7bbb35c28a7fc9fff78afad28f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Sun, 15 Nov 2020 14:08:54 +0200 Subject: [PATCH] utils: Update for subclassing API changes --- .../src/base/subclass/aggregator.rs | 291 ++++++---- .../src/base/subclass/aggregator_pad.rs | 43 +- .../src/fallbacksrc/custom_source/imp.rs | 410 ++++++++++++++ .../src/fallbacksrc/custom_source/mod.rs | 38 ++ .../{fallbacksrc.rs => fallbacksrc/imp.rs} | 531 ++---------------- utils/fallbackswitch/src/fallbacksrc/mod.rs | 60 ++ .../imp.rs} | 97 ++-- .../fallbackswitch/src/fallbackswitch/mod.rs | 41 ++ .../{togglerecord.rs => togglerecord/imp.rs} | 67 ++- utils/togglerecord/src/togglerecord/mod.rs | 38 ++ 10 files changed, 906 insertions(+), 710 deletions(-) create mode 100644 utils/fallbackswitch/src/fallbacksrc/custom_source/imp.rs create mode 100644 utils/fallbackswitch/src/fallbacksrc/custom_source/mod.rs rename utils/fallbackswitch/src/{fallbacksrc.rs => fallbacksrc/imp.rs} (83%) create mode 100644 utils/fallbackswitch/src/fallbacksrc/mod.rs rename utils/fallbackswitch/src/{fallbackswitch.rs => fallbackswitch/imp.rs} (90%) create mode 100644 utils/fallbackswitch/src/fallbackswitch/mod.rs rename utils/togglerecord/src/{togglerecord.rs => togglerecord/imp.rs} (97%) create mode 100644 utils/togglerecord/src/togglerecord/mod.rs diff --git a/utils/fallbackswitch/src/base/subclass/aggregator.rs b/utils/fallbackswitch/src/base/subclass/aggregator.rs index f56218d3a..73ed8315b 100644 --- a/utils/fallbackswitch/src/base/subclass/aggregator.rs +++ b/utils/fallbackswitch/src/base/subclass/aggregator.rs @@ -8,6 +8,7 @@ use super::super::gst_base_sys; +use glib::prelude::*; use glib::subclass::prelude::*; use glib::translate::*; @@ -19,13 +20,13 @@ use super::super::Aggregator; use super::super::AggregatorPad; pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { - fn flush(&self, aggregator: &Aggregator) -> Result { + fn flush(&self, aggregator: &Self::Type) -> Result { self.parent_flush(aggregator) } fn clip( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, buffer: gst::Buffer, ) -> Option { @@ -34,7 +35,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { fn finish_buffer( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, buffer: gst::Buffer, ) -> Result { self.parent_finish_buffer(aggregator, buffer) @@ -42,7 +43,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { fn sink_event( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, event: gst::Event, ) -> bool { @@ -51,7 +52,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { fn sink_event_pre_queue( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, event: gst::Event, ) -> Result { @@ -60,7 +61,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { fn sink_query( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, query: &mut gst::QueryRef, ) -> bool { @@ -69,24 +70,24 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { fn sink_query_pre_queue( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, query: &mut gst::QueryRef, ) -> bool { self.parent_sink_query_pre_queue(aggregator, aggregator_pad, query) } - fn src_event(&self, aggregator: &Aggregator, event: gst::Event) -> bool { + fn src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool { self.parent_src_event(aggregator, event) } - fn src_query(&self, aggregator: &Aggregator, query: &mut gst::QueryRef) -> bool { + fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool { self.parent_src_query(aggregator, query) } fn src_activate( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError> { @@ -95,27 +96,27 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { fn aggregate( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, timeout: bool, ) -> Result { self.parent_aggregate(aggregator, timeout) } - fn start(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> { + fn start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { self.parent_start(aggregator) } - fn stop(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> { + fn stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { self.parent_stop(aggregator) } - fn get_next_time(&self, aggregator: &Aggregator) -> gst::ClockTime { + fn get_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime { self.parent_get_next_time(aggregator) } fn create_new_pad( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, templ: &gst::PadTemplate, req_name: Option<&str>, caps: Option<&gst::Caps>, @@ -125,99 +126,99 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl { fn update_src_caps( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, caps: &gst::Caps, ) -> Result { self.parent_update_src_caps(aggregator, caps) } - fn fixate_src_caps(&self, aggregator: &Aggregator, caps: gst::Caps) -> gst::Caps { + fn fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps { self.parent_fixate_src_caps(aggregator, caps) } fn negotiated_src_caps( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, caps: &gst::Caps, ) -> Result<(), gst::LoggableError> { self.parent_negotiated_src_caps(aggregator, caps) } - fn negotiate(&self, aggregator: &Aggregator) -> bool { + fn negotiate(&self, aggregator: &Self::Type) -> bool { self.parent_negotiate(aggregator) } } -pub trait AggregatorImplExt { - fn parent_flush(&self, aggregator: &Aggregator) -> Result; +pub trait AggregatorImplExt: ObjectSubclass { + fn parent_flush(&self, aggregator: &Self::Type) -> Result; fn parent_clip( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, buffer: gst::Buffer, ) -> Option; fn parent_finish_buffer( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, buffer: gst::Buffer, ) -> Result; fn parent_sink_event( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, event: gst::Event, ) -> bool; fn parent_sink_event_pre_queue( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, event: gst::Event, ) -> Result; fn parent_sink_query( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, query: &mut gst::QueryRef, ) -> bool; fn parent_sink_query_pre_queue( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, query: &mut gst::QueryRef, ) -> bool; - fn parent_src_event(&self, aggregator: &Aggregator, event: gst::Event) -> bool; + fn parent_src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool; - fn parent_src_query(&self, aggregator: &Aggregator, query: &mut gst::QueryRef) -> bool; + fn parent_src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool; fn parent_src_activate( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError>; fn parent_aggregate( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, timeout: bool, ) -> Result; - fn parent_start(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage>; + fn parent_start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage>; - fn parent_stop(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage>; + fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage>; - fn parent_get_next_time(&self, aggregator: &Aggregator) -> gst::ClockTime; + fn parent_get_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime; fn parent_create_new_pad( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, templ: &gst::PadTemplate, req_name: Option<&str>, caps: Option<&gst::Caps>, @@ -225,30 +226,35 @@ pub trait AggregatorImplExt { fn parent_update_src_caps( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, caps: &gst::Caps, ) -> Result; - fn parent_fixate_src_caps(&self, aggregator: &Aggregator, caps: gst::Caps) -> gst::Caps; + fn parent_fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps; fn parent_negotiated_src_caps( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, caps: &gst::Caps, ) -> Result<(), gst::LoggableError>; - fn parent_negotiate(&self, aggregator: &Aggregator) -> bool; + fn parent_negotiate(&self, aggregator: &Self::Type) -> bool; } impl AggregatorImplExt for T { - fn parent_flush(&self, aggregator: &Aggregator) -> Result { + fn parent_flush(&self, aggregator: &Self::Type) -> Result { unsafe { let data = T::type_data(); let parent_class = data.as_ref().get_parent_class() as *mut gst_base_sys::GstAggregatorClass; (*parent_class) .flush - .map(|f| from_glib(f(aggregator.to_glib_none().0))) + .map(|f| { + from_glib(f(aggregator + .unsafe_cast_ref::() + .to_glib_none() + .0)) + }) .unwrap_or(gst::FlowReturn::Ok) .into_result() } @@ -256,7 +262,7 @@ impl AggregatorImplExt for T { fn parent_clip( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, buffer: gst::Buffer, ) -> Option { @@ -267,7 +273,7 @@ impl AggregatorImplExt for T { match (*parent_class).clip { None => Some(buffer), Some(ref func) => from_glib_full(func( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, aggregator_pad.to_glib_none().0, buffer.into_ptr(), )), @@ -277,7 +283,7 @@ impl AggregatorImplExt for T { fn parent_finish_buffer( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, buffer: gst::Buffer, ) -> Result { unsafe { @@ -287,14 +293,17 @@ impl AggregatorImplExt for T { let f = (*parent_class) .finish_buffer .expect("Missing parent function `finish_buffer`"); - gst::FlowReturn::from_glib(f(aggregator.to_glib_none().0, buffer.into_ptr())) - .into_result() + gst::FlowReturn::from_glib(f( + aggregator.unsafe_cast_ref::().to_glib_none().0, + buffer.into_ptr(), + )) + .into_result() } } fn parent_sink_event( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, event: gst::Event, ) -> bool { @@ -306,7 +315,7 @@ impl AggregatorImplExt for T { .sink_event .expect("Missing parent function `sink_event`"); from_glib(f( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, aggregator_pad.to_glib_none().0, event.into_ptr(), )) @@ -315,7 +324,7 @@ impl AggregatorImplExt for T { fn parent_sink_event_pre_queue( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, event: gst::Event, ) -> Result { @@ -327,7 +336,7 @@ impl AggregatorImplExt for T { .sink_event_pre_queue .expect("Missing parent function `sink_event_pre_queue`"); gst::FlowReturn::from_glib(f( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, aggregator_pad.to_glib_none().0, event.into_ptr(), )) @@ -337,7 +346,7 @@ impl AggregatorImplExt for T { fn parent_sink_query( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, query: &mut gst::QueryRef, ) -> bool { @@ -349,7 +358,7 @@ impl AggregatorImplExt for T { .sink_query .expect("Missing parent function `sink_query`"); from_glib(f( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, aggregator_pad.to_glib_none().0, query.as_mut_ptr(), )) @@ -358,7 +367,7 @@ impl AggregatorImplExt for T { fn parent_sink_query_pre_queue( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, aggregator_pad: &AggregatorPad, query: &mut gst::QueryRef, ) -> bool { @@ -370,14 +379,14 @@ impl AggregatorImplExt for T { .sink_query_pre_queue .expect("Missing parent function `sink_query`"); from_glib(f( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, aggregator_pad.to_glib_none().0, query.as_mut_ptr(), )) } } - fn parent_src_event(&self, aggregator: &Aggregator, event: gst::Event) -> bool { + fn parent_src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool { unsafe { let data = T::type_data(); let parent_class = @@ -385,11 +394,14 @@ impl AggregatorImplExt for T { let f = (*parent_class) .src_event .expect("Missing parent function `src_event`"); - from_glib(f(aggregator.to_glib_none().0, event.into_ptr())) + from_glib(f( + aggregator.unsafe_cast_ref::().to_glib_none().0, + event.into_ptr(), + )) } } - fn parent_src_query(&self, aggregator: &Aggregator, query: &mut gst::QueryRef) -> bool { + fn parent_src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool { unsafe { let data = T::type_data(); let parent_class = @@ -397,13 +409,16 @@ impl AggregatorImplExt for T { let f = (*parent_class) .src_query .expect("Missing parent function `src_query`"); - from_glib(f(aggregator.to_glib_none().0, query.as_mut_ptr())) + from_glib(f( + aggregator.unsafe_cast_ref::().to_glib_none().0, + query.as_mut_ptr(), + )) } } fn parent_src_activate( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError> { @@ -415,7 +430,7 @@ impl AggregatorImplExt for T { None => Ok(()), Some(f) => gst_result_from_gboolean!( f( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, mode.to_glib(), active.to_glib() ), @@ -428,7 +443,7 @@ impl AggregatorImplExt for T { fn parent_aggregate( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, timeout: bool, ) -> Result { unsafe { @@ -438,12 +453,15 @@ impl AggregatorImplExt for T { let f = (*parent_class) .aggregate .expect("Missing parent function `aggregate`"); - gst::FlowReturn::from_glib(f(aggregator.to_glib_none().0, timeout.to_glib())) - .into_result() + gst::FlowReturn::from_glib(f( + aggregator.unsafe_cast_ref::().to_glib_none().0, + timeout.to_glib(), + )) + .into_result() } } - fn parent_start(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> { + fn parent_start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { unsafe { let data = T::type_data(); let parent_class = @@ -451,7 +469,11 @@ impl AggregatorImplExt for T { (*parent_class) .start .map(|f| { - if from_glib(f(aggregator.to_glib_none().0)) { + if from_glib(f(aggregator + .unsafe_cast_ref::() + .to_glib_none() + .0)) + { Ok(()) } else { Err(gst_error_msg!( @@ -464,7 +486,7 @@ impl AggregatorImplExt for T { } } - fn parent_stop(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> { + fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> { unsafe { let data = T::type_data(); let parent_class = @@ -472,7 +494,11 @@ impl AggregatorImplExt for T { (*parent_class) .stop .map(|f| { - if from_glib(f(aggregator.to_glib_none().0)) { + if from_glib(f(aggregator + .unsafe_cast_ref::() + .to_glib_none() + .0)) + { Ok(()) } else { Err(gst_error_msg!( @@ -485,21 +511,26 @@ impl AggregatorImplExt for T { } } - fn parent_get_next_time(&self, aggregator: &Aggregator) -> gst::ClockTime { + fn parent_get_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime { unsafe { let data = T::type_data(); let parent_class = data.as_ref().get_parent_class() as *mut gst_base_sys::GstAggregatorClass; (*parent_class) .get_next_time - .map(|f| from_glib(f(aggregator.to_glib_none().0))) + .map(|f| { + from_glib(f(aggregator + .unsafe_cast_ref::() + .to_glib_none() + .0)) + }) .unwrap_or(gst::CLOCK_TIME_NONE) } } fn parent_create_new_pad( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, templ: &gst::PadTemplate, req_name: Option<&str>, caps: Option<&gst::Caps>, @@ -512,7 +543,7 @@ impl AggregatorImplExt for T { .create_new_pad .expect("Missing parent function `create_new_pad`"); from_glib_full(f( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, templ.to_glib_none().0, req_name.to_glib_none().0, caps.to_glib_none().0, @@ -522,7 +553,7 @@ impl AggregatorImplExt for T { fn parent_update_src_caps( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, caps: &gst::Caps, ) -> Result { unsafe { @@ -535,7 +566,7 @@ impl AggregatorImplExt for T { let mut out_caps = ptr::null_mut(); gst::FlowReturn::from_glib(f( - aggregator.to_glib_none().0, + aggregator.unsafe_cast_ref::().to_glib_none().0, caps.as_mut_ptr(), &mut out_caps, )) @@ -543,7 +574,7 @@ impl AggregatorImplExt for T { } } - fn parent_fixate_src_caps(&self, aggregator: &Aggregator, caps: gst::Caps) -> gst::Caps { + fn parent_fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps { unsafe { let data = T::type_data(); let parent_class = @@ -552,13 +583,16 @@ impl AggregatorImplExt for T { let f = (*parent_class) .fixate_src_caps .expect("Missing parent function `fixate_src_caps`"); - from_glib_full(f(aggregator.to_glib_none().0, caps.into_ptr())) + from_glib_full(f( + aggregator.unsafe_cast_ref::().to_glib_none().0, + caps.into_ptr(), + )) } } fn parent_negotiated_src_caps( &self, - aggregator: &Aggregator, + aggregator: &Self::Type, caps: &gst::Caps, ) -> Result<(), gst::LoggableError> { unsafe { @@ -569,7 +603,10 @@ impl AggregatorImplExt for T { .negotiated_src_caps .map(|f| { gst_result_from_gboolean!( - f(aggregator.to_glib_none().0, caps.to_glib_none().0), + f( + aggregator.unsafe_cast_ref::().to_glib_none().0, + caps.to_glib_none().0 + ), gst::CAT_RUST, "Parent function `negotiated_src_caps` failed" ) @@ -578,14 +615,19 @@ impl AggregatorImplExt for T { } } - fn parent_negotiate(&self, aggregator: &Aggregator) -> bool { + fn parent_negotiate(&self, aggregator: &Self::Type) -> bool { unsafe { let data = T::type_data(); let parent_class = data.as_ref().get_parent_class() as *mut gst_base_sys::GstAggregatorClass; (*parent_class) .negotiate - .map(|f| from_glib(f(aggregator.to_glib_none().0))) + .map(|f| { + from_glib(f(aggregator + .unsafe_cast_ref::() + .to_glib_none() + .0)) + }) .unwrap_or(true) } } @@ -595,31 +637,29 @@ unsafe impl IsSubclassable for Aggregator where ::Instance: PanicPoison, { - fn override_vfuncs(klass: &mut glib::object::Class) { + fn override_vfuncs(klass: &mut glib::Class) { >::override_vfuncs(klass); - unsafe { - let klass = &mut *(klass.as_mut() as *mut gst_base_sys::GstAggregatorClass); - klass.flush = Some(aggregator_flush::); - klass.clip = Some(aggregator_clip::); - klass.finish_buffer = Some(aggregator_finish_buffer::); - klass.sink_event = Some(aggregator_sink_event::); - klass.sink_query = Some(aggregator_sink_query::); - klass.src_event = Some(aggregator_src_event::); - klass.src_query = Some(aggregator_src_query::); - klass.src_activate = Some(aggregator_src_activate::); - klass.aggregate = Some(aggregator_aggregate::); - klass.start = Some(aggregator_start::); - klass.stop = Some(aggregator_stop::); - klass.get_next_time = Some(aggregator_get_next_time::); - klass.create_new_pad = Some(aggregator_create_new_pad::); - klass.update_src_caps = Some(aggregator_update_src_caps::); - klass.fixate_src_caps = Some(aggregator_fixate_src_caps::); - klass.negotiated_src_caps = Some(aggregator_negotiated_src_caps::); - { - klass.sink_event_pre_queue = Some(aggregator_sink_event_pre_queue::); - klass.sink_query_pre_queue = Some(aggregator_sink_query_pre_queue::); - klass.negotiate = Some(aggregator_negotiate::); - } + let klass = klass.as_mut(); + klass.flush = Some(aggregator_flush::); + klass.clip = Some(aggregator_clip::); + klass.finish_buffer = Some(aggregator_finish_buffer::); + klass.sink_event = Some(aggregator_sink_event::); + klass.sink_query = Some(aggregator_sink_query::); + klass.src_event = Some(aggregator_src_event::); + klass.src_query = Some(aggregator_src_query::); + klass.src_activate = Some(aggregator_src_activate::); + klass.aggregate = Some(aggregator_aggregate::); + klass.start = Some(aggregator_start::); + klass.stop = Some(aggregator_stop::); + klass.get_next_time = Some(aggregator_get_next_time::); + klass.create_new_pad = Some(aggregator_create_new_pad::); + klass.update_src_caps = Some(aggregator_update_src_caps::); + klass.fixate_src_caps = Some(aggregator_fixate_src_caps::); + klass.negotiated_src_caps = Some(aggregator_negotiated_src_caps::); + { + klass.sink_event_pre_queue = Some(aggregator_sink_event_pre_queue::); + klass.sink_query_pre_queue = Some(aggregator_sink_query_pre_queue::); + klass.negotiate = Some(aggregator_negotiate::); } } } @@ -635,7 +675,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, { - imp.flush(&wrap).into() + imp.flush(wrap.unsafe_cast_ref()).into() }) .to_glib() } @@ -654,7 +694,7 @@ where let ret = gst_panic_to_error!(&wrap, &instance.panicked(), None, { imp.clip( - &wrap, + wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator_pad), from_glib_full(buffer), ) @@ -675,7 +715,8 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, { - imp.finish_buffer(&wrap, from_glib_full(buffer)).into() + imp.finish_buffer(wrap.unsafe_cast_ref(), from_glib_full(buffer)) + .into() }) .to_glib() } @@ -692,9 +733,9 @@ where let imp = instance.get_impl(); let wrap: Borrowed = from_glib_borrow(ptr); - gst_panic_to_error!(&wrap, &instance.panicked(), false, { + gst_panic_to_error!(wrap, &instance.panicked(), false, { imp.sink_event( - &wrap, + wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator_pad), from_glib_full(event), ) @@ -716,7 +757,7 @@ where gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, { imp.sink_event_pre_queue( - &wrap, + wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator_pad), from_glib_full(event), ) @@ -739,7 +780,7 @@ where gst_panic_to_error!(&wrap, &instance.panicked(), false, { imp.sink_query( - &wrap, + wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator_pad), gst::QueryRef::from_mut_ptr(query), ) @@ -761,7 +802,7 @@ where gst_panic_to_error!(&wrap, &instance.panicked(), false, { imp.sink_query_pre_queue( - &wrap, + wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator_pad), gst::QueryRef::from_mut_ptr(query), ) @@ -781,7 +822,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), false, { - imp.src_event(&wrap, from_glib_full(event)) + imp.src_event(wrap.unsafe_cast_ref(), from_glib_full(event)) }) .to_glib() } @@ -798,7 +839,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), false, { - imp.src_query(&wrap, gst::QueryRef::from_mut_ptr(query)) + imp.src_query(wrap.unsafe_cast_ref(), gst::QueryRef::from_mut_ptr(query)) }) .to_glib() } @@ -816,7 +857,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), false, { - match imp.src_activate(&wrap, from_glib(mode), from_glib(active)) { + match imp.src_activate(wrap.unsafe_cast_ref(), from_glib(mode), from_glib(active)) { Ok(()) => true, Err(err) => { err.log_with_object(&*wrap); @@ -839,7 +880,8 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, { - imp.aggregate(&wrap, from_glib(timeout)).into() + imp.aggregate(wrap.unsafe_cast_ref(), from_glib(timeout)) + .into() }) .to_glib() } @@ -855,7 +897,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), false, { - match imp.start(&wrap) { + match imp.start(wrap.unsafe_cast_ref()) { Ok(()) => true, Err(err) => { wrap.post_error_message(err); @@ -877,7 +919,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), false, { - match imp.stop(&wrap) { + match imp.stop(wrap.unsafe_cast_ref()) { Ok(()) => true, Err(err) => { wrap.post_error_message(err); @@ -899,7 +941,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), gst::CLOCK_TIME_NONE, { - imp.get_next_time(&wrap) + imp.get_next_time(wrap.unsafe_cast_ref()) }) .to_glib() } @@ -921,7 +963,7 @@ where let req_name: Borrowed> = from_glib_borrow(req_name); imp.create_new_pad( - &wrap, + wrap.unsafe_cast_ref(), &from_glib_borrow(templ), req_name.as_ref().as_ref().map(|s| s.as_str()), Option::::from_glib_borrow(caps) @@ -947,7 +989,7 @@ where *res = ptr::null_mut(); gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, { - match imp.update_src_caps(&wrap, &from_glib_borrow(caps)) { + match imp.update_src_caps(wrap.unsafe_cast_ref(), &from_glib_borrow(caps)) { Ok(res_caps) => { *res = res_caps.into_ptr(); gst::FlowReturn::Ok @@ -970,7 +1012,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), gst::Caps::new_empty(), { - imp.fixate_src_caps(&wrap, from_glib_full(caps)) + imp.fixate_src_caps(wrap.unsafe_cast_ref(), from_glib_full(caps)) }) .into_ptr() } @@ -987,7 +1029,7 @@ where let wrap: Borrowed = from_glib_borrow(ptr); gst_panic_to_error!(&wrap, &instance.panicked(), false, { - match imp.negotiated_src_caps(&wrap, &from_glib_borrow(caps)) { + match imp.negotiated_src_caps(wrap.unsafe_cast_ref(), &from_glib_borrow(caps)) { Ok(()) => true, Err(err) => { err.log_with_object(&*wrap); @@ -1008,5 +1050,8 @@ where let imp = instance.get_impl(); let wrap: Borrowed = from_glib_borrow(ptr); - gst_panic_to_error!(&wrap, &instance.panicked(), false, { imp.negotiate(&wrap) }).to_glib() + gst_panic_to_error!(&wrap, &instance.panicked(), false, { + imp.negotiate(wrap.unsafe_cast_ref()) + }) + .to_glib() } diff --git a/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs b/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs index a3be4f941..45c09b452 100644 --- a/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs +++ b/utils/fallbackswitch/src/base/subclass/aggregator_pad.rs @@ -8,9 +8,10 @@ use super::super::gst_base_sys; +use glib::prelude::*; +use glib::subclass::prelude::*; use glib::translate::*; -use glib::subclass::prelude::*; use gst::subclass::prelude::*; use super::super::Aggregator; @@ -19,7 +20,7 @@ use super::super::AggregatorPad; pub trait AggregatorPadImpl: AggregatorPadImplExt + PadImpl { fn flush( &self, - aggregator_pad: &AggregatorPad, + aggregator_pad: &Self::Type, aggregator: &Aggregator, ) -> Result { self.parent_flush(aggregator_pad, aggregator) @@ -27,7 +28,7 @@ pub trait AggregatorPadImpl: AggregatorPadImplExt + PadImpl { fn skip_buffer( &self, - aggregator_pad: &AggregatorPad, + aggregator_pad: &Self::Type, aggregator: &Aggregator, buffer: &gst::Buffer, ) -> bool { @@ -35,16 +36,16 @@ pub trait AggregatorPadImpl: AggregatorPadImplExt + PadImpl { } } -pub trait AggregatorPadImplExt { +pub trait AggregatorPadImplExt: ObjectSubclass { fn parent_flush( &self, - aggregator_pad: &AggregatorPad, + aggregator_pad: &Self::Type, aggregator: &Aggregator, ) -> Result; fn parent_skip_buffer( &self, - aggregator_pad: &AggregatorPad, + aggregator_pad: &Self::Type, aggregator: &Aggregator, buffer: &gst::Buffer, ) -> bool; @@ -53,7 +54,7 @@ pub trait AggregatorPadImplExt { impl AggregatorPadImplExt for T { fn parent_flush( &self, - aggregator_pad: &AggregatorPad, + aggregator_pad: &Self::Type, aggregator: &Aggregator, ) -> Result { unsafe { @@ -64,7 +65,10 @@ impl AggregatorPadImplExt for T { .flush .map(|f| { from_glib(f( - aggregator_pad.to_glib_none().0, + aggregator_pad + .unsafe_cast_ref::() + .to_glib_none() + .0, aggregator.to_glib_none().0, )) }) @@ -75,7 +79,7 @@ impl AggregatorPadImplExt for T { fn parent_skip_buffer( &self, - aggregator_pad: &AggregatorPad, + aggregator_pad: &Self::Type, aggregator: &Aggregator, buffer: &gst::Buffer, ) -> bool { @@ -87,7 +91,10 @@ impl AggregatorPadImplExt for T { .skip_buffer .map(|f| { from_glib(f( - aggregator_pad.to_glib_none().0, + aggregator_pad + .unsafe_cast_ref::() + .to_glib_none() + .0, aggregator.to_glib_none().0, buffer.to_glib_none().0, )) @@ -97,13 +104,11 @@ impl AggregatorPadImplExt for T { } } unsafe impl IsSubclassable for AggregatorPad { - fn override_vfuncs(klass: &mut glib::object::Class) { + fn override_vfuncs(klass: &mut glib::Class) { >::override_vfuncs(klass); - unsafe { - let klass = &mut *(klass.as_mut() as *mut gst_base_sys::GstAggregatorPadClass); - klass.flush = Some(aggregator_pad_flush::); - klass.skip_buffer = Some(aggregator_pad_skip_buffer::); - } + let klass = klass.as_mut(); + klass.flush = Some(aggregator_pad_flush::); + klass.skip_buffer = Some(aggregator_pad_skip_buffer::); } } @@ -115,7 +120,9 @@ unsafe extern "C" fn aggregator_pad_flush( let imp = instance.get_impl(); let wrap: Borrowed = from_glib_borrow(ptr); - let res: gst::FlowReturn = imp.flush(&wrap, &from_glib_borrow(aggregator)).into(); + let res: gst::FlowReturn = imp + .flush(wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator)) + .into(); res.to_glib() } @@ -129,7 +136,7 @@ unsafe extern "C" fn aggregator_pad_skip_buffer( let wrap: Borrowed = from_glib_borrow(ptr); imp.skip_buffer( - &wrap, + wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator), &from_glib_borrow(buffer), ) diff --git a/utils/fallbackswitch/src/fallbacksrc/custom_source/imp.rs b/utils/fallbackswitch/src/fallbacksrc/custom_source/imp.rs new file mode 100644 index 000000000..faf63440e --- /dev/null +++ b/utils/fallbackswitch/src/fallbacksrc/custom_source/imp.rs @@ -0,0 +1,410 @@ +// Copyright (C) 2020 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use std::{mem, sync::Mutex}; + +use once_cell::sync::Lazy; +use once_cell::sync::OnceCell; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "fallbacksrc-custom-source", + gst::DebugColorFlags::empty(), + Some("Fallback Custom Source Bin"), + ) +}); + +static PROPERTIES: [subclass::Property; 1] = [subclass::Property("source", |name| { + glib::ParamSpec::object( + name, + "Source", + "Source", + gst::Element::static_type(), + glib::ParamFlags::WRITABLE | glib::ParamFlags::CONSTRUCT_ONLY, + ) +})]; + +struct Stream { + source_pad: gst::Pad, + ghost_pad: gst::GhostPad, + // Dummy stream we created + stream: gst::Stream, +} + +struct State { + pads: Vec, + num_audio: usize, + num_video: usize, +} + +pub struct CustomSource { + source: OnceCell, + state: Mutex, +} + +impl ObjectSubclass for CustomSource { + const NAME: &'static str = "FallbackSrcCustomSource"; + type Type = super::CustomSource; + type ParentType = gst::Bin; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn new() -> Self { + Self { + source: OnceCell::default(), + state: Mutex::new(State { + pads: vec![], + num_audio: 0, + num_video: 0, + }), + } + } + + fn class_init(klass: &mut Self::Class) { + let src_pad_template = gst::PadTemplate::new( + "audio_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + + let src_pad_template = gst::PadTemplate::new( + "video_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + klass.install_properties(&PROPERTIES); + } +} + +impl ObjectImpl for CustomSource { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("source", ..) => { + let source = value.get::().unwrap().unwrap(); + self.source.set(source.clone()).unwrap(); + obj.add(&source).unwrap(); + } + _ => unreachable!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK); + obj.set_element_flags(gst::ElementFlags::SOURCE); + obj.set_bin_flags(gst::BinFlags::STREAMS_AWARE); + } +} + +impl ElementImpl for CustomSource { + #[allow(clippy::single_match)] + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + match transition { + gst::StateChange::NullToReady => { + self.start(element)?; + } + _ => (), + } + + let res = self.parent_change_state(element, transition)?; + + match transition { + gst::StateChange::ReadyToNull => { + self.stop(element)?; + } + _ => (), + } + + Ok(res) + } +} + +impl BinImpl for CustomSource { + #[allow(clippy::single_match)] + fn handle_message(&self, bin: &Self::Type, msg: gst::Message) { + use gst::MessageView; + + match msg.view() { + MessageView::StreamCollection(_) => { + // TODO: Drop stream collection message for now, we only create a simple custom + // one here so that fallbacksrc can know about our streams. It is never + // forwarded. + if let Err(msg) = self.handle_source_no_more_pads(&bin) { + bin.post_error_message(msg); + } + } + _ => self.parent_handle_message(bin, msg), + } + } +} + +impl CustomSource { + fn start( + &self, + element: &super::CustomSource, + ) -> Result { + gst_debug!(CAT, obj: element, "Starting"); + let source = self.source.get().unwrap(); + + let templates = source.get_pad_template_list(); + + if templates + .iter() + .any(|templ| templ.get_property_presence() == gst::PadPresence::Request) + { + gst_error!(CAT, obj: element, "Request pads not supported"); + gst_element_error!( + element, + gst::LibraryError::Settings, + ["Request pads not supported"] + ); + return Err(gst::StateChangeError); + } + + let has_sometimes_pads = templates + .iter() + .any(|templ| templ.get_property_presence() == gst::PadPresence::Sometimes); + + // Handle all source pads that already exist + for pad in source.get_src_pads() { + if let Err(msg) = self.handle_source_pad_added(&element, &pad) { + element.post_error_message(msg); + return Err(gst::StateChangeError); + } + } + + if !has_sometimes_pads { + if let Err(msg) = self.handle_source_no_more_pads(&element) { + element.post_error_message(msg); + return Err(gst::StateChangeError); + } + } else { + gst_debug!(CAT, obj: element, "Found sometimes pads"); + + let element_weak = element.downgrade(); + source.connect_pad_added(move |_, pad| { + let element = match element_weak.upgrade() { + None => return, + Some(element) => element, + }; + let src = CustomSource::from_instance(&element); + + if let Err(msg) = src.handle_source_pad_added(&element, pad) { + element.post_error_message(msg); + } + }); + let element_weak = element.downgrade(); + source.connect_pad_removed(move |_, pad| { + let element = match element_weak.upgrade() { + None => return, + Some(element) => element, + }; + let src = CustomSource::from_instance(&element); + + if let Err(msg) = src.handle_source_pad_removed(&element, pad) { + element.post_error_message(msg); + } + }); + + let element_weak = element.downgrade(); + source.connect_no_more_pads(move |_| { + let element = match element_weak.upgrade() { + None => return, + Some(element) => element, + }; + let src = CustomSource::from_instance(&element); + + if let Err(msg) = src.handle_source_no_more_pads(&element) { + element.post_error_message(msg); + } + }); + } + + Ok(gst::StateChangeSuccess::Success) + } + + fn handle_source_pad_added( + &self, + element: &super::CustomSource, + pad: &gst::Pad, + ) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Source added pad {}", pad.get_name()); + + let mut state = self.state.lock().unwrap(); + + let mut stream_type = None; + + // Take stream type from stream-start event if we can + if let Some(event) = pad.get_sticky_event(gst::EventType::StreamStart, 0) { + if let gst::EventView::StreamStart(ev) = event.view() { + stream_type = ev.get_stream().map(|s| s.get_stream_type()); + } + } + + // Otherwise from the caps + if stream_type.is_none() { + let caps = match pad.get_current_caps().or_else(|| pad.query_caps(None)) { + Some(caps) if !caps.is_any() && !caps.is_empty() => caps, + _ => { + gst_error!(CAT, obj: element, "Pad {} had no caps", pad.get_name()); + return Err(gst_error_msg!( + gst::CoreError::Negotiation, + ["Pad had no caps"] + )); + } + }; + + let s = caps.get_structure(0).unwrap(); + + if s.get_name().starts_with("audio/") { + stream_type = Some(gst::StreamType::AUDIO); + } else if s.get_name().starts_with("video/") { + stream_type = Some(gst::StreamType::VIDEO); + } else { + return Ok(()); + } + } + + let stream_type = stream_type.unwrap(); + + let (templ, name) = if stream_type.contains(gst::StreamType::AUDIO) { + let name = format!("audio_{}", state.num_audio); + state.num_audio += 1; + (element.get_pad_template("audio_%u").unwrap(), name) + } else { + let name = format!("video_{}", state.num_video); + state.num_video += 1; + (element.get_pad_template("video_%u").unwrap(), name) + }; + + let ghost_pad = gst::GhostPad::builder_with_template(&templ, Some(&name)) + .build_with_target(pad) + .unwrap(); + + let stream = Stream { + source_pad: pad.clone(), + ghost_pad: ghost_pad.clone().upcast(), + // TODO: We only add the stream type right now + stream: gst::Stream::new(None, None, stream_type, gst::StreamFlags::empty()), + }; + state.pads.push(stream); + drop(state); + + ghost_pad.set_active(true).unwrap(); + element.add_pad(&ghost_pad).unwrap(); + + Ok(()) + } + + fn handle_source_pad_removed( + &self, + element: &super::CustomSource, + pad: &gst::Pad, + ) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Source removed pad {}", pad.get_name()); + + let mut state = self.state.lock().unwrap(); + let (i, stream) = match state + .pads + .iter() + .enumerate() + .find(|(_i, p)| &p.source_pad == pad) + { + None => return Ok(()), + Some(v) => v, + }; + + let ghost_pad = stream.ghost_pad.clone(); + state.pads.remove(i); + drop(state); + + ghost_pad.set_active(false).unwrap(); + let _ = ghost_pad.set_target(None::<&gst::Pad>); + let _ = element.remove_pad(&ghost_pad); + + Ok(()) + } + + fn handle_source_no_more_pads( + &self, + element: &super::CustomSource, + ) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Source signalled no-more-pads"); + + let state = self.state.lock().unwrap(); + let streams = state + .pads + .iter() + .map(|p| p.stream.clone()) + .collect::>(); + let collection = gst::StreamCollection::builder(None) + .streams(&streams) + .build(); + drop(state); + + element.no_more_pads(); + + let _ = element.post_message( + gst::message::StreamsSelected::builder(&collection) + .src(element) + .build(), + ); + + Ok(()) + } + + fn stop( + &self, + element: &super::CustomSource, + ) -> Result { + gst_debug!(CAT, obj: element, "Stopping"); + + let mut state = self.state.lock().unwrap(); + let pads = mem::replace(&mut state.pads, vec![]); + state.num_audio = 0; + state.num_video = 0; + drop(state); + + for pad in pads { + let _ = pad.ghost_pad.set_target(None::<&gst::Pad>); + let _ = element.remove_pad(&pad.ghost_pad); + } + + Ok(gst::StateChangeSuccess::Success) + } +} diff --git a/utils/fallbackswitch/src/fallbacksrc/custom_source/mod.rs b/utils/fallbackswitch/src/fallbacksrc/custom_source/mod.rs new file mode 100644 index 000000000..9cd53f4b9 --- /dev/null +++ b/utils/fallbackswitch/src/fallbacksrc/custom_source/mod.rs @@ -0,0 +1,38 @@ +// Copyright (C) 2020 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +mod imp; + +glib_wrapper! { + pub struct CustomSource(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; +} + +// GStreamer elements need to be thread-safe. For the private implementation this is automatically +// enforced but for the public wrapper type we need to specify this manually. +unsafe impl Send for CustomSource {} +unsafe impl Sync for CustomSource {} + +impl CustomSource { + pub fn new(source: &gst::Element) -> CustomSource { + glib::Object::new(CustomSource::static_type(), &[("source", source)]) + .unwrap() + .downcast() + .unwrap() + } +} diff --git a/utils/fallbackswitch/src/fallbacksrc.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs similarity index 83% rename from utils/fallbackswitch/src/fallbacksrc.rs rename to utils/fallbackswitch/src/fallbacksrc/imp.rs index 05303b161..6a1a61f46 100644 --- a/utils/fallbackswitch/src/fallbacksrc.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -27,6 +27,9 @@ use std::time::{Duration, Instant}; use once_cell::sync::Lazy; +use super::custom_source::CustomSource; +use super::{RetryReason, Status}; + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "fallbacksrc", @@ -35,17 +38,6 @@ static CAT: Lazy = Lazy::new(|| { ) }); -#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)] -#[repr(u32)] -#[genum(type_name = "GstFallbackSourceRetryReason")] -enum RetryReason { - None, - Error, - Eos, - StateChangeFailure, - Timeout, -} - #[derive(Debug, Clone)] struct Stats { num_retry: u64, @@ -178,21 +170,11 @@ struct State { stats: Stats, } -struct FallbackSrc { +pub struct FallbackSrc { settings: Mutex, state: Mutex>, } -#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)] -#[repr(u32)] -#[genum(type_name = "GstFallbackSourceStatus")] -enum Status { - Stopped, - Buffering, - Retrying, - Running, -} - static PROPERTIES: [subclass::Property; 13] = [ subclass::Property("enable-audio", |name| { glib::ParamSpec::boolean( @@ -322,6 +304,7 @@ static PROPERTIES: [subclass::Property; 13] = [ impl ObjectSubclass for FallbackSrc { const NAME: &'static str = "FallbackSrc"; + type Type = super::FallbackSrc; type ParentType = gst::Bin; type Instance = gst::subclass::ElementInstanceStruct; type Class = subclass::simple::ClassStruct; @@ -335,7 +318,7 @@ impl ObjectSubclass for FallbackSrc { } } - fn class_init(klass: &mut subclass::simple::ClassStruct) { + fn class_init(klass: &mut Self::Class) { klass.set_metadata( "Fallback Source", "Generic/Source", @@ -381,9 +364,8 @@ impl ObjectSubclass for FallbackSrc { } impl ObjectImpl for FallbackSrc { - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; - let element = obj.downcast_ref::().unwrap(); match *prop { subclass::Property("enable-audio", ..) => { @@ -391,7 +373,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing enable-audio from {:?} to {:?}", settings.enable_audio, new_value, @@ -403,7 +385,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing enable-video from {:?} to {:?}", settings.enable_video, new_value, @@ -415,7 +397,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing URI from {:?} to {:?}", settings.uri, new_value, @@ -427,7 +409,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing source from {:?} to {:?}", settings.source, new_value, @@ -439,7 +421,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing Fallback URI from {:?} to {:?}", settings.fallback_uri, new_value, @@ -451,7 +433,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing timeout from {:?} to {:?}", settings.timeout, new_value, @@ -463,7 +445,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing Restart Timeout from {:?} to {:?}", settings.restart_timeout, new_value, @@ -475,7 +457,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing Retry Timeout from {:?} to {:?}", settings.retry_timeout, new_value, @@ -487,7 +469,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing restart-on-eos from {:?} to {:?}", settings.restart_on_eos, new_value, @@ -499,7 +481,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing Minimum Latency from {:?} to {:?}", settings.min_latency, new_value, @@ -511,7 +493,7 @@ impl ObjectImpl for FallbackSrc { let new_value = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: element, + obj: obj, "Changing Buffer Duration from {:?} to {:?}", settings.buffer_duration, new_value, @@ -525,7 +507,7 @@ impl ObjectImpl for FallbackSrc { // Called whenever a value of a property is read. It can be called // at any time from any thread. #[allow(clippy::blocks_in_if_conditions)] - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + fn get_property(&self, _obj: &Self::Type, id: usize) -> Result { let prop = &PROPERTIES[id]; match *prop { @@ -630,13 +612,12 @@ impl ObjectImpl for FallbackSrc { } } - fn constructed(&self, obj: &glib::Object) { + fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); - let bin = obj.downcast_ref::().unwrap(); - bin.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK); - bin.set_element_flags(gst::ElementFlags::SOURCE); - bin.set_bin_flags(gst::BinFlags::STREAMS_AWARE); + obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK); + obj.set_element_flags(gst::ElementFlags::SOURCE); + obj.set_bin_flags(gst::BinFlags::STREAMS_AWARE); } } @@ -644,7 +625,7 @@ impl ElementImpl for FallbackSrc { #[allow(clippy::single_match)] fn change_state( &self, - element: &gst::Element, + element: &Self::Type, transition: gst::StateChange, ) -> Result { match transition { @@ -676,7 +657,7 @@ impl ElementImpl for FallbackSrc { } impl BinImpl for FallbackSrc { - fn handle_message(&self, bin: &gst::Bin, msg: gst::Message) { + fn handle_message(&self, bin: &Self::Type, msg: gst::Message) { use gst::MessageView; match msg.view() { @@ -704,7 +685,7 @@ impl BinImpl for FallbackSrc { impl FallbackSrc { fn create_main_input( &self, - element: &gst::Bin, + element: &super::FallbackSrc, source: &Source, buffer_duration: i64, ) -> Result { @@ -729,7 +710,7 @@ impl FallbackSrc { source } - Source::Element(ref source) => custom_source::CustomSource::new(source), + Source::Element(ref source) => CustomSource::new(source).upcast(), }; // Handle any async state changes internally, they don't affect the pipeline because we @@ -771,7 +752,7 @@ impl FallbackSrc { fn create_fallback_video_input( &self, - element: &gst::Bin, + element: &super::FallbackSrc, min_latency: u64, fallback_uri: Option<&str>, ) -> Result { @@ -949,7 +930,7 @@ impl FallbackSrc { fn create_fallback_audio_input( &self, - _element: &gst::Bin, + _element: &super::FallbackSrc, ) -> Result { let input = gst::Bin::new(Some("fallback_audio")); let audiotestsrc = gst::ElementFactory::make("audiotestsrc", Some("fallback_audiosrc")) @@ -973,7 +954,7 @@ impl FallbackSrc { fn create_stream( &self, - element: &gst::Bin, + element: &super::FallbackSrc, timeout: u64, min_latency: u64, is_audio: bool, @@ -1064,9 +1045,7 @@ impl FallbackSrc { }) } - fn start(&self, element: &gst::Element) -> Result<(), gst::StateChangeError> { - let element = element.downcast_ref::().unwrap(); - + fn start(&self, element: &super::FallbackSrc) -> Result<(), gst::StateChangeError> { gst_debug!(CAT, obj: element, "Starting"); let mut state_guard = self.state.lock().unwrap(); if state_guard.is_some() { @@ -1153,9 +1132,7 @@ impl FallbackSrc { Ok(()) } - fn stop(&self, element: &gst::Element) -> Result<(), gst::StateChangeError> { - let element = element.downcast_ref::().unwrap(); - + fn stop(&self, element: &super::FallbackSrc) -> Result<(), gst::StateChangeError> { gst_debug!(CAT, obj: element, "Stopping"); let mut state_guard = self.state.lock().unwrap(); let mut state = match state_guard.take() { @@ -1214,11 +1191,9 @@ impl FallbackSrc { fn change_source_state( &self, - element: &gst::Element, + element: &super::FallbackSrc, transition: gst::StateChange, ) -> Result<(), gst::StateChangeError> { - let element = element.downcast_ref::().unwrap(); - gst_debug!(CAT, obj: element, "Changing source state: {:?}", transition); let mut state_guard = self.state.lock().unwrap(); let state = match &mut *state_guard { @@ -1293,7 +1268,7 @@ impl FallbackSrc { fn proxy_pad_chain( &self, - element: &gst::Bin, + element: &super::FallbackSrc, pad: &gst::ProxyPad, buffer: gst::Buffer, ) -> Result { @@ -1310,7 +1285,7 @@ impl FallbackSrc { fn handle_source_pad_added( &self, - element: &gst::Bin, + element: &super::FallbackSrc, pad: &gst::Pad, ) -> Result<(), gst::ErrorMessage> { gst_debug!(CAT, obj: element, "Pad {} added to source", pad.get_name(),); @@ -1423,7 +1398,7 @@ impl FallbackSrc { Ok(()) } - fn add_pad_probe(&self, element: &gst::Bin, stream: &mut Stream) -> Block { + fn add_pad_probe(&self, element: &super::FallbackSrc, stream: &mut Stream) -> Block { // FIXME: Not literally correct as we add the probe to the queue source pad but that's only // a workaround until // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800 @@ -1476,7 +1451,7 @@ impl FallbackSrc { fn handle_pad_blocked( &self, - element: &gst::Bin, + element: &super::FallbackSrc, pad: &gst::Pad, pts: gst::ClockTime, ) -> Result<(), gst::ErrorMessage> { @@ -1601,7 +1576,7 @@ impl FallbackSrc { Ok(()) } - fn unblock_pads(&self, element: &gst::Bin, state: &mut State) { + fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) { // Check if all streams are blocked and have a running time and we have // 100% buffering if state.stats.buffering_percent < 100 { @@ -1802,7 +1777,7 @@ impl FallbackSrc { fn handle_source_pad_removed( &self, - element: &gst::Bin, + element: &super::FallbackSrc, pad: &gst::Pad, ) -> Result<(), gst::ErrorMessage> { gst_debug!( @@ -1849,7 +1824,7 @@ impl FallbackSrc { Ok(()) } - fn handle_buffering(&self, element: &gst::Bin, m: &gst::message::Buffering) { + fn handle_buffering(&self, element: &super::FallbackSrc, m: &gst::message::Buffering) { let mut state_guard = self.state.lock().unwrap(); let state = match &mut *state_guard { None => { @@ -1889,7 +1864,11 @@ impl FallbackSrc { element.notify("statistics"); } - fn handle_streams_selected(&self, element: &gst::Bin, m: &gst::message::StreamsSelected) { + fn handle_streams_selected( + &self, + element: &super::FallbackSrc, + m: &gst::message::StreamsSelected, + ) { let mut state_guard = self.state.lock().unwrap(); let state = match &mut *state_guard { None => { @@ -1949,7 +1928,7 @@ impl FallbackSrc { element.notify("status"); } - fn handle_error(&self, element: &gst::Bin, m: &gst::message::Error) -> bool { + fn handle_error(&self, element: &super::FallbackSrc, m: &gst::message::Error) -> bool { let mut state_guard = self.state.lock().unwrap(); let state = match &mut *state_guard { None => { @@ -1981,7 +1960,12 @@ impl FallbackSrc { false } - fn handle_source_error(&self, element: &gst::Bin, state: &mut State, reason: RetryReason) { + fn handle_source_error( + &self, + element: &super::FallbackSrc, + state: &mut State, + reason: RetryReason, + ) { gst_debug!(CAT, obj: element, "Handling source error"); state.stats.last_retry_reason = reason; @@ -2194,7 +2178,7 @@ impl FallbackSrc { #[allow(clippy::blocks_in_if_conditions)] fn schedule_source_restart_timeout( &self, - element: &gst::Bin, + element: &super::FallbackSrc, state: &mut State, elapsed: gst::ClockTime, ) { @@ -2278,7 +2262,7 @@ impl FallbackSrc { } #[allow(clippy::blocks_in_if_conditions)] - fn have_fallback_activated(&self, _element: &gst::Bin, state: &State) -> bool { + fn have_fallback_activated(&self, _element: &super::FallbackSrc, state: &State) -> bool { let mut have_audio = false; let mut have_video = false; if let Some(ref streams) = state.streams { @@ -2323,7 +2307,7 @@ impl FallbackSrc { .unwrap_or(true)) } - fn handle_switch_active_pad_change(&self, element: &gst::Bin) { + fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc) { let mut state_guard = self.state.lock().unwrap(); let state = match &mut *state_guard { None => { @@ -2370,408 +2354,3 @@ impl FallbackSrc { state.stats.to_structure() } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "fallbacksrc", - gst::Rank::None, - FallbackSrc::get_type(), - ) -} - -mod custom_source { - use super::CAT; - use glib::prelude::*; - use glib::subclass; - use glib::subclass::prelude::*; - use gst::prelude::*; - use gst::subclass::prelude::*; - - use std::{mem, sync::Mutex}; - - use once_cell::sync::OnceCell; - - static PROPERTIES: [subclass::Property; 1] = [subclass::Property("source", |name| { - glib::ParamSpec::object( - name, - "Source", - "Source", - gst::Element::static_type(), - glib::ParamFlags::WRITABLE | glib::ParamFlags::CONSTRUCT_ONLY, - ) - })]; - - struct Stream { - source_pad: gst::Pad, - ghost_pad: gst::GhostPad, - // Dummy stream we created - stream: gst::Stream, - } - - struct State { - pads: Vec, - num_audio: usize, - num_video: usize, - } - - pub struct CustomSource { - source: OnceCell, - state: Mutex, - } - - impl ObjectSubclass for CustomSource { - const NAME: &'static str = "FallbackSrcCustomSource"; - type ParentType = gst::Bin; - type Instance = gst::subclass::ElementInstanceStruct; - type Class = subclass::simple::ClassStruct; - - glib_object_subclass!(); - - fn new() -> Self { - Self { - source: OnceCell::default(), - state: Mutex::new(State { - pads: vec![], - num_audio: 0, - num_video: 0, - }), - } - } - - fn class_init(klass: &mut subclass::simple::ClassStruct) { - let src_pad_template = gst::PadTemplate::new( - "audio_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &gst::Caps::new_any(), - ) - .unwrap(); - klass.add_pad_template(src_pad_template); - - let src_pad_template = gst::PadTemplate::new( - "video_%u", - gst::PadDirection::Src, - gst::PadPresence::Sometimes, - &gst::Caps::new_any(), - ) - .unwrap(); - klass.add_pad_template(src_pad_template); - klass.install_properties(&PROPERTIES); - } - } - - impl ObjectImpl for CustomSource { - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &PROPERTIES[id]; - let element = obj.downcast_ref::().unwrap(); - - match *prop { - subclass::Property("source", ..) => { - let source = value.get::().unwrap().unwrap(); - self.source.set(source.clone()).unwrap(); - element.add(&source).unwrap(); - } - _ => unreachable!(), - } - } - - fn constructed(&self, obj: &glib::Object) { - self.parent_constructed(obj); - - let bin = obj.downcast_ref::().unwrap(); - bin.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK); - bin.set_element_flags(gst::ElementFlags::SOURCE); - bin.set_bin_flags(gst::BinFlags::STREAMS_AWARE); - } - } - - impl ElementImpl for CustomSource { - #[allow(clippy::single_match)] - fn change_state( - &self, - element: &gst::Element, - transition: gst::StateChange, - ) -> Result { - let element = element.downcast_ref::().unwrap(); - - match transition { - gst::StateChange::NullToReady => { - self.start(element)?; - } - _ => (), - } - - let res = self.parent_change_state(element.upcast_ref(), transition)?; - - match transition { - gst::StateChange::ReadyToNull => { - self.stop(element)?; - } - _ => (), - } - - Ok(res) - } - } - - impl BinImpl for CustomSource { - #[allow(clippy::single_match)] - fn handle_message(&self, bin: &gst::Bin, msg: gst::Message) { - use gst::MessageView; - - match msg.view() { - MessageView::StreamCollection(_) => { - // TODO: Drop stream collection message for now, we only create a simple custom - // one here so that fallbacksrc can know about our streams. It is never - // forwarded. - if let Err(msg) = self.handle_source_no_more_pads(&bin) { - bin.post_error_message(msg); - } - } - _ => self.parent_handle_message(bin, msg), - } - } - } - - impl CustomSource { - fn start( - &self, - element: &gst::Bin, - ) -> Result { - gst_debug!(CAT, obj: element, "Starting"); - let source = self.source.get().unwrap(); - - let templates = source.get_pad_template_list(); - - if templates - .iter() - .any(|templ| templ.get_property_presence() == gst::PadPresence::Request) - { - gst_error!(CAT, obj: element, "Request pads not supported"); - gst_element_error!( - element, - gst::LibraryError::Settings, - ["Request pads not supported"] - ); - return Err(gst::StateChangeError); - } - - let has_sometimes_pads = templates - .iter() - .any(|templ| templ.get_property_presence() == gst::PadPresence::Sometimes); - - // Handle all source pads that already exist - for pad in source.get_src_pads() { - if let Err(msg) = self.handle_source_pad_added(&element, &pad) { - element.post_error_message(msg); - return Err(gst::StateChangeError); - } - } - - if !has_sometimes_pads { - if let Err(msg) = self.handle_source_no_more_pads(&element) { - element.post_error_message(msg); - return Err(gst::StateChangeError); - } - } else { - gst_debug!(CAT, obj: element, "Found sometimes pads"); - - let element_weak = element.downgrade(); - source.connect_pad_added(move |_, pad| { - let element = match element_weak.upgrade() { - None => return, - Some(element) => element, - }; - let src = CustomSource::from_instance(&element); - - if let Err(msg) = src.handle_source_pad_added(&element, pad) { - element.post_error_message(msg); - } - }); - let element_weak = element.downgrade(); - source.connect_pad_removed(move |_, pad| { - let element = match element_weak.upgrade() { - None => return, - Some(element) => element, - }; - let src = CustomSource::from_instance(&element); - - if let Err(msg) = src.handle_source_pad_removed(&element, pad) { - element.post_error_message(msg); - } - }); - - let element_weak = element.downgrade(); - source.connect_no_more_pads(move |_| { - let element = match element_weak.upgrade() { - None => return, - Some(element) => element, - }; - let src = CustomSource::from_instance(&element); - - if let Err(msg) = src.handle_source_no_more_pads(&element) { - element.post_error_message(msg); - } - }); - } - - Ok(gst::StateChangeSuccess::Success) - } - - fn handle_source_pad_added( - &self, - element: &gst::Bin, - pad: &gst::Pad, - ) -> Result<(), gst::ErrorMessage> { - gst_debug!(CAT, obj: element, "Source added pad {}", pad.get_name()); - - let mut state = self.state.lock().unwrap(); - - let mut stream_type = None; - - // Take stream type from stream-start event if we can - if let Some(event) = pad.get_sticky_event(gst::EventType::StreamStart, 0) { - if let gst::EventView::StreamStart(ev) = event.view() { - stream_type = ev.get_stream().map(|s| s.get_stream_type()); - } - } - - // Otherwise from the caps - if stream_type.is_none() { - let caps = match pad.get_current_caps().or_else(|| pad.query_caps(None)) { - Some(caps) if !caps.is_any() && !caps.is_empty() => caps, - _ => { - gst_error!(CAT, obj: element, "Pad {} had no caps", pad.get_name()); - return Err(gst_error_msg!( - gst::CoreError::Negotiation, - ["Pad had no caps"] - )); - } - }; - - let s = caps.get_structure(0).unwrap(); - - if s.get_name().starts_with("audio/") { - stream_type = Some(gst::StreamType::AUDIO); - } else if s.get_name().starts_with("video/") { - stream_type = Some(gst::StreamType::VIDEO); - } else { - return Ok(()); - } - } - - let stream_type = stream_type.unwrap(); - - let (templ, name) = if stream_type.contains(gst::StreamType::AUDIO) { - let name = format!("audio_{}", state.num_audio); - state.num_audio += 1; - (element.get_pad_template("audio_%u").unwrap(), name) - } else { - let name = format!("video_{}", state.num_video); - state.num_video += 1; - (element.get_pad_template("video_%u").unwrap(), name) - }; - - let ghost_pad = gst::GhostPad::builder_with_template(&templ, Some(&name)) - .build_with_target(pad) - .unwrap(); - - let stream = Stream { - source_pad: pad.clone(), - ghost_pad: ghost_pad.clone().upcast(), - // TODO: We only add the stream type right now - stream: gst::Stream::new(None, None, stream_type, gst::StreamFlags::empty()), - }; - state.pads.push(stream); - drop(state); - - ghost_pad.set_active(true).unwrap(); - element.add_pad(&ghost_pad).unwrap(); - - Ok(()) - } - - fn handle_source_pad_removed( - &self, - element: &gst::Bin, - pad: &gst::Pad, - ) -> Result<(), gst::ErrorMessage> { - gst_debug!(CAT, obj: element, "Source removed pad {}", pad.get_name()); - - let mut state = self.state.lock().unwrap(); - let (i, stream) = match state - .pads - .iter() - .enumerate() - .find(|(_i, p)| &p.source_pad == pad) - { - None => return Ok(()), - Some(v) => v, - }; - - let ghost_pad = stream.ghost_pad.clone(); - state.pads.remove(i); - drop(state); - - ghost_pad.set_active(false).unwrap(); - let _ = ghost_pad.set_target(None::<&gst::Pad>); - let _ = element.remove_pad(&ghost_pad); - - Ok(()) - } - - fn handle_source_no_more_pads(&self, element: &gst::Bin) -> Result<(), gst::ErrorMessage> { - gst_debug!(CAT, obj: element, "Source signalled no-more-pads"); - - let state = self.state.lock().unwrap(); - let streams = state - .pads - .iter() - .map(|p| p.stream.clone()) - .collect::>(); - let collection = gst::StreamCollection::builder(None) - .streams(&streams) - .build(); - drop(state); - - element.no_more_pads(); - - let _ = element.post_message( - gst::message::StreamsSelected::builder(&collection) - .src(element) - .build(), - ); - - Ok(()) - } - - fn stop( - &self, - element: &gst::Bin, - ) -> Result { - gst_debug!(CAT, obj: element, "Stopping"); - - let mut state = self.state.lock().unwrap(); - let pads = mem::replace(&mut state.pads, vec![]); - state.num_audio = 0; - state.num_video = 0; - drop(state); - - for pad in pads { - let _ = pad.ghost_pad.set_target(None::<&gst::Pad>); - let _ = element.remove_pad(&pad.ghost_pad); - } - - Ok(gst::StateChangeSuccess::Success) - } - - #[allow(clippy::new_ret_no_self)] - pub fn new(source: &gst::Element) -> gst::Element { - glib::Object::new(CustomSource::get_type(), &[("source", source)]) - .unwrap() - .downcast::() - .unwrap() - } - } -} diff --git a/utils/fallbackswitch/src/fallbacksrc/mod.rs b/utils/fallbackswitch/src/fallbacksrc/mod.rs new file mode 100644 index 000000000..4c1cc32a6 --- /dev/null +++ b/utils/fallbackswitch/src/fallbacksrc/mod.rs @@ -0,0 +1,60 @@ +// Copyright (C) 2020 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +mod custom_source; +mod imp; + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)] +#[repr(u32)] +#[genum(type_name = "GstFallbackSourceRetryReason")] +enum RetryReason { + None, + Error, + Eos, + StateChangeFailure, + Timeout, +} + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)] +#[repr(u32)] +#[genum(type_name = "GstFallbackSourceStatus")] +enum Status { + Stopped, + Buffering, + Retrying, + Running, +} + +glib_wrapper! { + pub struct FallbackSrc(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; +} + +// GStreamer elements need to be thread-safe. For the private implementation this is automatically +// enforced but for the public wrapper type we need to specify this manually. +unsafe impl Send for FallbackSrc {} +unsafe impl Sync for FallbackSrc {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "fallbacksrc", + gst::Rank::None, + FallbackSrc::static_type(), + ) +} diff --git a/utils/fallbackswitch/src/fallbackswitch.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs similarity index 90% rename from utils/fallbackswitch/src/fallbackswitch.rs rename to utils/fallbackswitch/src/fallbackswitch/imp.rs index 039977b1f..05fc2df2d 100644 --- a/utils/fallbackswitch/src/fallbackswitch.rs +++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs @@ -20,7 +20,7 @@ use self::gst_base::prelude::*; #[cfg(not(feature = "v1_18"))] use self::gst_base::subclass::prelude::*; #[cfg(not(feature = "v1_18"))] -use super::gst_base_compat as gst_base; +use crate::gst_base_compat as gst_base; #[cfg(feature = "v1_18")] use gst_base::prelude::*; @@ -35,7 +35,7 @@ use once_cell::sync::Lazy; use std::sync::{Mutex, RwLock}; -struct FallbackSwitch { +pub struct FallbackSwitch { sinkpad: gst_base::AggregatorPad, fallback_sinkpad: RwLock>, active_sinkpad: Mutex>, @@ -119,7 +119,7 @@ static PROPERTIES: [subclass::Property; 2] = [ impl FallbackSwitch { fn handle_main_buffer( &self, - agg: &gst_base::Aggregator, + agg: &super::FallbackSwitch, state: &mut OutputState, settings: &Settings, mut buffer: gst::Buffer, @@ -259,7 +259,7 @@ impl FallbackSwitch { fn get_fallback_buffer( &self, - agg: &gst_base::Aggregator, + agg: &super::FallbackSwitch, state: &mut OutputState, settings: &Settings, fallback_sinkpad: &gst_base::AggregatorPad, @@ -363,7 +363,7 @@ impl FallbackSwitch { fn get_next_buffer( &self, - agg: &gst_base::Aggregator, + agg: &super::FallbackSwitch, timeout: bool, ) -> Result<(gst::Buffer, gst::Caps, bool), gst::FlowError> { let settings = self.settings.lock().unwrap().clone(); @@ -407,25 +407,17 @@ impl FallbackSwitch { impl ObjectSubclass for FallbackSwitch { const NAME: &'static str = "FallbackSwitch"; + type Type = super::FallbackSwitch; type ParentType = gst_base::Aggregator; type Instance = gst::subclass::ElementInstanceStruct; type Class = subclass::simple::ClassStruct; glib_object_subclass!(); - fn with_class(klass: &subclass::simple::ClassStruct) -> Self { + fn with_class(klass: &Self::Class) -> Self { let templ = klass.get_pad_template("sink").unwrap(); - let sinkpad: gst_base::AggregatorPad = glib::Object::new( - gst_base::AggregatorPad::static_type(), - &[ - ("name", &"sink"), - ("direction", &gst::PadDirection::Sink), - ("template", &templ), - ], - ) - .unwrap() - .downcast() - .unwrap(); + let sinkpad = + gst::PadBuilder::::from_template(&templ, Some("sink")).build(); Self { sinkpad, @@ -437,7 +429,7 @@ impl ObjectSubclass for FallbackSwitch { } } - fn class_init(klass: &mut subclass::simple::ClassStruct) { + fn class_init(klass: &mut Self::Class) { klass.set_metadata( "Fallback Switch", "Generic", @@ -481,16 +473,14 @@ impl ObjectSubclass for FallbackSwitch { } impl ObjectImpl for FallbackSwitch { - fn constructed(&self, obj: &glib::Object) { + fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); - let agg = obj.downcast_ref::().unwrap(); - agg.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.sinkpad).unwrap(); } - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; - let agg = obj.downcast_ref::().unwrap(); match *prop { subclass::Property("timeout", ..) => { @@ -498,7 +488,7 @@ impl ObjectImpl for FallbackSwitch { let timeout = value.get_some().expect("type checked upstream"); gst_info!( CAT, - obj: agg, + obj: obj, "Changing timeout from {} to {}", settings.timeout, timeout @@ -510,7 +500,7 @@ impl ObjectImpl for FallbackSwitch { } } - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + fn get_property(&self, _obj: &Self::Type, id: usize) -> Result { let prop = &PROPERTIES[id]; match *prop { @@ -530,47 +520,39 @@ impl ObjectImpl for FallbackSwitch { impl ElementImpl for FallbackSwitch { fn request_new_pad( &self, - element: &gst::Element, + element: &Self::Type, templ: &gst::PadTemplate, name: Option, _caps: Option<&gst::Caps>, ) -> Option { - let agg = element.downcast_ref::().unwrap(); - let fallback_sink_templ = agg.get_pad_template("fallback_sink").unwrap(); + let fallback_sink_templ = element.get_pad_template("fallback_sink").unwrap(); if templ != &fallback_sink_templ || (name.is_some() && name.as_deref() != Some("fallback_sink")) { - gst_error!(CAT, obj: agg, "Wrong pad template or name"); + gst_error!(CAT, obj: element, "Wrong pad template or name"); return None; } let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap(); if fallback_sinkpad.is_some() { - gst_error!(CAT, obj: agg, "Already have a fallback sinkpad"); + gst_error!(CAT, obj: element, "Already have a fallback sinkpad"); return None; } - let sinkpad: gst_base::AggregatorPad = glib::Object::new( - gst_base::AggregatorPad::static_type(), - &[ - ("name", &"fallback_sink"), - ("direction", &gst::PadDirection::Sink), - ("template", templ), - ], + let sinkpad = gst::PadBuilder::::from_template( + &templ, + Some("fallback_sink"), ) - .unwrap() - .downcast() - .unwrap(); + .build(); *fallback_sinkpad = Some(sinkpad.clone()); drop(fallback_sinkpad); - agg.add_pad(&sinkpad).unwrap(); + element.add_pad(&sinkpad).unwrap(); Some(sinkpad.upcast()) } - fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { - let agg = element.downcast_ref::().unwrap(); + fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) { let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap(); let mut pad_states = self.pad_states.write().unwrap(); @@ -579,21 +561,21 @@ impl ElementImpl for FallbackSwitch { pad_states.fallback_sinkpad = None; drop(pad_states); drop(fallback_sinkpad); - agg.remove_pad(pad).unwrap(); - gst_debug!(CAT, obj: agg, "Removed fallback sinkpad {:?}", pad); + element.remove_pad(pad).unwrap(); + gst_debug!(CAT, obj: element, "Removed fallback sinkpad {:?}", pad); } } } impl AggregatorImpl for FallbackSwitch { - fn start(&self, _agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { + fn start(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> { *self.output_state.lock().unwrap() = OutputState::default(); *self.pad_states.write().unwrap() = PadStates::default(); Ok(()) } - fn stop(&self, _agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { + fn stop(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> { *self.active_sinkpad.lock().unwrap() = None; Ok(()) @@ -601,7 +583,7 @@ impl AggregatorImpl for FallbackSwitch { fn sink_event_pre_queue( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, agg_pad: &gst_base::AggregatorPad, event: gst::Event, ) -> Result { @@ -618,7 +600,7 @@ impl AggregatorImpl for FallbackSwitch { fn sink_event( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, agg_pad: &gst_base::AggregatorPad, event: gst::Event, ) -> bool { @@ -662,7 +644,7 @@ impl AggregatorImpl for FallbackSwitch { } } - fn get_next_time(&self, agg: &gst_base::Aggregator) -> gst::ClockTime { + fn get_next_time(&self, agg: &Self::Type) -> gst::ClockTime { // If we have a buffer on the sinkpad then the timeout is always going to be immediately, // i.e. 0. We want to output that buffer immediately, no matter what. // @@ -715,7 +697,7 @@ impl AggregatorImpl for FallbackSwitch { // calculating the running times later works correctly fn clip( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, agg_pad: &gst_base::AggregatorPad, mut buffer: gst::Buffer, ) -> Option { @@ -810,7 +792,7 @@ impl AggregatorImpl for FallbackSwitch { fn aggregate( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, timeout: bool, ) -> Result { gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout); @@ -838,16 +820,7 @@ impl AggregatorImpl for FallbackSwitch { agg.finish_buffer(buffer) } - fn negotiate(&self, _agg: &gst_base::Aggregator) -> bool { + fn negotiate(&self, _agg: &Self::Type) -> bool { true } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "fallbackswitch", - gst::Rank::None, - FallbackSwitch::get_type(), - ) -} diff --git a/utils/fallbackswitch/src/fallbackswitch/mod.rs b/utils/fallbackswitch/src/fallbackswitch/mod.rs new file mode 100644 index 000000000..ed35e69bf --- /dev/null +++ b/utils/fallbackswitch/src/fallbackswitch/mod.rs @@ -0,0 +1,41 @@ +// Copyright (C) 2019 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +#[cfg(not(feature = "v1_18"))] +use crate::gst_base_compat as gst_base; + +mod imp; + +glib_wrapper! { + pub struct FallbackSwitch(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object; +} + +// GStreamer elements need to be thread-safe. For the private implementation this is automatically +// enforced but for the public wrapper type we need to specify this manually. +unsafe impl Send for FallbackSwitch {} +unsafe impl Sync for FallbackSwitch {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "fallbackswitch", + gst::Rank::None, + FallbackSwitch::static_type(), + ) +} diff --git a/utils/togglerecord/src/togglerecord.rs b/utils/togglerecord/src/togglerecord/imp.rs similarity index 97% rename from utils/togglerecord/src/togglerecord.rs rename to utils/togglerecord/src/togglerecord/imp.rs index 938f2f0c2..fa0cdcfbf 100644 --- a/utils/togglerecord/src/togglerecord.rs +++ b/utils/togglerecord/src/togglerecord/imp.rs @@ -344,7 +344,7 @@ impl HandleData for gst::Buffer { } } -struct ToggleRecord { +pub struct ToggleRecord { settings: Mutex, state: Mutex, main_stream: Stream, @@ -367,7 +367,7 @@ lazy_static! { impl ToggleRecord { fn handle_main_stream( &self, - element: &gst::Element, + element: &super::ToggleRecord, pad: &gst::Pad, stream: &Stream, data: T, @@ -616,7 +616,7 @@ impl ToggleRecord { fn handle_secondary_stream( &self, - element: &gst::Element, + element: &super::ToggleRecord, pad: &gst::Pad, stream: &Stream, data: T, @@ -1034,7 +1034,7 @@ impl ToggleRecord { fn sink_chain( &self, pad: &gst::Pad, - element: &gst::Element, + element: &super::ToggleRecord, buffer: gst::Buffer, ) -> Result { let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| { @@ -1143,7 +1143,12 @@ impl ToggleRecord { stream.srcpad.push(buffer) } - fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool { + fn sink_event( + &self, + pad: &gst::Pad, + element: &super::ToggleRecord, + mut event: gst::Event, + ) -> bool { use gst::EventView; let stream = match self.pads.lock().get(pad) { @@ -1332,7 +1337,7 @@ impl ToggleRecord { fn sink_query( &self, pad: &gst::Pad, - element: &gst::Element, + element: &super::ToggleRecord, query: &mut gst::QueryRef, ) -> bool { let stream = match self.pads.lock().get(pad) { @@ -1355,7 +1360,12 @@ impl ToggleRecord { // FIXME `matches!` was introduced in rustc 1.42.0, current MSRV is 1.41.0 // FIXME uncomment when CI can upgrade to 1.47.1 //#[allow(clippy::match_like_matches_macro)] - fn src_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool { + fn src_event( + &self, + pad: &gst::Pad, + element: &super::ToggleRecord, + mut event: gst::Event, + ) -> bool { use gst::EventView; let stream = match self.pads.lock().get(pad) { @@ -1394,7 +1404,12 @@ impl ToggleRecord { } } - fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool { + fn src_query( + &self, + pad: &gst::Pad, + element: &super::ToggleRecord, + query: &mut gst::QueryRef, + ) -> bool { use gst::QueryView; let stream = match self.pads.lock().get(pad) { @@ -1514,7 +1529,7 @@ impl ToggleRecord { fn iterate_internal_links( &self, pad: &gst::Pad, - element: &gst::Element, + element: &super::ToggleRecord, ) -> gst::Iterator { let stream = match self.pads.lock().get(pad) { None => { @@ -1538,13 +1553,14 @@ impl ToggleRecord { impl ObjectSubclass for ToggleRecord { const NAME: &'static str = "RsToggleRecord"; + type Type = super::ToggleRecord; type ParentType = gst::Element; type Instance = gst::subclass::ElementInstanceStruct; type Class = subclass::simple::ClassStruct; glib_object_subclass!(); - fn with_class(klass: &subclass::simple::ClassStruct) -> Self { + fn with_class(klass: &Self::Class) -> Self { let templ = klass.get_pad_template("sink").unwrap(); let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) .chain_function(|pad, parent, buffer| { @@ -1618,7 +1634,7 @@ impl ObjectSubclass for ToggleRecord { } } - fn class_init(klass: &mut subclass::simple::ClassStruct) { + fn class_init(klass: &mut Self::Class) { klass.install_properties(&PROPERTIES); klass.set_metadata( @@ -1668,9 +1684,8 @@ impl ObjectSubclass for ToggleRecord { } impl ObjectImpl for ToggleRecord { - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; - let element = obj.downcast_ref::().unwrap(); match *prop { subclass::Property("record", ..) => { @@ -1678,7 +1693,7 @@ impl ObjectImpl for ToggleRecord { let record = value.get_some().expect("type checked upstream"); gst_debug!( CAT, - obj: element, + obj: obj, "Setting record from {:?} to {:?}", settings.record, record @@ -1690,7 +1705,7 @@ impl ObjectImpl for ToggleRecord { } } - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + fn get_property(&self, _obj: &Self::Type, id: usize) -> Result { let prop = &PROPERTIES[id]; match *prop { @@ -1706,19 +1721,18 @@ impl ObjectImpl for ToggleRecord { } } - fn constructed(&self, obj: &glib::Object) { + fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); - let element = obj.downcast_ref::().unwrap(); - element.add_pad(&self.main_stream.sinkpad).unwrap(); - element.add_pad(&self.main_stream.srcpad).unwrap(); + obj.add_pad(&self.main_stream.sinkpad).unwrap(); + obj.add_pad(&self.main_stream.srcpad).unwrap(); } } impl ElementImpl for ToggleRecord { fn change_state( &self, - element: &gst::Element, + element: &Self::Type, transition: gst::StateChange, ) -> Result { gst_trace!(CAT, obj: element, "Changing state {:?}", transition); @@ -1778,7 +1792,7 @@ impl ElementImpl for ToggleRecord { fn request_new_pad( &self, - element: &gst::Element, + element: &Self::Type, _templ: &gst::PadTemplate, _name: Option, _caps: Option<&gst::Caps>, @@ -1867,7 +1881,7 @@ impl ElementImpl for ToggleRecord { Some(sinkpad) } - fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { + fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) { let mut other_streams_guard = self.other_streams.lock(); let (ref mut other_streams, _) = *other_streams_guard; let mut pads = self.pads.lock(); @@ -1894,12 +1908,3 @@ impl ElementImpl for ToggleRecord { element.remove_pad(&stream.srcpad).unwrap(); } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "togglerecord", - gst::Rank::None, - ToggleRecord::get_type(), - ) -} diff --git a/utils/togglerecord/src/togglerecord/mod.rs b/utils/togglerecord/src/togglerecord/mod.rs new file mode 100644 index 000000000..43b811270 --- /dev/null +++ b/utils/togglerecord/src/togglerecord/mod.rs @@ -0,0 +1,38 @@ +// Copyright (C) 2017 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +mod imp; + +glib_wrapper! { + pub struct ToggleRecord(ObjectSubclass) @extends gst::Element, gst::Object; +} + +// GStreamer elements need to be thread-safe. For the private implementation this is automatically +// enforced but for the public wrapper type we need to specify this manually. +unsafe impl Send for ToggleRecord {} +unsafe impl Sync for ToggleRecord {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "togglerecord", + gst::Rank::None, + ToggleRecord::static_type(), + ) +}