fallbacksrc: implement manual unblocking feature

This enables a use case for preparing slow to start up sources
ahead of time in a live cueing system, where a stream is scheduled
to start at some point in the future, and the application wants to
make sure it is ready for prime time by that time, instead of
spinning it up at the last moment and waiting for the stream to
actually come up.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/515>
This commit is contained in:
Mathieu Duponchelle 2021-05-28 01:43:50 +02:00 committed by GStreamer Marge Bot
parent 29052b1acb
commit ef41adf776

View file

@ -80,6 +80,7 @@ struct Settings {
min_latency: gst::ClockTime,
buffer_duration: i64,
immediate_fallback: bool,
manual_unblock: bool,
}
impl Default for Settings {
@ -97,6 +98,7 @@ impl Default for Settings {
min_latency: gst::ClockTime::ZERO,
buffer_duration: -1,
immediate_fallback: false,
manual_unblock: false,
}
}
}
@ -171,6 +173,12 @@ struct State {
// Statistics
stats: Stats,
// When application is using the manual-unblock property
manually_blocked: bool,
// So that we don't schedule a restart when manually unblocking
// and our source hasn't reached the required state
schedule_restart_on_unblock: bool,
}
#[derive(Default)]
@ -288,6 +296,13 @@ impl ObjectImpl for FallbackSrc {
gst::Structure::static_type(),
glib::ParamFlags::READABLE,
),
glib::ParamSpec::new_boolean(
"manual-unblock",
"Manual unblock",
"When enabled, the application must call the unblock signal, except for live streams",
false,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpec::new_boolean(
"immediate-fallback",
"Immediate fallback",
@ -453,6 +468,18 @@ impl ObjectImpl for FallbackSrc {
);
settings.immediate_fallback = new_value;
}
"manual-unblock" => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get().expect("type checked upstream");
gst_info!(
CAT,
obj: obj,
"Changing manual-unblock from {:?} to {:?}",
settings.manual_unblock,
new_value,
);
settings.manual_unblock = new_value;
}
_ => unimplemented!(),
}
}
@ -563,28 +590,64 @@ impl ObjectImpl for FallbackSrc {
let settings = self.settings.lock().unwrap();
settings.immediate_fallback.to_value()
}
"manual-unblock" => {
let settings = self.settings.lock().unwrap();
settings.manual_unblock.to_value()
}
_ => unimplemented!(),
}
}
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![glib::subclass::Signal::builder(
"update-uri",
&[String::static_type().into()],
String::static_type().into(),
)
.action()
.class_handler(|_token, args| {
// Simply return the input by default
Some(args[1].clone())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build()]
vec![
glib::subclass::Signal::builder(
"update-uri",
&[String::static_type().into()],
String::static_type().into(),
)
.action()
.class_handler(|_token, args| {
// Simply return the input by default
Some(args[1].clone())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
glib::subclass::Signal::builder("unblock", &[], glib::types::Type::UNIT.into())
.action()
.class_handler(|_token, args| {
let element = args[0].get::<super::FallbackSrc>().expect("signal arg");
let src = FallbackSrc::from_instance(&element);
let mut state_guard = src.state.lock().unwrap();
let state = match &mut *state_guard {
None => {
return None;
}
Some(state) => state,
};
state.manually_blocked = false;
if state.schedule_restart_on_unblock
&& src.have_fallback_activated(&element, state)
{
src.schedule_source_restart_timeout(
&element,
state,
gst::ClockTime::ZERO,
);
}
src.unblock_pads(&element, state);
None
})
.build(),
]
});
SIGNALS.as_ref()
@ -1007,6 +1070,8 @@ impl FallbackSrc {
None
};
let manually_blocked = settings.manual_unblock;
*state_guard = Some(State {
source,
source_is_live: false,
@ -1022,6 +1087,8 @@ impl FallbackSrc {
settings,
configured_source,
stats: Stats::default(),
manually_blocked,
schedule_restart_on_unblock: false,
});
drop(state_guard);
@ -1155,6 +1222,7 @@ impl FallbackSrc {
|| (!state.source_is_live && transition == gst::StateChange::PausedToPlaying)
{
assert!(state.source_restart_timeout.is_none());
state.schedule_restart_on_unblock = true;
self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO);
}
}
@ -1493,6 +1561,11 @@ impl FallbackSrc {
}
fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) {
if state.manually_blocked {
gst_debug!(CAT, obj: element, "Not unblocking yet: manual unblock",);
return;
}
// Check if all streams are blocked and have a running time and we have
// 100% buffering
if state.stats.buffering_percent < 100 {
@ -2153,6 +2226,15 @@ impl FallbackSrc {
return;
}
if state.manually_blocked {
gst_debug!(
CAT,
obj: element,
"Not scheduling source restart timeout because we are manually blocked",
);
return;
}
let clock = gst::SystemClock::obtain();
let wait_time = clock.time().unwrap() + state.settings.restart_timeout - elapsed;
gst_debug!(