From 6390d85b5f08794b7e12ec79f0420b874886fa14 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Thu, 29 Oct 2020 22:39:58 +0900 Subject: [PATCH] fallbacksrc: Add statistics property This property would be useful for application to understand the internal status of fallbacksrc element. --- utils/fallbackswitch/src/fallbacksrc.rs | 121 +++++++++++++++++++----- 1 file changed, 99 insertions(+), 22 deletions(-) diff --git a/utils/fallbackswitch/src/fallbacksrc.rs b/utils/fallbackswitch/src/fallbacksrc.rs index bde5bf93..3742faac 100644 --- a/utils/fallbackswitch/src/fallbacksrc.rs +++ b/utils/fallbackswitch/src/fallbacksrc.rs @@ -35,6 +35,44 @@ static CAT: Lazy = Lazy::new(|| { ) }); +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)] +#[repr(u32)] +#[genum(type_name = "GstFallbackSourceRetryReason")] +enum RetryReason { + None, + Error, + Eos, + StateChangeFailure, + Timeout, +} + +#[derive(Debug, Clone)] +struct Stats { + num_retry: u64, + last_retry_reason: RetryReason, + buffering_percent: i32, +} + +impl Default for Stats { + fn default() -> Self { + Self { + num_retry: 0, + last_retry_reason: RetryReason::None, + buffering_percent: 100, + } + } +} + +impl Stats { + fn to_structure(&self) -> gst::Structure { + gst::Structure::builder("application/x-fallbacksrc-stats") + .field("num-retry", &self.num_retry) + .field("last-retry-reason", &self.last_retry_reason) + .field("buffering-percent", &self.buffering_percent) + .build() + } +} + #[derive(Debug, Clone)] struct Settings { enable_audio: bool, @@ -127,7 +165,6 @@ struct State { audio_stream: Option, flow_combiner: gst_base::UniqueFlowCombiner, - buffering_percent: u8, last_buffering_update: Option, // Stream collection posted by source @@ -136,6 +173,9 @@ struct State { // Configure settings settings: Settings, configured_source: Source, + + // Statistics + stats: Stats, } struct FallbackSrc { @@ -153,7 +193,7 @@ enum Status { Running, } -static PROPERTIES: [subclass::Property; 12] = [ +static PROPERTIES: [subclass::Property; 13] = [ subclass::Property("enable-audio", |name| { glib::ParamSpec::boolean( name, @@ -269,6 +309,15 @@ static PROPERTIES: [subclass::Property; 12] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("statistics", |name| { + glib::ParamSpec::boxed( + name, + "Statistics", + "Various statistics", + gst::Structure::static_type(), + glib::ParamFlags::READABLE, + ) + }), ]; impl ObjectSubclass for FallbackSrc { @@ -546,7 +595,7 @@ impl ObjectImpl for FallbackSrc { } } - if state.buffering_percent < 100 + if state.stats.buffering_percent < 100 || state.source_restart_timeout.is_some() || state.streams.is_none() || (have_audio @@ -576,6 +625,7 @@ impl ObjectImpl for FallbackSrc { let settings = self.settings.lock().unwrap(); Ok(settings.buffer_duration.to_value()) } + subclass::Property("statistics", ..) => Ok(self.get_stats().to_value()), _ => unimplemented!(), } } @@ -1086,11 +1136,11 @@ impl FallbackSrc { video_stream, audio_stream, flow_combiner, - buffering_percent: 100, last_buffering_update: None, streams: None, settings, configured_source, + stats: Stats::default(), }); drop(state_guard); @@ -1208,7 +1258,9 @@ impl FallbackSrc { let _ = source.set_state(gst::State::Null); let mut state_guard = self.state.lock().unwrap(); let state = state_guard.as_mut().expect("no state"); - self.handle_source_error(element, state); + self.handle_source_error(element, state, RetryReason::StateChangeFailure); + drop(state_guard); + element.notify("statistics"); } } Ok(res) => { @@ -1350,7 +1402,9 @@ impl FallbackSrc { } Some(state) => state, }; - src.handle_source_error(&element, state); + src.handle_source_error(&element, state, RetryReason::Eos); + drop(state_guard); + element.notify("statistics"); gst::PadProbeReturn::Drop } @@ -1548,12 +1602,12 @@ impl FallbackSrc { fn unblock_pads(&self, element: &gst::Bin, state: &mut State) { // Check if all streams are blocked and have a running time and we have // 100% buffering - if state.buffering_percent < 100 { + if state.stats.buffering_percent < 100 { gst_debug!( CAT, obj: element, "Not unblocking yet: buffering {}%", - state.buffering_percent + state.stats.buffering_percent ); return; } @@ -1809,8 +1863,8 @@ impl FallbackSrc { gst_debug!(CAT, obj: element, "Got buffering {}%", m.get_percent()); - state.buffering_percent = m.get_percent() as u8; - if state.buffering_percent < 100 { + state.stats.buffering_percent = m.get_percent(); + if state.stats.buffering_percent < 100 { state.last_buffering_update = Some(Instant::now()); // Block source pads if needed to pause if let Some(ref mut stream) = state.audio_stream { @@ -1823,16 +1877,14 @@ impl FallbackSrc { stream.source_srcpad_block = Some(self.add_pad_probe(element, stream)); } } - - drop(state_guard); - element.notify("status"); } else { // Check if we can unblock now self.unblock_pads(element, state); - - drop(state_guard); - element.notify("status"); } + + drop(state_guard); + element.notify("status"); + element.notify("statistics"); } fn handle_streams_selected(&self, element: &gst::Bin, m: &gst::message::StreamsSelected) { @@ -1917,22 +1969,28 @@ impl FallbackSrc { ); if src == state.source || src.has_as_ancestor(&state.source) { - self.handle_source_error(element, state); + self.handle_source_error(element, state, RetryReason::Error); drop(state_guard); element.notify("status"); + element.notify("statistics"); return true; } false } - fn handle_source_error(&self, element: &gst::Bin, state: &mut State) { + fn handle_source_error(&self, element: &gst::Bin, state: &mut State, reason: RetryReason) { gst_debug!(CAT, obj: element, "Handling source error"); + + state.stats.last_retry_reason = reason; if state.source_pending_restart { gst_debug!(CAT, obj: element, "Source is already pending restart"); return; } + // Increase retry count only if there was no pending restart + state.stats.num_retry += 1; + // Unschedule pending timeout, we're restarting now if let Some(timeout) = state.source_restart_timeout.take() { timeout.unschedule(); @@ -2091,7 +2149,7 @@ impl FallbackSrc { state.source_pending_restart = false; state.source_pending_restart_timeout = None; - state.buffering_percent = 100; + state.stats.buffering_percent = 100; state.last_buffering_update = None; if let Some(timeout) = state.source_restart_timeout.take() { @@ -2111,7 +2169,13 @@ impl FallbackSrc { let _ = source.set_state(gst::State::Null); let mut state_guard = src.state.lock().unwrap(); let state = state_guard.as_mut().expect("no state"); - src.handle_source_error(element, state); + src.handle_source_error( + element, + state, + RetryReason::StateChangeFailure, + ); + drop(state_guard); + element.notify("statistics"); } else { let mut state_guard = src.state.lock().unwrap(); let state = state_guard.as_mut().expect("no state"); @@ -2185,11 +2249,13 @@ impl FallbackSrc { .map(|i| { i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout) }) - .unwrap_or(state.buffering_percent == 100) + .unwrap_or(state.stats.buffering_percent == 100) { gst_debug!(CAT, obj: element, "Not buffering, restarting source"); - src.handle_source_error(element, state); + src.handle_source_error(element, state, RetryReason::Timeout); + drop(state_guard); + element.notify("statistics"); } else { gst_debug!(CAT, obj: element, "Buffering, restarting source later"); let elapsed = state @@ -2290,6 +2356,17 @@ impl FallbackSrc { element.notify("status"); } } + + fn get_stats(&self) -> gst::Structure { + let state_guard = self.state.lock().unwrap(); + + let state = match &*state_guard { + None => return Stats::default().to_structure(), + Some(ref state) => state, + }; + + state.stats.to_structure() + } } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {