utils: Update for subclassing API changes

This commit is contained in:
Sebastian Dröge 2020-11-15 14:08:54 +02:00
parent 4829e31191
commit b021a8bf10
10 changed files with 906 additions and 710 deletions

View file

@ -8,6 +8,7 @@
use super::super::gst_base_sys;
use glib::prelude::*;
use glib::subclass::prelude::*;
use glib::translate::*;
@ -19,13 +20,13 @@ use super::super::Aggregator;
use super::super::AggregatorPad;
pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn flush(&self, aggregator: &Aggregator) -> Result<gst::FlowSuccess, gst::FlowError> {
fn flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError> {
self.parent_flush(aggregator)
}
fn clip(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
buffer: gst::Buffer,
) -> Option<gst::Buffer> {
@ -34,7 +35,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn finish_buffer(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.parent_finish_buffer(aggregator, buffer)
@ -42,7 +43,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn sink_event(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
event: gst::Event,
) -> bool {
@ -51,7 +52,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn sink_event_pre_queue(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -60,7 +61,7 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn sink_query(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
@ -69,24 +70,24 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn sink_query_pre_queue(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
self.parent_sink_query_pre_queue(aggregator, aggregator_pad, query)
}
fn src_event(&self, aggregator: &Aggregator, event: gst::Event) -> bool {
fn src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool {
self.parent_src_event(aggregator, event)
}
fn src_query(&self, aggregator: &Aggregator, query: &mut gst::QueryRef) -> bool {
fn src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool {
self.parent_src_query(aggregator, query)
}
fn src_activate(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
@ -95,27 +96,27 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn aggregate(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.parent_aggregate(aggregator, timeout)
}
fn start(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> {
fn start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.parent_start(aggregator)
}
fn stop(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> {
fn stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
self.parent_stop(aggregator)
}
fn get_next_time(&self, aggregator: &Aggregator) -> gst::ClockTime {
fn get_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime {
self.parent_get_next_time(aggregator)
}
fn create_new_pad(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
templ: &gst::PadTemplate,
req_name: Option<&str>,
caps: Option<&gst::Caps>,
@ -125,99 +126,99 @@ pub trait AggregatorImpl: AggregatorImplExt + ElementImpl {
fn update_src_caps(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
caps: &gst::Caps,
) -> Result<gst::Caps, gst::FlowError> {
self.parent_update_src_caps(aggregator, caps)
}
fn fixate_src_caps(&self, aggregator: &Aggregator, caps: gst::Caps) -> gst::Caps {
fn fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps {
self.parent_fixate_src_caps(aggregator, caps)
}
fn negotiated_src_caps(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
self.parent_negotiated_src_caps(aggregator, caps)
}
fn negotiate(&self, aggregator: &Aggregator) -> bool {
fn negotiate(&self, aggregator: &Self::Type) -> bool {
self.parent_negotiate(aggregator)
}
}
pub trait AggregatorImplExt {
fn parent_flush(&self, aggregator: &Aggregator) -> Result<gst::FlowSuccess, gst::FlowError>;
pub trait AggregatorImplExt: ObjectSubclass {
fn parent_flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError>;
fn parent_clip(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
buffer: gst::Buffer,
) -> Option<gst::Buffer>;
fn parent_finish_buffer(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError>;
fn parent_sink_event(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
event: gst::Event,
) -> bool;
fn parent_sink_event_pre_queue(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError>;
fn parent_sink_query(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
query: &mut gst::QueryRef,
) -> bool;
fn parent_sink_query_pre_queue(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
query: &mut gst::QueryRef,
) -> bool;
fn parent_src_event(&self, aggregator: &Aggregator, event: gst::Event) -> bool;
fn parent_src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool;
fn parent_src_query(&self, aggregator: &Aggregator, query: &mut gst::QueryRef) -> bool;
fn parent_src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool;
fn parent_src_activate(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError>;
fn parent_aggregate(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError>;
fn parent_start(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage>;
fn parent_start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage>;
fn parent_stop(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage>;
fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage>;
fn parent_get_next_time(&self, aggregator: &Aggregator) -> gst::ClockTime;
fn parent_get_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime;
fn parent_create_new_pad(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
templ: &gst::PadTemplate,
req_name: Option<&str>,
caps: Option<&gst::Caps>,
@ -225,30 +226,35 @@ pub trait AggregatorImplExt {
fn parent_update_src_caps(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
caps: &gst::Caps,
) -> Result<gst::Caps, gst::FlowError>;
fn parent_fixate_src_caps(&self, aggregator: &Aggregator, caps: gst::Caps) -> gst::Caps;
fn parent_fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps;
fn parent_negotiated_src_caps(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError>;
fn parent_negotiate(&self, aggregator: &Aggregator) -> bool;
fn parent_negotiate(&self, aggregator: &Self::Type) -> bool;
}
impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_flush(&self, aggregator: &Aggregator) -> Result<gst::FlowSuccess, gst::FlowError> {
fn parent_flush(&self, aggregator: &Self::Type) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
let data = T::type_data();
let parent_class =
data.as_ref().get_parent_class() as *mut gst_base_sys::GstAggregatorClass;
(*parent_class)
.flush
.map(|f| from_glib(f(aggregator.to_glib_none().0)))
.map(|f| {
from_glib(f(aggregator
.unsafe_cast_ref::<Aggregator>()
.to_glib_none()
.0))
})
.unwrap_or(gst::FlowReturn::Ok)
.into_result()
}
@ -256,7 +262,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_clip(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
buffer: gst::Buffer,
) -> Option<gst::Buffer> {
@ -267,7 +273,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
match (*parent_class).clip {
None => Some(buffer),
Some(ref func) => from_glib_full(func(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
aggregator_pad.to_glib_none().0,
buffer.into_ptr(),
)),
@ -277,7 +283,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_finish_buffer(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
@ -287,14 +293,17 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
let f = (*parent_class)
.finish_buffer
.expect("Missing parent function `finish_buffer`");
gst::FlowReturn::from_glib(f(aggregator.to_glib_none().0, buffer.into_ptr()))
.into_result()
gst::FlowReturn::from_glib(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
buffer.into_ptr(),
))
.into_result()
}
}
fn parent_sink_event(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
event: gst::Event,
) -> bool {
@ -306,7 +315,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
.sink_event
.expect("Missing parent function `sink_event`");
from_glib(f(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
aggregator_pad.to_glib_none().0,
event.into_ptr(),
))
@ -315,7 +324,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_sink_event_pre_queue(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -327,7 +336,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
.sink_event_pre_queue
.expect("Missing parent function `sink_event_pre_queue`");
gst::FlowReturn::from_glib(f(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
aggregator_pad.to_glib_none().0,
event.into_ptr(),
))
@ -337,7 +346,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_sink_query(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
@ -349,7 +358,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
.sink_query
.expect("Missing parent function `sink_query`");
from_glib(f(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
aggregator_pad.to_glib_none().0,
query.as_mut_ptr(),
))
@ -358,7 +367,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_sink_query_pre_queue(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
aggregator_pad: &AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
@ -370,14 +379,14 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
.sink_query_pre_queue
.expect("Missing parent function `sink_query`");
from_glib(f(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
aggregator_pad.to_glib_none().0,
query.as_mut_ptr(),
))
}
}
fn parent_src_event(&self, aggregator: &Aggregator, event: gst::Event) -> bool {
fn parent_src_event(&self, aggregator: &Self::Type, event: gst::Event) -> bool {
unsafe {
let data = T::type_data();
let parent_class =
@ -385,11 +394,14 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
let f = (*parent_class)
.src_event
.expect("Missing parent function `src_event`");
from_glib(f(aggregator.to_glib_none().0, event.into_ptr()))
from_glib(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
event.into_ptr(),
))
}
}
fn parent_src_query(&self, aggregator: &Aggregator, query: &mut gst::QueryRef) -> bool {
fn parent_src_query(&self, aggregator: &Self::Type, query: &mut gst::QueryRef) -> bool {
unsafe {
let data = T::type_data();
let parent_class =
@ -397,13 +409,16 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
let f = (*parent_class)
.src_query
.expect("Missing parent function `src_query`");
from_glib(f(aggregator.to_glib_none().0, query.as_mut_ptr()))
from_glib(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
query.as_mut_ptr(),
))
}
}
fn parent_src_activate(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
@ -415,7 +430,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
None => Ok(()),
Some(f) => gst_result_from_gboolean!(
f(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
mode.to_glib(),
active.to_glib()
),
@ -428,7 +443,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_aggregate(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
@ -438,12 +453,15 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
let f = (*parent_class)
.aggregate
.expect("Missing parent function `aggregate`");
gst::FlowReturn::from_glib(f(aggregator.to_glib_none().0, timeout.to_glib()))
.into_result()
gst::FlowReturn::from_glib(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
timeout.to_glib(),
))
.into_result()
}
}
fn parent_start(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> {
fn parent_start(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
unsafe {
let data = T::type_data();
let parent_class =
@ -451,7 +469,11 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
(*parent_class)
.start
.map(|f| {
if from_glib(f(aggregator.to_glib_none().0)) {
if from_glib(f(aggregator
.unsafe_cast_ref::<Aggregator>()
.to_glib_none()
.0))
{
Ok(())
} else {
Err(gst_error_msg!(
@ -464,7 +486,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
}
}
fn parent_stop(&self, aggregator: &Aggregator) -> Result<(), gst::ErrorMessage> {
fn parent_stop(&self, aggregator: &Self::Type) -> Result<(), gst::ErrorMessage> {
unsafe {
let data = T::type_data();
let parent_class =
@ -472,7 +494,11 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
(*parent_class)
.stop
.map(|f| {
if from_glib(f(aggregator.to_glib_none().0)) {
if from_glib(f(aggregator
.unsafe_cast_ref::<Aggregator>()
.to_glib_none()
.0))
{
Ok(())
} else {
Err(gst_error_msg!(
@ -485,21 +511,26 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
}
}
fn parent_get_next_time(&self, aggregator: &Aggregator) -> gst::ClockTime {
fn parent_get_next_time(&self, aggregator: &Self::Type) -> gst::ClockTime {
unsafe {
let data = T::type_data();
let parent_class =
data.as_ref().get_parent_class() as *mut gst_base_sys::GstAggregatorClass;
(*parent_class)
.get_next_time
.map(|f| from_glib(f(aggregator.to_glib_none().0)))
.map(|f| {
from_glib(f(aggregator
.unsafe_cast_ref::<Aggregator>()
.to_glib_none()
.0))
})
.unwrap_or(gst::CLOCK_TIME_NONE)
}
}
fn parent_create_new_pad(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
templ: &gst::PadTemplate,
req_name: Option<&str>,
caps: Option<&gst::Caps>,
@ -512,7 +543,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
.create_new_pad
.expect("Missing parent function `create_new_pad`");
from_glib_full(f(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
templ.to_glib_none().0,
req_name.to_glib_none().0,
caps.to_glib_none().0,
@ -522,7 +553,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
fn parent_update_src_caps(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
caps: &gst::Caps,
) -> Result<gst::Caps, gst::FlowError> {
unsafe {
@ -535,7 +566,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
let mut out_caps = ptr::null_mut();
gst::FlowReturn::from_glib(f(
aggregator.to_glib_none().0,
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
caps.as_mut_ptr(),
&mut out_caps,
))
@ -543,7 +574,7 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
}
}
fn parent_fixate_src_caps(&self, aggregator: &Aggregator, caps: gst::Caps) -> gst::Caps {
fn parent_fixate_src_caps(&self, aggregator: &Self::Type, caps: gst::Caps) -> gst::Caps {
unsafe {
let data = T::type_data();
let parent_class =
@ -552,13 +583,16 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
let f = (*parent_class)
.fixate_src_caps
.expect("Missing parent function `fixate_src_caps`");
from_glib_full(f(aggregator.to_glib_none().0, caps.into_ptr()))
from_glib_full(f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
caps.into_ptr(),
))
}
}
fn parent_negotiated_src_caps(
&self,
aggregator: &Aggregator,
aggregator: &Self::Type,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
unsafe {
@ -569,7 +603,10 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
.negotiated_src_caps
.map(|f| {
gst_result_from_gboolean!(
f(aggregator.to_glib_none().0, caps.to_glib_none().0),
f(
aggregator.unsafe_cast_ref::<Aggregator>().to_glib_none().0,
caps.to_glib_none().0
),
gst::CAT_RUST,
"Parent function `negotiated_src_caps` failed"
)
@ -578,14 +615,19 @@ impl<T: AggregatorImpl> AggregatorImplExt for T {
}
}
fn parent_negotiate(&self, aggregator: &Aggregator) -> bool {
fn parent_negotiate(&self, aggregator: &Self::Type) -> bool {
unsafe {
let data = T::type_data();
let parent_class =
data.as_ref().get_parent_class() as *mut gst_base_sys::GstAggregatorClass;
(*parent_class)
.negotiate
.map(|f| from_glib(f(aggregator.to_glib_none().0)))
.map(|f| {
from_glib(f(aggregator
.unsafe_cast_ref::<Aggregator>()
.to_glib_none()
.0))
})
.unwrap_or(true)
}
}
@ -595,31 +637,29 @@ unsafe impl<T: AggregatorImpl> IsSubclassable<T> for Aggregator
where
<T as ObjectSubclass>::Instance: PanicPoison,
{
fn override_vfuncs(klass: &mut glib::object::Class<Self>) {
fn override_vfuncs(klass: &mut glib::Class<Self>) {
<gst::Element as IsSubclassable<T>>::override_vfuncs(klass);
unsafe {
let klass = &mut *(klass.as_mut() as *mut gst_base_sys::GstAggregatorClass);
klass.flush = Some(aggregator_flush::<T>);
klass.clip = Some(aggregator_clip::<T>);
klass.finish_buffer = Some(aggregator_finish_buffer::<T>);
klass.sink_event = Some(aggregator_sink_event::<T>);
klass.sink_query = Some(aggregator_sink_query::<T>);
klass.src_event = Some(aggregator_src_event::<T>);
klass.src_query = Some(aggregator_src_query::<T>);
klass.src_activate = Some(aggregator_src_activate::<T>);
klass.aggregate = Some(aggregator_aggregate::<T>);
klass.start = Some(aggregator_start::<T>);
klass.stop = Some(aggregator_stop::<T>);
klass.get_next_time = Some(aggregator_get_next_time::<T>);
klass.create_new_pad = Some(aggregator_create_new_pad::<T>);
klass.update_src_caps = Some(aggregator_update_src_caps::<T>);
klass.fixate_src_caps = Some(aggregator_fixate_src_caps::<T>);
klass.negotiated_src_caps = Some(aggregator_negotiated_src_caps::<T>);
{
klass.sink_event_pre_queue = Some(aggregator_sink_event_pre_queue::<T>);
klass.sink_query_pre_queue = Some(aggregator_sink_query_pre_queue::<T>);
klass.negotiate = Some(aggregator_negotiate::<T>);
}
let klass = klass.as_mut();
klass.flush = Some(aggregator_flush::<T>);
klass.clip = Some(aggregator_clip::<T>);
klass.finish_buffer = Some(aggregator_finish_buffer::<T>);
klass.sink_event = Some(aggregator_sink_event::<T>);
klass.sink_query = Some(aggregator_sink_query::<T>);
klass.src_event = Some(aggregator_src_event::<T>);
klass.src_query = Some(aggregator_src_query::<T>);
klass.src_activate = Some(aggregator_src_activate::<T>);
klass.aggregate = Some(aggregator_aggregate::<T>);
klass.start = Some(aggregator_start::<T>);
klass.stop = Some(aggregator_stop::<T>);
klass.get_next_time = Some(aggregator_get_next_time::<T>);
klass.create_new_pad = Some(aggregator_create_new_pad::<T>);
klass.update_src_caps = Some(aggregator_update_src_caps::<T>);
klass.fixate_src_caps = Some(aggregator_fixate_src_caps::<T>);
klass.negotiated_src_caps = Some(aggregator_negotiated_src_caps::<T>);
{
klass.sink_event_pre_queue = Some(aggregator_sink_event_pre_queue::<T>);
klass.sink_query_pre_queue = Some(aggregator_sink_query_pre_queue::<T>);
klass.negotiate = Some(aggregator_negotiate::<T>);
}
}
}
@ -635,7 +675,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, {
imp.flush(&wrap).into()
imp.flush(wrap.unsafe_cast_ref()).into()
})
.to_glib()
}
@ -654,7 +694,7 @@ where
let ret = gst_panic_to_error!(&wrap, &instance.panicked(), None, {
imp.clip(
&wrap,
wrap.unsafe_cast_ref(),
&from_glib_borrow(aggregator_pad),
from_glib_full(buffer),
)
@ -675,7 +715,8 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, {
imp.finish_buffer(&wrap, from_glib_full(buffer)).into()
imp.finish_buffer(wrap.unsafe_cast_ref(), from_glib_full(buffer))
.into()
})
.to_glib()
}
@ -692,9 +733,9 @@ where
let imp = instance.get_impl();
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
gst_panic_to_error!(wrap, &instance.panicked(), false, {
imp.sink_event(
&wrap,
wrap.unsafe_cast_ref(),
&from_glib_borrow(aggregator_pad),
from_glib_full(event),
)
@ -716,7 +757,7 @@ where
gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, {
imp.sink_event_pre_queue(
&wrap,
wrap.unsafe_cast_ref(),
&from_glib_borrow(aggregator_pad),
from_glib_full(event),
)
@ -739,7 +780,7 @@ where
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
imp.sink_query(
&wrap,
wrap.unsafe_cast_ref(),
&from_glib_borrow(aggregator_pad),
gst::QueryRef::from_mut_ptr(query),
)
@ -761,7 +802,7 @@ where
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
imp.sink_query_pre_queue(
&wrap,
wrap.unsafe_cast_ref(),
&from_glib_borrow(aggregator_pad),
gst::QueryRef::from_mut_ptr(query),
)
@ -781,7 +822,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
imp.src_event(&wrap, from_glib_full(event))
imp.src_event(wrap.unsafe_cast_ref(), from_glib_full(event))
})
.to_glib()
}
@ -798,7 +839,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
imp.src_query(&wrap, gst::QueryRef::from_mut_ptr(query))
imp.src_query(wrap.unsafe_cast_ref(), gst::QueryRef::from_mut_ptr(query))
})
.to_glib()
}
@ -816,7 +857,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
match imp.src_activate(&wrap, from_glib(mode), from_glib(active)) {
match imp.src_activate(wrap.unsafe_cast_ref(), from_glib(mode), from_glib(active)) {
Ok(()) => true,
Err(err) => {
err.log_with_object(&*wrap);
@ -839,7 +880,8 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, {
imp.aggregate(&wrap, from_glib(timeout)).into()
imp.aggregate(wrap.unsafe_cast_ref(), from_glib(timeout))
.into()
})
.to_glib()
}
@ -855,7 +897,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
match imp.start(&wrap) {
match imp.start(wrap.unsafe_cast_ref()) {
Ok(()) => true,
Err(err) => {
wrap.post_error_message(err);
@ -877,7 +919,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
match imp.stop(&wrap) {
match imp.stop(wrap.unsafe_cast_ref()) {
Ok(()) => true,
Err(err) => {
wrap.post_error_message(err);
@ -899,7 +941,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), gst::CLOCK_TIME_NONE, {
imp.get_next_time(&wrap)
imp.get_next_time(wrap.unsafe_cast_ref())
})
.to_glib()
}
@ -921,7 +963,7 @@ where
let req_name: Borrowed<Option<glib::GString>> = from_glib_borrow(req_name);
imp.create_new_pad(
&wrap,
wrap.unsafe_cast_ref(),
&from_glib_borrow(templ),
req_name.as_ref().as_ref().map(|s| s.as_str()),
Option::<gst::Caps>::from_glib_borrow(caps)
@ -947,7 +989,7 @@ where
*res = ptr::null_mut();
gst_panic_to_error!(&wrap, &instance.panicked(), gst::FlowReturn::Error, {
match imp.update_src_caps(&wrap, &from_glib_borrow(caps)) {
match imp.update_src_caps(wrap.unsafe_cast_ref(), &from_glib_borrow(caps)) {
Ok(res_caps) => {
*res = res_caps.into_ptr();
gst::FlowReturn::Ok
@ -970,7 +1012,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), gst::Caps::new_empty(), {
imp.fixate_src_caps(&wrap, from_glib_full(caps))
imp.fixate_src_caps(wrap.unsafe_cast_ref(), from_glib_full(caps))
})
.into_ptr()
}
@ -987,7 +1029,7 @@ where
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
match imp.negotiated_src_caps(&wrap, &from_glib_borrow(caps)) {
match imp.negotiated_src_caps(wrap.unsafe_cast_ref(), &from_glib_borrow(caps)) {
Ok(()) => true,
Err(err) => {
err.log_with_object(&*wrap);
@ -1008,5 +1050,8 @@ where
let imp = instance.get_impl();
let wrap: Borrowed<Aggregator> = from_glib_borrow(ptr);
gst_panic_to_error!(&wrap, &instance.panicked(), false, { imp.negotiate(&wrap) }).to_glib()
gst_panic_to_error!(&wrap, &instance.panicked(), false, {
imp.negotiate(wrap.unsafe_cast_ref())
})
.to_glib()
}

View file

@ -8,9 +8,10 @@
use super::super::gst_base_sys;
use glib::prelude::*;
use glib::subclass::prelude::*;
use glib::translate::*;
use glib::subclass::prelude::*;
use gst::subclass::prelude::*;
use super::super::Aggregator;
@ -19,7 +20,7 @@ use super::super::AggregatorPad;
pub trait AggregatorPadImpl: AggregatorPadImplExt + PadImpl {
fn flush(
&self,
aggregator_pad: &AggregatorPad,
aggregator_pad: &Self::Type,
aggregator: &Aggregator,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.parent_flush(aggregator_pad, aggregator)
@ -27,7 +28,7 @@ pub trait AggregatorPadImpl: AggregatorPadImplExt + PadImpl {
fn skip_buffer(
&self,
aggregator_pad: &AggregatorPad,
aggregator_pad: &Self::Type,
aggregator: &Aggregator,
buffer: &gst::Buffer,
) -> bool {
@ -35,16 +36,16 @@ pub trait AggregatorPadImpl: AggregatorPadImplExt + PadImpl {
}
}
pub trait AggregatorPadImplExt {
pub trait AggregatorPadImplExt: ObjectSubclass {
fn parent_flush(
&self,
aggregator_pad: &AggregatorPad,
aggregator_pad: &Self::Type,
aggregator: &Aggregator,
) -> Result<gst::FlowSuccess, gst::FlowError>;
fn parent_skip_buffer(
&self,
aggregator_pad: &AggregatorPad,
aggregator_pad: &Self::Type,
aggregator: &Aggregator,
buffer: &gst::Buffer,
) -> bool;
@ -53,7 +54,7 @@ pub trait AggregatorPadImplExt {
impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
fn parent_flush(
&self,
aggregator_pad: &AggregatorPad,
aggregator_pad: &Self::Type,
aggregator: &Aggregator,
) -> Result<gst::FlowSuccess, gst::FlowError> {
unsafe {
@ -64,7 +65,10 @@ impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
.flush
.map(|f| {
from_glib(f(
aggregator_pad.to_glib_none().0,
aggregator_pad
.unsafe_cast_ref::<AggregatorPad>()
.to_glib_none()
.0,
aggregator.to_glib_none().0,
))
})
@ -75,7 +79,7 @@ impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
fn parent_skip_buffer(
&self,
aggregator_pad: &AggregatorPad,
aggregator_pad: &Self::Type,
aggregator: &Aggregator,
buffer: &gst::Buffer,
) -> bool {
@ -87,7 +91,10 @@ impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
.skip_buffer
.map(|f| {
from_glib(f(
aggregator_pad.to_glib_none().0,
aggregator_pad
.unsafe_cast_ref::<AggregatorPad>()
.to_glib_none()
.0,
aggregator.to_glib_none().0,
buffer.to_glib_none().0,
))
@ -97,13 +104,11 @@ impl<T: AggregatorPadImpl> AggregatorPadImplExt for T {
}
}
unsafe impl<T: AggregatorPadImpl> IsSubclassable<T> for AggregatorPad {
fn override_vfuncs(klass: &mut glib::object::Class<Self>) {
fn override_vfuncs(klass: &mut glib::Class<Self>) {
<gst::Pad as IsSubclassable<T>>::override_vfuncs(klass);
unsafe {
let klass = &mut *(klass.as_mut() as *mut gst_base_sys::GstAggregatorPadClass);
klass.flush = Some(aggregator_pad_flush::<T>);
klass.skip_buffer = Some(aggregator_pad_skip_buffer::<T>);
}
let klass = klass.as_mut();
klass.flush = Some(aggregator_pad_flush::<T>);
klass.skip_buffer = Some(aggregator_pad_skip_buffer::<T>);
}
}
@ -115,7 +120,9 @@ unsafe extern "C" fn aggregator_pad_flush<T: AggregatorPadImpl>(
let imp = instance.get_impl();
let wrap: Borrowed<AggregatorPad> = from_glib_borrow(ptr);
let res: gst::FlowReturn = imp.flush(&wrap, &from_glib_borrow(aggregator)).into();
let res: gst::FlowReturn = imp
.flush(wrap.unsafe_cast_ref(), &from_glib_borrow(aggregator))
.into();
res.to_glib()
}
@ -129,7 +136,7 @@ unsafe extern "C" fn aggregator_pad_skip_buffer<T: AggregatorPadImpl>(
let wrap: Borrowed<AggregatorPad> = from_glib_borrow(ptr);
imp.skip_buffer(
&wrap,
wrap.unsafe_cast_ref(),
&from_glib_borrow(aggregator),
&from_glib_borrow(buffer),
)

View file

@ -0,0 +1,410 @@
// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::{mem, sync::Mutex};
use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"fallbacksrc-custom-source",
gst::DebugColorFlags::empty(),
Some("Fallback Custom Source Bin"),
)
});
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("source", |name| {
glib::ParamSpec::object(
name,
"Source",
"Source",
gst::Element::static_type(),
glib::ParamFlags::WRITABLE | glib::ParamFlags::CONSTRUCT_ONLY,
)
})];
struct Stream {
source_pad: gst::Pad,
ghost_pad: gst::GhostPad,
// Dummy stream we created
stream: gst::Stream,
}
struct State {
pads: Vec<Stream>,
num_audio: usize,
num_video: usize,
}
pub struct CustomSource {
source: OnceCell<gst::Element>,
state: Mutex<State>,
}
impl ObjectSubclass for CustomSource {
const NAME: &'static str = "FallbackSrcCustomSource";
type Type = super::CustomSource;
type ParentType = gst::Bin;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn new() -> Self {
Self {
source: OnceCell::default(),
state: Mutex::new(State {
pads: vec![],
num_audio: 0,
num_video: 0,
}),
}
}
fn class_init(klass: &mut Self::Class) {
let src_pad_template = gst::PadTemplate::new(
"audio_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
let src_pad_template = gst::PadTemplate::new(
"video_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
}
impl ObjectImpl for CustomSource {
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("source", ..) => {
let source = value.get::<gst::Element>().unwrap().unwrap();
self.source.set(source.clone()).unwrap();
obj.add(&source).unwrap();
}
_ => unreachable!(),
}
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
obj.set_element_flags(gst::ElementFlags::SOURCE);
obj.set_bin_flags(gst::BinFlags::STREAMS_AWARE);
}
}
impl ElementImpl for CustomSource {
#[allow(clippy::single_match)]
fn change_state(
&self,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::NullToReady => {
self.start(element)?;
}
_ => (),
}
let res = self.parent_change_state(element, transition)?;
match transition {
gst::StateChange::ReadyToNull => {
self.stop(element)?;
}
_ => (),
}
Ok(res)
}
}
impl BinImpl for CustomSource {
#[allow(clippy::single_match)]
fn handle_message(&self, bin: &Self::Type, msg: gst::Message) {
use gst::MessageView;
match msg.view() {
MessageView::StreamCollection(_) => {
// TODO: Drop stream collection message for now, we only create a simple custom
// one here so that fallbacksrc can know about our streams. It is never
// forwarded.
if let Err(msg) = self.handle_source_no_more_pads(&bin) {
bin.post_error_message(msg);
}
}
_ => self.parent_handle_message(bin, msg),
}
}
}
impl CustomSource {
fn start(
&self,
element: &super::CustomSource,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Starting");
let source = self.source.get().unwrap();
let templates = source.get_pad_template_list();
if templates
.iter()
.any(|templ| templ.get_property_presence() == gst::PadPresence::Request)
{
gst_error!(CAT, obj: element, "Request pads not supported");
gst_element_error!(
element,
gst::LibraryError::Settings,
["Request pads not supported"]
);
return Err(gst::StateChangeError);
}
let has_sometimes_pads = templates
.iter()
.any(|templ| templ.get_property_presence() == gst::PadPresence::Sometimes);
// Handle all source pads that already exist
for pad in source.get_src_pads() {
if let Err(msg) = self.handle_source_pad_added(&element, &pad) {
element.post_error_message(msg);
return Err(gst::StateChangeError);
}
}
if !has_sometimes_pads {
if let Err(msg) = self.handle_source_no_more_pads(&element) {
element.post_error_message(msg);
return Err(gst::StateChangeError);
}
} else {
gst_debug!(CAT, obj: element, "Found sometimes pads");
let element_weak = element.downgrade();
source.connect_pad_added(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_pad_added(&element, pad) {
element.post_error_message(msg);
}
});
let element_weak = element.downgrade();
source.connect_pad_removed(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_pad_removed(&element, pad) {
element.post_error_message(msg);
}
});
let element_weak = element.downgrade();
source.connect_no_more_pads(move |_| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_no_more_pads(&element) {
element.post_error_message(msg);
}
});
}
Ok(gst::StateChangeSuccess::Success)
}
fn handle_source_pad_added(
&self,
element: &super::CustomSource,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source added pad {}", pad.get_name());
let mut state = self.state.lock().unwrap();
let mut stream_type = None;
// Take stream type from stream-start event if we can
if let Some(event) = pad.get_sticky_event(gst::EventType::StreamStart, 0) {
if let gst::EventView::StreamStart(ev) = event.view() {
stream_type = ev.get_stream().map(|s| s.get_stream_type());
}
}
// Otherwise from the caps
if stream_type.is_none() {
let caps = match pad.get_current_caps().or_else(|| pad.query_caps(None)) {
Some(caps) if !caps.is_any() && !caps.is_empty() => caps,
_ => {
gst_error!(CAT, obj: element, "Pad {} had no caps", pad.get_name());
return Err(gst_error_msg!(
gst::CoreError::Negotiation,
["Pad had no caps"]
));
}
};
let s = caps.get_structure(0).unwrap();
if s.get_name().starts_with("audio/") {
stream_type = Some(gst::StreamType::AUDIO);
} else if s.get_name().starts_with("video/") {
stream_type = Some(gst::StreamType::VIDEO);
} else {
return Ok(());
}
}
let stream_type = stream_type.unwrap();
let (templ, name) = if stream_type.contains(gst::StreamType::AUDIO) {
let name = format!("audio_{}", state.num_audio);
state.num_audio += 1;
(element.get_pad_template("audio_%u").unwrap(), name)
} else {
let name = format!("video_{}", state.num_video);
state.num_video += 1;
(element.get_pad_template("video_%u").unwrap(), name)
};
let ghost_pad = gst::GhostPad::builder_with_template(&templ, Some(&name))
.build_with_target(pad)
.unwrap();
let stream = Stream {
source_pad: pad.clone(),
ghost_pad: ghost_pad.clone().upcast(),
// TODO: We only add the stream type right now
stream: gst::Stream::new(None, None, stream_type, gst::StreamFlags::empty()),
};
state.pads.push(stream);
drop(state);
ghost_pad.set_active(true).unwrap();
element.add_pad(&ghost_pad).unwrap();
Ok(())
}
fn handle_source_pad_removed(
&self,
element: &super::CustomSource,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source removed pad {}", pad.get_name());
let mut state = self.state.lock().unwrap();
let (i, stream) = match state
.pads
.iter()
.enumerate()
.find(|(_i, p)| &p.source_pad == pad)
{
None => return Ok(()),
Some(v) => v,
};
let ghost_pad = stream.ghost_pad.clone();
state.pads.remove(i);
drop(state);
ghost_pad.set_active(false).unwrap();
let _ = ghost_pad.set_target(None::<&gst::Pad>);
let _ = element.remove_pad(&ghost_pad);
Ok(())
}
fn handle_source_no_more_pads(
&self,
element: &super::CustomSource,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source signalled no-more-pads");
let state = self.state.lock().unwrap();
let streams = state
.pads
.iter()
.map(|p| p.stream.clone())
.collect::<Vec<_>>();
let collection = gst::StreamCollection::builder(None)
.streams(&streams)
.build();
drop(state);
element.no_more_pads();
let _ = element.post_message(
gst::message::StreamsSelected::builder(&collection)
.src(element)
.build(),
);
Ok(())
}
fn stop(
&self,
element: &super::CustomSource,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
let pads = mem::replace(&mut state.pads, vec![]);
state.num_audio = 0;
state.num_video = 0;
drop(state);
for pad in pads {
let _ = pad.ghost_pad.set_target(None::<&gst::Pad>);
let _ = element.remove_pad(&pad.ghost_pad);
}
Ok(gst::StateChangeSuccess::Success)
}
}

View file

@ -0,0 +1,38 @@
// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib::prelude::*;
mod imp;
glib_wrapper! {
pub struct CustomSource(ObjectSubclass<imp::CustomSource>) @extends gst::Bin, gst::Element, gst::Object;
}
// GStreamer elements need to be thread-safe. For the private implementation this is automatically
// enforced but for the public wrapper type we need to specify this manually.
unsafe impl Send for CustomSource {}
unsafe impl Sync for CustomSource {}
impl CustomSource {
pub fn new(source: &gst::Element) -> CustomSource {
glib::Object::new(CustomSource::static_type(), &[("source", source)])
.unwrap()
.downcast()
.unwrap()
}
}

View file

@ -27,6 +27,9 @@ use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use super::custom_source::CustomSource;
use super::{RetryReason, Status};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"fallbacksrc",
@ -35,17 +38,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)]
#[repr(u32)]
#[genum(type_name = "GstFallbackSourceRetryReason")]
enum RetryReason {
None,
Error,
Eos,
StateChangeFailure,
Timeout,
}
#[derive(Debug, Clone)]
struct Stats {
num_retry: u64,
@ -178,21 +170,11 @@ struct State {
stats: Stats,
}
struct FallbackSrc {
pub struct FallbackSrc {
settings: Mutex<Settings>,
state: Mutex<Option<State>>,
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)]
#[repr(u32)]
#[genum(type_name = "GstFallbackSourceStatus")]
enum Status {
Stopped,
Buffering,
Retrying,
Running,
}
static PROPERTIES: [subclass::Property; 13] = [
subclass::Property("enable-audio", |name| {
glib::ParamSpec::boolean(
@ -322,6 +304,7 @@ static PROPERTIES: [subclass::Property; 13] = [
impl ObjectSubclass for FallbackSrc {
const NAME: &'static str = "FallbackSrc";
type Type = super::FallbackSrc;
type ParentType = gst::Bin;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
@ -335,7 +318,7 @@ impl ObjectSubclass for FallbackSrc {
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Fallback Source",
"Generic/Source",
@ -381,9 +364,8 @@ impl ObjectSubclass for FallbackSrc {
}
impl ObjectImpl for FallbackSrc {
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let element = obj.downcast_ref::<gst::Bin>().unwrap();
match *prop {
subclass::Property("enable-audio", ..) => {
@ -391,7 +373,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing enable-audio from {:?} to {:?}",
settings.enable_audio,
new_value,
@ -403,7 +385,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing enable-video from {:?} to {:?}",
settings.enable_video,
new_value,
@ -415,7 +397,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing URI from {:?} to {:?}",
settings.uri,
new_value,
@ -427,7 +409,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing source from {:?} to {:?}",
settings.source,
new_value,
@ -439,7 +421,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing Fallback URI from {:?} to {:?}",
settings.fallback_uri,
new_value,
@ -451,7 +433,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing timeout from {:?} to {:?}",
settings.timeout,
new_value,
@ -463,7 +445,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing Restart Timeout from {:?} to {:?}",
settings.restart_timeout,
new_value,
@ -475,7 +457,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing Retry Timeout from {:?} to {:?}",
settings.retry_timeout,
new_value,
@ -487,7 +469,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing restart-on-eos from {:?} to {:?}",
settings.restart_on_eos,
new_value,
@ -499,7 +481,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing Minimum Latency from {:?} to {:?}",
settings.min_latency,
new_value,
@ -511,7 +493,7 @@ impl ObjectImpl for FallbackSrc {
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
obj: obj,
"Changing Buffer Duration from {:?} to {:?}",
settings.buffer_duration,
new_value,
@ -525,7 +507,7 @@ impl ObjectImpl for FallbackSrc {
// Called whenever a value of a property is read. It can be called
// at any time from any thread.
#[allow(clippy::blocks_in_if_conditions)]
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@ -630,13 +612,12 @@ impl ObjectImpl for FallbackSrc {
}
}
fn constructed(&self, obj: &glib::Object) {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
let bin = obj.downcast_ref::<gst::Bin>().unwrap();
bin.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
bin.set_element_flags(gst::ElementFlags::SOURCE);
bin.set_bin_flags(gst::BinFlags::STREAMS_AWARE);
obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
obj.set_element_flags(gst::ElementFlags::SOURCE);
obj.set_bin_flags(gst::BinFlags::STREAMS_AWARE);
}
}
@ -644,7 +625,7 @@ impl ElementImpl for FallbackSrc {
#[allow(clippy::single_match)]
fn change_state(
&self,
element: &gst::Element,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
@ -676,7 +657,7 @@ impl ElementImpl for FallbackSrc {
}
impl BinImpl for FallbackSrc {
fn handle_message(&self, bin: &gst::Bin, msg: gst::Message) {
fn handle_message(&self, bin: &Self::Type, msg: gst::Message) {
use gst::MessageView;
match msg.view() {
@ -704,7 +685,7 @@ impl BinImpl for FallbackSrc {
impl FallbackSrc {
fn create_main_input(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
source: &Source,
buffer_duration: i64,
) -> Result<gst::Element, gst::StateChangeError> {
@ -729,7 +710,7 @@ impl FallbackSrc {
source
}
Source::Element(ref source) => custom_source::CustomSource::new(source),
Source::Element(ref source) => CustomSource::new(source).upcast(),
};
// Handle any async state changes internally, they don't affect the pipeline because we
@ -771,7 +752,7 @@ impl FallbackSrc {
fn create_fallback_video_input(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
min_latency: u64,
fallback_uri: Option<&str>,
) -> Result<gst::Element, gst::StateChangeError> {
@ -949,7 +930,7 @@ impl FallbackSrc {
fn create_fallback_audio_input(
&self,
_element: &gst::Bin,
_element: &super::FallbackSrc,
) -> Result<gst::Element, gst::StateChangeError> {
let input = gst::Bin::new(Some("fallback_audio"));
let audiotestsrc = gst::ElementFactory::make("audiotestsrc", Some("fallback_audiosrc"))
@ -973,7 +954,7 @@ impl FallbackSrc {
fn create_stream(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
timeout: u64,
min_latency: u64,
is_audio: bool,
@ -1064,9 +1045,7 @@ impl FallbackSrc {
})
}
fn start(&self, element: &gst::Element) -> Result<(), gst::StateChangeError> {
let element = element.downcast_ref::<gst::Bin>().unwrap();
fn start(&self, element: &super::FallbackSrc) -> Result<(), gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Starting");
let mut state_guard = self.state.lock().unwrap();
if state_guard.is_some() {
@ -1153,9 +1132,7 @@ impl FallbackSrc {
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), gst::StateChangeError> {
let element = element.downcast_ref::<gst::Bin>().unwrap();
fn stop(&self, element: &super::FallbackSrc) -> Result<(), gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Stopping");
let mut state_guard = self.state.lock().unwrap();
let mut state = match state_guard.take() {
@ -1214,11 +1191,9 @@ impl FallbackSrc {
fn change_source_state(
&self,
element: &gst::Element,
element: &super::FallbackSrc,
transition: gst::StateChange,
) -> Result<(), gst::StateChangeError> {
let element = element.downcast_ref::<gst::Bin>().unwrap();
gst_debug!(CAT, obj: element, "Changing source state: {:?}", transition);
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
@ -1293,7 +1268,7 @@ impl FallbackSrc {
fn proxy_pad_chain(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
pad: &gst::ProxyPad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -1310,7 +1285,7 @@ impl FallbackSrc {
fn handle_source_pad_added(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pad {} added to source", pad.get_name(),);
@ -1423,7 +1398,7 @@ impl FallbackSrc {
Ok(())
}
fn add_pad_probe(&self, element: &gst::Bin, stream: &mut Stream) -> Block {
fn add_pad_probe(&self, element: &super::FallbackSrc, stream: &mut Stream) -> Block {
// FIXME: Not literally correct as we add the probe to the queue source pad but that's only
// a workaround until
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
@ -1476,7 +1451,7 @@ impl FallbackSrc {
fn handle_pad_blocked(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
pad: &gst::Pad,
pts: gst::ClockTime,
) -> Result<(), gst::ErrorMessage> {
@ -1601,7 +1576,7 @@ impl FallbackSrc {
Ok(())
}
fn unblock_pads(&self, element: &gst::Bin, state: &mut State) {
fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) {
// Check if all streams are blocked and have a running time and we have
// 100% buffering
if state.stats.buffering_percent < 100 {
@ -1802,7 +1777,7 @@ impl FallbackSrc {
fn handle_source_pad_removed(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(
@ -1849,7 +1824,7 @@ impl FallbackSrc {
Ok(())
}
fn handle_buffering(&self, element: &gst::Bin, m: &gst::message::Buffering) {
fn handle_buffering(&self, element: &super::FallbackSrc, m: &gst::message::Buffering) {
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
None => {
@ -1889,7 +1864,11 @@ impl FallbackSrc {
element.notify("statistics");
}
fn handle_streams_selected(&self, element: &gst::Bin, m: &gst::message::StreamsSelected) {
fn handle_streams_selected(
&self,
element: &super::FallbackSrc,
m: &gst::message::StreamsSelected,
) {
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
None => {
@ -1949,7 +1928,7 @@ impl FallbackSrc {
element.notify("status");
}
fn handle_error(&self, element: &gst::Bin, m: &gst::message::Error) -> bool {
fn handle_error(&self, element: &super::FallbackSrc, m: &gst::message::Error) -> bool {
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
None => {
@ -1981,7 +1960,12 @@ impl FallbackSrc {
false
}
fn handle_source_error(&self, element: &gst::Bin, state: &mut State, reason: RetryReason) {
fn handle_source_error(
&self,
element: &super::FallbackSrc,
state: &mut State,
reason: RetryReason,
) {
gst_debug!(CAT, obj: element, "Handling source error");
state.stats.last_retry_reason = reason;
@ -2194,7 +2178,7 @@ impl FallbackSrc {
#[allow(clippy::blocks_in_if_conditions)]
fn schedule_source_restart_timeout(
&self,
element: &gst::Bin,
element: &super::FallbackSrc,
state: &mut State,
elapsed: gst::ClockTime,
) {
@ -2278,7 +2262,7 @@ impl FallbackSrc {
}
#[allow(clippy::blocks_in_if_conditions)]
fn have_fallback_activated(&self, _element: &gst::Bin, state: &State) -> bool {
fn have_fallback_activated(&self, _element: &super::FallbackSrc, state: &State) -> bool {
let mut have_audio = false;
let mut have_video = false;
if let Some(ref streams) = state.streams {
@ -2323,7 +2307,7 @@ impl FallbackSrc {
.unwrap_or(true))
}
fn handle_switch_active_pad_change(&self, element: &gst::Bin) {
fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc) {
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
None => {
@ -2370,408 +2354,3 @@ impl FallbackSrc {
state.stats.to_structure()
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"fallbacksrc",
gst::Rank::None,
FallbackSrc::get_type(),
)
}
mod custom_source {
use super::CAT;
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::{mem, sync::Mutex};
use once_cell::sync::OnceCell;
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("source", |name| {
glib::ParamSpec::object(
name,
"Source",
"Source",
gst::Element::static_type(),
glib::ParamFlags::WRITABLE | glib::ParamFlags::CONSTRUCT_ONLY,
)
})];
struct Stream {
source_pad: gst::Pad,
ghost_pad: gst::GhostPad,
// Dummy stream we created
stream: gst::Stream,
}
struct State {
pads: Vec<Stream>,
num_audio: usize,
num_video: usize,
}
pub struct CustomSource {
source: OnceCell<gst::Element>,
state: Mutex<State>,
}
impl ObjectSubclass for CustomSource {
const NAME: &'static str = "FallbackSrcCustomSource";
type ParentType = gst::Bin;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn new() -> Self {
Self {
source: OnceCell::default(),
state: Mutex::new(State {
pads: vec![],
num_audio: 0,
num_video: 0,
}),
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
let src_pad_template = gst::PadTemplate::new(
"audio_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
let src_pad_template = gst::PadTemplate::new(
"video_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
}
impl ObjectImpl for CustomSource {
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let element = obj.downcast_ref::<gst::Bin>().unwrap();
match *prop {
subclass::Property("source", ..) => {
let source = value.get::<gst::Element>().unwrap().unwrap();
self.source.set(source.clone()).unwrap();
element.add(&source).unwrap();
}
_ => unreachable!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let bin = obj.downcast_ref::<gst::Bin>().unwrap();
bin.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
bin.set_element_flags(gst::ElementFlags::SOURCE);
bin.set_bin_flags(gst::BinFlags::STREAMS_AWARE);
}
}
impl ElementImpl for CustomSource {
#[allow(clippy::single_match)]
fn change_state(
&self,
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let element = element.downcast_ref::<gst::Bin>().unwrap();
match transition {
gst::StateChange::NullToReady => {
self.start(element)?;
}
_ => (),
}
let res = self.parent_change_state(element.upcast_ref(), transition)?;
match transition {
gst::StateChange::ReadyToNull => {
self.stop(element)?;
}
_ => (),
}
Ok(res)
}
}
impl BinImpl for CustomSource {
#[allow(clippy::single_match)]
fn handle_message(&self, bin: &gst::Bin, msg: gst::Message) {
use gst::MessageView;
match msg.view() {
MessageView::StreamCollection(_) => {
// TODO: Drop stream collection message for now, we only create a simple custom
// one here so that fallbacksrc can know about our streams. It is never
// forwarded.
if let Err(msg) = self.handle_source_no_more_pads(&bin) {
bin.post_error_message(msg);
}
}
_ => self.parent_handle_message(bin, msg),
}
}
}
impl CustomSource {
fn start(
&self,
element: &gst::Bin,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Starting");
let source = self.source.get().unwrap();
let templates = source.get_pad_template_list();
if templates
.iter()
.any(|templ| templ.get_property_presence() == gst::PadPresence::Request)
{
gst_error!(CAT, obj: element, "Request pads not supported");
gst_element_error!(
element,
gst::LibraryError::Settings,
["Request pads not supported"]
);
return Err(gst::StateChangeError);
}
let has_sometimes_pads = templates
.iter()
.any(|templ| templ.get_property_presence() == gst::PadPresence::Sometimes);
// Handle all source pads that already exist
for pad in source.get_src_pads() {
if let Err(msg) = self.handle_source_pad_added(&element, &pad) {
element.post_error_message(msg);
return Err(gst::StateChangeError);
}
}
if !has_sometimes_pads {
if let Err(msg) = self.handle_source_no_more_pads(&element) {
element.post_error_message(msg);
return Err(gst::StateChangeError);
}
} else {
gst_debug!(CAT, obj: element, "Found sometimes pads");
let element_weak = element.downgrade();
source.connect_pad_added(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_pad_added(&element, pad) {
element.post_error_message(msg);
}
});
let element_weak = element.downgrade();
source.connect_pad_removed(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_pad_removed(&element, pad) {
element.post_error_message(msg);
}
});
let element_weak = element.downgrade();
source.connect_no_more_pads(move |_| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_no_more_pads(&element) {
element.post_error_message(msg);
}
});
}
Ok(gst::StateChangeSuccess::Success)
}
fn handle_source_pad_added(
&self,
element: &gst::Bin,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source added pad {}", pad.get_name());
let mut state = self.state.lock().unwrap();
let mut stream_type = None;
// Take stream type from stream-start event if we can
if let Some(event) = pad.get_sticky_event(gst::EventType::StreamStart, 0) {
if let gst::EventView::StreamStart(ev) = event.view() {
stream_type = ev.get_stream().map(|s| s.get_stream_type());
}
}
// Otherwise from the caps
if stream_type.is_none() {
let caps = match pad.get_current_caps().or_else(|| pad.query_caps(None)) {
Some(caps) if !caps.is_any() && !caps.is_empty() => caps,
_ => {
gst_error!(CAT, obj: element, "Pad {} had no caps", pad.get_name());
return Err(gst_error_msg!(
gst::CoreError::Negotiation,
["Pad had no caps"]
));
}
};
let s = caps.get_structure(0).unwrap();
if s.get_name().starts_with("audio/") {
stream_type = Some(gst::StreamType::AUDIO);
} else if s.get_name().starts_with("video/") {
stream_type = Some(gst::StreamType::VIDEO);
} else {
return Ok(());
}
}
let stream_type = stream_type.unwrap();
let (templ, name) = if stream_type.contains(gst::StreamType::AUDIO) {
let name = format!("audio_{}", state.num_audio);
state.num_audio += 1;
(element.get_pad_template("audio_%u").unwrap(), name)
} else {
let name = format!("video_{}", state.num_video);
state.num_video += 1;
(element.get_pad_template("video_%u").unwrap(), name)
};
let ghost_pad = gst::GhostPad::builder_with_template(&templ, Some(&name))
.build_with_target(pad)
.unwrap();
let stream = Stream {
source_pad: pad.clone(),
ghost_pad: ghost_pad.clone().upcast(),
// TODO: We only add the stream type right now
stream: gst::Stream::new(None, None, stream_type, gst::StreamFlags::empty()),
};
state.pads.push(stream);
drop(state);
ghost_pad.set_active(true).unwrap();
element.add_pad(&ghost_pad).unwrap();
Ok(())
}
fn handle_source_pad_removed(
&self,
element: &gst::Bin,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source removed pad {}", pad.get_name());
let mut state = self.state.lock().unwrap();
let (i, stream) = match state
.pads
.iter()
.enumerate()
.find(|(_i, p)| &p.source_pad == pad)
{
None => return Ok(()),
Some(v) => v,
};
let ghost_pad = stream.ghost_pad.clone();
state.pads.remove(i);
drop(state);
ghost_pad.set_active(false).unwrap();
let _ = ghost_pad.set_target(None::<&gst::Pad>);
let _ = element.remove_pad(&ghost_pad);
Ok(())
}
fn handle_source_no_more_pads(&self, element: &gst::Bin) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source signalled no-more-pads");
let state = self.state.lock().unwrap();
let streams = state
.pads
.iter()
.map(|p| p.stream.clone())
.collect::<Vec<_>>();
let collection = gst::StreamCollection::builder(None)
.streams(&streams)
.build();
drop(state);
element.no_more_pads();
let _ = element.post_message(
gst::message::StreamsSelected::builder(&collection)
.src(element)
.build(),
);
Ok(())
}
fn stop(
&self,
element: &gst::Bin,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
let pads = mem::replace(&mut state.pads, vec![]);
state.num_audio = 0;
state.num_video = 0;
drop(state);
for pad in pads {
let _ = pad.ghost_pad.set_target(None::<&gst::Pad>);
let _ = element.remove_pad(&pad.ghost_pad);
}
Ok(gst::StateChangeSuccess::Success)
}
#[allow(clippy::new_ret_no_self)]
pub fn new(source: &gst::Element) -> gst::Element {
glib::Object::new(CustomSource::get_type(), &[("source", source)])
.unwrap()
.downcast::<gst::Element>()
.unwrap()
}
}
}

View file

@ -0,0 +1,60 @@
// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib::prelude::*;
mod custom_source;
mod imp;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)]
#[repr(u32)]
#[genum(type_name = "GstFallbackSourceRetryReason")]
enum RetryReason {
None,
Error,
Eos,
StateChangeFailure,
Timeout,
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)]
#[repr(u32)]
#[genum(type_name = "GstFallbackSourceStatus")]
enum Status {
Stopped,
Buffering,
Retrying,
Running,
}
glib_wrapper! {
pub struct FallbackSrc(ObjectSubclass<imp::FallbackSrc>) @extends gst::Bin, gst::Element, gst::Object;
}
// GStreamer elements need to be thread-safe. For the private implementation this is automatically
// enforced but for the public wrapper type we need to specify this manually.
unsafe impl Send for FallbackSrc {}
unsafe impl Sync for FallbackSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"fallbacksrc",
gst::Rank::None,
FallbackSrc::static_type(),
)
}

View file

@ -20,7 +20,7 @@ use self::gst_base::prelude::*;
#[cfg(not(feature = "v1_18"))]
use self::gst_base::subclass::prelude::*;
#[cfg(not(feature = "v1_18"))]
use super::gst_base_compat as gst_base;
use crate::gst_base_compat as gst_base;
#[cfg(feature = "v1_18")]
use gst_base::prelude::*;
@ -35,7 +35,7 @@ use once_cell::sync::Lazy;
use std::sync::{Mutex, RwLock};
struct FallbackSwitch {
pub struct FallbackSwitch {
sinkpad: gst_base::AggregatorPad,
fallback_sinkpad: RwLock<Option<gst_base::AggregatorPad>>,
active_sinkpad: Mutex<Option<gst::Pad>>,
@ -119,7 +119,7 @@ static PROPERTIES: [subclass::Property; 2] = [
impl FallbackSwitch {
fn handle_main_buffer(
&self,
agg: &gst_base::Aggregator,
agg: &super::FallbackSwitch,
state: &mut OutputState,
settings: &Settings,
mut buffer: gst::Buffer,
@ -259,7 +259,7 @@ impl FallbackSwitch {
fn get_fallback_buffer(
&self,
agg: &gst_base::Aggregator,
agg: &super::FallbackSwitch,
state: &mut OutputState,
settings: &Settings,
fallback_sinkpad: &gst_base::AggregatorPad,
@ -363,7 +363,7 @@ impl FallbackSwitch {
fn get_next_buffer(
&self,
agg: &gst_base::Aggregator,
agg: &super::FallbackSwitch,
timeout: bool,
) -> Result<(gst::Buffer, gst::Caps, bool), gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
@ -407,25 +407,17 @@ impl FallbackSwitch {
impl ObjectSubclass for FallbackSwitch {
const NAME: &'static str = "FallbackSwitch";
type Type = super::FallbackSwitch;
type ParentType = gst_base::Aggregator;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad: gst_base::AggregatorPad = glib::Object::new(
gst_base::AggregatorPad::static_type(),
&[
("name", &"sink"),
("direction", &gst::PadDirection::Sink),
("template", &templ),
],
)
.unwrap()
.downcast()
.unwrap();
let sinkpad =
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink")).build();
Self {
sinkpad,
@ -437,7 +429,7 @@ impl ObjectSubclass for FallbackSwitch {
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
fn class_init(klass: &mut Self::Class) {
klass.set_metadata(
"Fallback Switch",
"Generic",
@ -481,16 +473,14 @@ impl ObjectSubclass for FallbackSwitch {
}
impl ObjectImpl for FallbackSwitch {
fn constructed(&self, obj: &glib::Object) {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
let agg = obj.downcast_ref::<gst_base::Aggregator>().unwrap();
agg.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.sinkpad).unwrap();
}
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let agg = obj.downcast_ref::<gst_base::Aggregator>().unwrap();
match *prop {
subclass::Property("timeout", ..) => {
@ -498,7 +488,7 @@ impl ObjectImpl for FallbackSwitch {
let timeout = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: agg,
obj: obj,
"Changing timeout from {} to {}",
settings.timeout,
timeout
@ -510,7 +500,7 @@ impl ObjectImpl for FallbackSwitch {
}
}
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@ -530,47 +520,39 @@ impl ObjectImpl for FallbackSwitch {
impl ElementImpl for FallbackSwitch {
fn request_new_pad(
&self,
element: &gst::Element,
element: &Self::Type,
templ: &gst::PadTemplate,
name: Option<String>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let agg = element.downcast_ref::<gst_base::Aggregator>().unwrap();
let fallback_sink_templ = agg.get_pad_template("fallback_sink").unwrap();
let fallback_sink_templ = element.get_pad_template("fallback_sink").unwrap();
if templ != &fallback_sink_templ
|| (name.is_some() && name.as_deref() != Some("fallback_sink"))
{
gst_error!(CAT, obj: agg, "Wrong pad template or name");
gst_error!(CAT, obj: element, "Wrong pad template or name");
return None;
}
let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap();
if fallback_sinkpad.is_some() {
gst_error!(CAT, obj: agg, "Already have a fallback sinkpad");
gst_error!(CAT, obj: element, "Already have a fallback sinkpad");
return None;
}
let sinkpad: gst_base::AggregatorPad = glib::Object::new(
gst_base::AggregatorPad::static_type(),
&[
("name", &"fallback_sink"),
("direction", &gst::PadDirection::Sink),
("template", templ),
],
let sinkpad = gst::PadBuilder::<gst_base::AggregatorPad>::from_template(
&templ,
Some("fallback_sink"),
)
.unwrap()
.downcast()
.unwrap();
.build();
*fallback_sinkpad = Some(sinkpad.clone());
drop(fallback_sinkpad);
agg.add_pad(&sinkpad).unwrap();
element.add_pad(&sinkpad).unwrap();
Some(sinkpad.upcast())
}
fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
let agg = element.downcast_ref::<gst_base::Aggregator>().unwrap();
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap();
let mut pad_states = self.pad_states.write().unwrap();
@ -579,21 +561,21 @@ impl ElementImpl for FallbackSwitch {
pad_states.fallback_sinkpad = None;
drop(pad_states);
drop(fallback_sinkpad);
agg.remove_pad(pad).unwrap();
gst_debug!(CAT, obj: agg, "Removed fallback sinkpad {:?}", pad);
element.remove_pad(pad).unwrap();
gst_debug!(CAT, obj: element, "Removed fallback sinkpad {:?}", pad);
}
}
}
impl AggregatorImpl for FallbackSwitch {
fn start(&self, _agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> {
fn start(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.output_state.lock().unwrap() = OutputState::default();
*self.pad_states.write().unwrap() = PadStates::default();
Ok(())
}
fn stop(&self, _agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> {
fn stop(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.active_sinkpad.lock().unwrap() = None;
Ok(())
@ -601,7 +583,7 @@ impl AggregatorImpl for FallbackSwitch {
fn sink_event_pre_queue(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
agg_pad: &gst_base::AggregatorPad,
event: gst::Event,
) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -618,7 +600,7 @@ impl AggregatorImpl for FallbackSwitch {
fn sink_event(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
agg_pad: &gst_base::AggregatorPad,
event: gst::Event,
) -> bool {
@ -662,7 +644,7 @@ impl AggregatorImpl for FallbackSwitch {
}
}
fn get_next_time(&self, agg: &gst_base::Aggregator) -> gst::ClockTime {
fn get_next_time(&self, agg: &Self::Type) -> gst::ClockTime {
// If we have a buffer on the sinkpad then the timeout is always going to be immediately,
// i.e. 0. We want to output that buffer immediately, no matter what.
//
@ -715,7 +697,7 @@ impl AggregatorImpl for FallbackSwitch {
// calculating the running times later works correctly
fn clip(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
agg_pad: &gst_base::AggregatorPad,
mut buffer: gst::Buffer,
) -> Option<gst::Buffer> {
@ -810,7 +792,7 @@ impl AggregatorImpl for FallbackSwitch {
fn aggregate(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
@ -838,16 +820,7 @@ impl AggregatorImpl for FallbackSwitch {
agg.finish_buffer(buffer)
}
fn negotiate(&self, _agg: &gst_base::Aggregator) -> bool {
fn negotiate(&self, _agg: &Self::Type) -> bool {
true
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"fallbackswitch",
gst::Rank::None,
FallbackSwitch::get_type(),
)
}

View file

@ -0,0 +1,41 @@
// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib::prelude::*;
#[cfg(not(feature = "v1_18"))]
use crate::gst_base_compat as gst_base;
mod imp;
glib_wrapper! {
pub struct FallbackSwitch(ObjectSubclass<imp::FallbackSwitch>) @extends gst_base::Aggregator, gst::Element, gst::Object;
}
// GStreamer elements need to be thread-safe. For the private implementation this is automatically
// enforced but for the public wrapper type we need to specify this manually.
unsafe impl Send for FallbackSwitch {}
unsafe impl Sync for FallbackSwitch {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"fallbackswitch",
gst::Rank::None,
FallbackSwitch::static_type(),
)
}

View file

@ -344,7 +344,7 @@ impl HandleData for gst::Buffer {
}
}
struct ToggleRecord {
pub struct ToggleRecord {
settings: Mutex<Settings>,
state: Mutex<State>,
main_stream: Stream,
@ -367,7 +367,7 @@ lazy_static! {
impl ToggleRecord {
fn handle_main_stream<T: HandleData>(
&self,
element: &gst::Element,
element: &super::ToggleRecord,
pad: &gst::Pad,
stream: &Stream,
data: T,
@ -616,7 +616,7 @@ impl ToggleRecord {
fn handle_secondary_stream<T: HandleData>(
&self,
element: &gst::Element,
element: &super::ToggleRecord,
pad: &gst::Pad,
stream: &Stream,
data: T,
@ -1034,7 +1034,7 @@ impl ToggleRecord {
fn sink_chain(
&self,
pad: &gst::Pad,
element: &gst::Element,
element: &super::ToggleRecord,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| {
@ -1143,7 +1143,12 @@ impl ToggleRecord {
stream.srcpad.push(buffer)
}
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
fn sink_event(
&self,
pad: &gst::Pad,
element: &super::ToggleRecord,
mut event: gst::Event,
) -> bool {
use gst::EventView;
let stream = match self.pads.lock().get(pad) {
@ -1332,7 +1337,7 @@ impl ToggleRecord {
fn sink_query(
&self,
pad: &gst::Pad,
element: &gst::Element,
element: &super::ToggleRecord,
query: &mut gst::QueryRef,
) -> bool {
let stream = match self.pads.lock().get(pad) {
@ -1355,7 +1360,12 @@ impl ToggleRecord {
// FIXME `matches!` was introduced in rustc 1.42.0, current MSRV is 1.41.0
// FIXME uncomment when CI can upgrade to 1.47.1
//#[allow(clippy::match_like_matches_macro)]
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
fn src_event(
&self,
pad: &gst::Pad,
element: &super::ToggleRecord,
mut event: gst::Event,
) -> bool {
use gst::EventView;
let stream = match self.pads.lock().get(pad) {
@ -1394,7 +1404,12 @@ impl ToggleRecord {
}
}
fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
fn src_query(
&self,
pad: &gst::Pad,
element: &super::ToggleRecord,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
let stream = match self.pads.lock().get(pad) {
@ -1514,7 +1529,7 @@ impl ToggleRecord {
fn iterate_internal_links(
&self,
pad: &gst::Pad,
element: &gst::Element,
element: &super::ToggleRecord,
) -> gst::Iterator<gst::Pad> {
let stream = match self.pads.lock().get(pad) {
None => {
@ -1538,13 +1553,14 @@ impl ToggleRecord {
impl ObjectSubclass for ToggleRecord {
const NAME: &'static str = "RsToggleRecord";
type Type = super::ToggleRecord;
type ParentType = gst::Element;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.chain_function(|pad, parent, buffer| {
@ -1618,7 +1634,7 @@ impl ObjectSubclass for ToggleRecord {
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
fn class_init(klass: &mut Self::Class) {
klass.install_properties(&PROPERTIES);
klass.set_metadata(
@ -1668,9 +1684,8 @@ impl ObjectSubclass for ToggleRecord {
}
impl ObjectImpl for ToggleRecord {
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let element = obj.downcast_ref::<gst::Element>().unwrap();
match *prop {
subclass::Property("record", ..) => {
@ -1678,7 +1693,7 @@ impl ObjectImpl for ToggleRecord {
let record = value.get_some().expect("type checked upstream");
gst_debug!(
CAT,
obj: element,
obj: obj,
"Setting record from {:?} to {:?}",
settings.record,
record
@ -1690,7 +1705,7 @@ impl ObjectImpl for ToggleRecord {
}
}
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
fn get_property(&self, _obj: &Self::Type, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
@ -1706,19 +1721,18 @@ impl ObjectImpl for ToggleRecord {
}
}
fn constructed(&self, obj: &glib::Object) {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.main_stream.sinkpad).unwrap();
element.add_pad(&self.main_stream.srcpad).unwrap();
obj.add_pad(&self.main_stream.sinkpad).unwrap();
obj.add_pad(&self.main_stream.srcpad).unwrap();
}
}
impl ElementImpl for ToggleRecord {
fn change_state(
&self,
element: &gst::Element,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
@ -1778,7 +1792,7 @@ impl ElementImpl for ToggleRecord {
fn request_new_pad(
&self,
element: &gst::Element,
element: &Self::Type,
_templ: &gst::PadTemplate,
_name: Option<String>,
_caps: Option<&gst::Caps>,
@ -1867,7 +1881,7 @@ impl ElementImpl for ToggleRecord {
Some(sinkpad)
}
fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
let mut other_streams_guard = self.other_streams.lock();
let (ref mut other_streams, _) = *other_streams_guard;
let mut pads = self.pads.lock();
@ -1894,12 +1908,3 @@ impl ElementImpl for ToggleRecord {
element.remove_pad(&stream.srcpad).unwrap();
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"togglerecord",
gst::Rank::None,
ToggleRecord::get_type(),
)
}

View file

@ -0,0 +1,38 @@
// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib::prelude::*;
mod imp;
glib_wrapper! {
pub struct ToggleRecord(ObjectSubclass<imp::ToggleRecord>) @extends gst::Element, gst::Object;
}
// GStreamer elements need to be thread-safe. For the private implementation this is automatically
// enforced but for the public wrapper type we need to specify this manually.
unsafe impl Send for ToggleRecord {}
unsafe impl Sync for ToggleRecord {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"togglerecord",
gst::Rank::None,
ToggleRecord::static_type(),
)
}