fallbacksrc: Add statistics property

This property would be useful for application to understand
the internal status of fallbacksrc element.
This commit is contained in:
Seungha Yang 2020-10-29 22:39:58 +09:00
parent d16e7d1213
commit 6390d85b5f

View file

@ -35,6 +35,44 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
) )
}); });
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)]
#[repr(u32)]
#[genum(type_name = "GstFallbackSourceRetryReason")]
enum RetryReason {
None,
Error,
Eos,
StateChangeFailure,
Timeout,
}
#[derive(Debug, Clone)]
struct Stats {
num_retry: u64,
last_retry_reason: RetryReason,
buffering_percent: i32,
}
impl Default for Stats {
fn default() -> Self {
Self {
num_retry: 0,
last_retry_reason: RetryReason::None,
buffering_percent: 100,
}
}
}
impl Stats {
fn to_structure(&self) -> gst::Structure {
gst::Structure::builder("application/x-fallbacksrc-stats")
.field("num-retry", &self.num_retry)
.field("last-retry-reason", &self.last_retry_reason)
.field("buffering-percent", &self.buffering_percent)
.build()
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
enable_audio: bool, enable_audio: bool,
@ -127,7 +165,6 @@ struct State {
audio_stream: Option<Stream>, audio_stream: Option<Stream>,
flow_combiner: gst_base::UniqueFlowCombiner, flow_combiner: gst_base::UniqueFlowCombiner,
buffering_percent: u8,
last_buffering_update: Option<Instant>, last_buffering_update: Option<Instant>,
// Stream collection posted by source // Stream collection posted by source
@ -136,6 +173,9 @@ struct State {
// Configure settings // Configure settings
settings: Settings, settings: Settings,
configured_source: Source, configured_source: Source,
// Statistics
stats: Stats,
} }
struct FallbackSrc { struct FallbackSrc {
@ -153,7 +193,7 @@ enum Status {
Running, Running,
} }
static PROPERTIES: [subclass::Property; 12] = [ static PROPERTIES: [subclass::Property; 13] = [
subclass::Property("enable-audio", |name| { subclass::Property("enable-audio", |name| {
glib::ParamSpec::boolean( glib::ParamSpec::boolean(
name, name,
@ -269,6 +309,15 @@ static PROPERTIES: [subclass::Property; 12] = [
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
) )
}), }),
subclass::Property("statistics", |name| {
glib::ParamSpec::boxed(
name,
"Statistics",
"Various statistics",
gst::Structure::static_type(),
glib::ParamFlags::READABLE,
)
}),
]; ];
impl ObjectSubclass for FallbackSrc { impl ObjectSubclass for FallbackSrc {
@ -546,7 +595,7 @@ impl ObjectImpl for FallbackSrc {
} }
} }
if state.buffering_percent < 100 if state.stats.buffering_percent < 100
|| state.source_restart_timeout.is_some() || state.source_restart_timeout.is_some()
|| state.streams.is_none() || state.streams.is_none()
|| (have_audio || (have_audio
@ -576,6 +625,7 @@ impl ObjectImpl for FallbackSrc {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.buffer_duration.to_value()) Ok(settings.buffer_duration.to_value())
} }
subclass::Property("statistics", ..) => Ok(self.get_stats().to_value()),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -1086,11 +1136,11 @@ impl FallbackSrc {
video_stream, video_stream,
audio_stream, audio_stream,
flow_combiner, flow_combiner,
buffering_percent: 100,
last_buffering_update: None, last_buffering_update: None,
streams: None, streams: None,
settings, settings,
configured_source, configured_source,
stats: Stats::default(),
}); });
drop(state_guard); drop(state_guard);
@ -1208,7 +1258,9 @@ impl FallbackSrc {
let _ = source.set_state(gst::State::Null); let _ = source.set_state(gst::State::Null);
let mut state_guard = self.state.lock().unwrap(); let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state"); let state = state_guard.as_mut().expect("no state");
self.handle_source_error(element, state); self.handle_source_error(element, state, RetryReason::StateChangeFailure);
drop(state_guard);
element.notify("statistics");
} }
} }
Ok(res) => { Ok(res) => {
@ -1350,7 +1402,9 @@ impl FallbackSrc {
} }
Some(state) => state, Some(state) => state,
}; };
src.handle_source_error(&element, state); src.handle_source_error(&element, state, RetryReason::Eos);
drop(state_guard);
element.notify("statistics");
gst::PadProbeReturn::Drop gst::PadProbeReturn::Drop
} }
@ -1548,12 +1602,12 @@ impl FallbackSrc {
fn unblock_pads(&self, element: &gst::Bin, state: &mut State) { fn unblock_pads(&self, element: &gst::Bin, state: &mut State) {
// Check if all streams are blocked and have a running time and we have // Check if all streams are blocked and have a running time and we have
// 100% buffering // 100% buffering
if state.buffering_percent < 100 { if state.stats.buffering_percent < 100 {
gst_debug!( gst_debug!(
CAT, CAT,
obj: element, obj: element,
"Not unblocking yet: buffering {}%", "Not unblocking yet: buffering {}%",
state.buffering_percent state.stats.buffering_percent
); );
return; return;
} }
@ -1809,8 +1863,8 @@ impl FallbackSrc {
gst_debug!(CAT, obj: element, "Got buffering {}%", m.get_percent()); gst_debug!(CAT, obj: element, "Got buffering {}%", m.get_percent());
state.buffering_percent = m.get_percent() as u8; state.stats.buffering_percent = m.get_percent();
if state.buffering_percent < 100 { if state.stats.buffering_percent < 100 {
state.last_buffering_update = Some(Instant::now()); state.last_buffering_update = Some(Instant::now());
// Block source pads if needed to pause // Block source pads if needed to pause
if let Some(ref mut stream) = state.audio_stream { if let Some(ref mut stream) = state.audio_stream {
@ -1823,16 +1877,14 @@ impl FallbackSrc {
stream.source_srcpad_block = Some(self.add_pad_probe(element, stream)); stream.source_srcpad_block = Some(self.add_pad_probe(element, stream));
} }
} }
drop(state_guard);
element.notify("status");
} else { } else {
// Check if we can unblock now // Check if we can unblock now
self.unblock_pads(element, state); self.unblock_pads(element, state);
}
drop(state_guard); drop(state_guard);
element.notify("status"); element.notify("status");
} element.notify("statistics");
} }
fn handle_streams_selected(&self, element: &gst::Bin, m: &gst::message::StreamsSelected) { fn handle_streams_selected(&self, element: &gst::Bin, m: &gst::message::StreamsSelected) {
@ -1917,22 +1969,28 @@ impl FallbackSrc {
); );
if src == state.source || src.has_as_ancestor(&state.source) { if src == state.source || src.has_as_ancestor(&state.source) {
self.handle_source_error(element, state); self.handle_source_error(element, state, RetryReason::Error);
drop(state_guard); drop(state_guard);
element.notify("status"); element.notify("status");
element.notify("statistics");
return true; return true;
} }
false false
} }
fn handle_source_error(&self, element: &gst::Bin, state: &mut State) { fn handle_source_error(&self, element: &gst::Bin, state: &mut State, reason: RetryReason) {
gst_debug!(CAT, obj: element, "Handling source error"); gst_debug!(CAT, obj: element, "Handling source error");
state.stats.last_retry_reason = reason;
if state.source_pending_restart { if state.source_pending_restart {
gst_debug!(CAT, obj: element, "Source is already pending restart"); gst_debug!(CAT, obj: element, "Source is already pending restart");
return; return;
} }
// Increase retry count only if there was no pending restart
state.stats.num_retry += 1;
// Unschedule pending timeout, we're restarting now // Unschedule pending timeout, we're restarting now
if let Some(timeout) = state.source_restart_timeout.take() { if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule(); timeout.unschedule();
@ -2091,7 +2149,7 @@ impl FallbackSrc {
state.source_pending_restart = false; state.source_pending_restart = false;
state.source_pending_restart_timeout = None; state.source_pending_restart_timeout = None;
state.buffering_percent = 100; state.stats.buffering_percent = 100;
state.last_buffering_update = None; state.last_buffering_update = None;
if let Some(timeout) = state.source_restart_timeout.take() { if let Some(timeout) = state.source_restart_timeout.take() {
@ -2111,7 +2169,13 @@ impl FallbackSrc {
let _ = source.set_state(gst::State::Null); let _ = source.set_state(gst::State::Null);
let mut state_guard = src.state.lock().unwrap(); let mut state_guard = src.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state"); let state = state_guard.as_mut().expect("no state");
src.handle_source_error(element, state); src.handle_source_error(
element,
state,
RetryReason::StateChangeFailure,
);
drop(state_guard);
element.notify("statistics");
} else { } else {
let mut state_guard = src.state.lock().unwrap(); let mut state_guard = src.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state"); let state = state_guard.as_mut().expect("no state");
@ -2185,11 +2249,13 @@ impl FallbackSrc {
.map(|i| { .map(|i| {
i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout) i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout)
}) })
.unwrap_or(state.buffering_percent == 100) .unwrap_or(state.stats.buffering_percent == 100)
{ {
gst_debug!(CAT, obj: element, "Not buffering, restarting source"); gst_debug!(CAT, obj: element, "Not buffering, restarting source");
src.handle_source_error(element, state); src.handle_source_error(element, state, RetryReason::Timeout);
drop(state_guard);
element.notify("statistics");
} else { } else {
gst_debug!(CAT, obj: element, "Buffering, restarting source later"); gst_debug!(CAT, obj: element, "Buffering, restarting source later");
let elapsed = state let elapsed = state
@ -2290,6 +2356,17 @@ impl FallbackSrc {
element.notify("status"); element.notify("status");
} }
} }
fn get_stats(&self) -> gst::Structure {
let state_guard = self.state.lock().unwrap();
let state = match &*state_guard {
None => return Stats::default().to_structure(),
Some(ref state) => state,
};
state.stats.to_structure()
}
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {