fallbackswitch: output buffers ASAP at startup

When only the backup pad is receiving buffers, and the primary
pad is a bit slow to start up (eg network source with buffering),
it makes for a better UX to output buffers from the backup pad
while waiting for the network source to make its move.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/515>
This commit is contained in:
Mathieu Duponchelle 2021-05-21 22:28:06 +02:00 committed by GStreamer Marge Bot
parent 3cdc5870a1
commit a5a80281f3
2 changed files with 94 additions and 26 deletions

View file

@ -79,6 +79,7 @@ struct Settings {
restart_on_eos: bool,
min_latency: gst::ClockTime,
buffer_duration: i64,
immediate_fallback: bool,
}
impl Default for Settings {
@ -95,6 +96,7 @@ impl Default for Settings {
restart_on_eos: false,
min_latency: gst::ClockTime::ZERO,
buffer_duration: -1,
immediate_fallback: false,
}
}
}
@ -286,6 +288,13 @@ impl ObjectImpl for FallbackSrc {
gst::Structure::static_type(),
glib::ParamFlags::READABLE,
),
glib::ParamSpec::new_boolean(
"immediate-fallback",
"Immediate fallback",
"Forward the fallback streams immediately at startup, when the primary streams are slow to start up and immediate output is required",
false,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
]
});
@ -432,6 +441,18 @@ impl ObjectImpl for FallbackSrc {
);
settings.buffer_duration = new_value;
}
"immediate-fallback" => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get().expect("type checked upstream");
gst_info!(
CAT,
obj: obj,
"Changing immediate-fallback from {:?} to {:?}",
settings.immediate_fallback,
new_value,
);
settings.immediate_fallback = new_value;
}
_ => unimplemented!(),
}
}
@ -538,6 +559,10 @@ impl ObjectImpl for FallbackSrc {
settings.buffer_duration.to_value()
}
"statistics" => self.stats().to_value(),
"immediate-fallback" => {
let settings = self.settings.lock().unwrap();
settings.immediate_fallback.to_value()
}
_ => unimplemented!(),
}
}
@ -775,6 +800,7 @@ impl FallbackSrc {
min_latency: gst::ClockTime,
is_audio: bool,
fallback_uri: Option<&str>,
immediate_fallback: bool,
) -> Stream {
let fallback_input = if is_audio {
self.create_fallback_audio_input(element)
@ -820,6 +846,9 @@ impl FallbackSrc {
switch
.set_property("min-upstream-latency", &min_latency.nseconds())
.unwrap();
switch
.set_property("immediate-fallback", &immediate_fallback)
.unwrap();
gst::Element::link_pads(&fallback_input, Some("src"), &switch, Some("fallback_sink"))
.unwrap();
@ -903,6 +932,7 @@ impl FallbackSrc {
settings.min_latency,
false,
fallback_uri.as_deref(),
settings.immediate_fallback,
);
flow_combiner.add_pad(&stream.srcpad);
Some(stream)
@ -912,8 +942,14 @@ impl FallbackSrc {
// Create audio stream
let audio_stream = if settings.enable_audio {
let stream =
self.create_stream(element, settings.timeout, settings.min_latency, true, None);
let stream = self.create_stream(
element,
settings.timeout,
settings.min_latency,
true,
None,
settings.immediate_fallback,
);
flow_combiner.add_pad(&stream.srcpad);
Some(stream)
} else {

View file

@ -92,11 +92,13 @@ struct PadInputState {
const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(5);
const DEFAULT_AUTO_SWITCH: bool = true;
const DEFAULT_STREAM_HEALTH: StreamHealth = StreamHealth::Inactive;
const DEFAULT_IMMEDIATE_FALLBACK: bool = false;
#[derive(Debug, Clone)]
struct Settings {
timeout: gst::ClockTime,
auto_switch: bool,
immediate_fallback: bool,
}
impl Default for StreamHealth {
@ -120,6 +122,7 @@ impl Default for Settings {
Settings {
timeout: DEFAULT_TIMEOUT,
auto_switch: DEFAULT_AUTO_SWITCH,
immediate_fallback: DEFAULT_IMMEDIATE_FALLBACK,
}
}
}
@ -411,40 +414,54 @@ impl FallbackSwitch {
if state.last_output_time.is_none() {
state.last_output_time = running_time;
}
if backup_pad == &self.primary_sinkpad {
state.primary.last_sinkpad_time = running_time;
} else {
state.fallback.last_sinkpad_time = running_time;
}
// Get the next one if this one is before the timeout
if state.last_output_time.zip(running_time).map_or(
false,
|(last_output_time, running_time)| {
last_output_time + settings.timeout > running_time
},
) {
// If the other pad never received a buffer, we want to start consuming
// buffers on this pad in order to provide an output at start up
// (for example with a slow primary)
let ignore_timeout = settings.immediate_fallback && {
if backup_pad == &self.primary_sinkpad {
state.primary.last_sinkpad_time = running_time;
state.fallback.last_sinkpad_time.is_none()
} else {
state.fallback.last_sinkpad_time = running_time;
state.primary.last_sinkpad_time.is_none()
}
};
if !ignore_timeout {
// Get the next one if this one is before the timeout
if state.last_output_time.zip(running_time).map_or(
false,
|(last_output_time, running_time)| {
last_output_time + settings.timeout > running_time
},
) {
gst_debug!(
CAT,
obj: backup_pad,
"Timeout not reached yet: {} + {} > {}",
state.last_output_time.display(),
settings.timeout,
running_time.display(),
);
continue;
}
gst_debug!(
CAT,
obj: backup_pad,
"Timeout not reached yet: {} + {} > {}",
"Timeout reached: {} + {} <= {}",
state.last_output_time.display(),
settings.timeout,
running_time.display(),
);
continue;
} else {
gst_debug!(
CAT,
obj: backup_pad,
"Consuming buffer as we haven't yet received a buffer on the other pad",
);
}
gst_debug!(
CAT,
obj: backup_pad,
"Timeout reached: {} + {} <= {}",
state.last_output_time.display(),
settings.timeout,
running_time.display(),
);
let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
let pad_change = settings.auto_switch
&& active_sinkpad.as_ref() != Some(backup_pad.upcast_ref::<gst::Pad>());
@ -721,6 +738,13 @@ impl ObjectImpl for FallbackSwitch {
DEFAULT_STREAM_HEALTH as i32,
glib::ParamFlags::READABLE,
),
glib::ParamSpec::new_boolean(
"immediate-fallback",
"Immediate fallback",
"Forward the fallback stream immediately at startup, when the primary stream is slow to start up and immediate output is required",
DEFAULT_AUTO_SWITCH,
glib::ParamFlags::READWRITE| gst::PARAM_FLAG_MUTABLE_READY,
),
]
});
@ -779,6 +803,10 @@ impl ObjectImpl for FallbackSwitch {
let mut settings = self.settings.lock().unwrap();
settings.auto_switch = value.get().expect("type checked upstream");
}
"immediate-fallback" => {
let mut settings = self.settings.lock().unwrap();
settings.immediate_fallback = value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -805,6 +833,10 @@ impl ObjectImpl for FallbackSwitch {
let state = self.output_state.lock().unwrap();
state.fallback.stream_health.to_value()
}
"immediate-fallback" => {
let settings = self.settings.lock().unwrap();
settings.immediate_fallback.to_value()
}
_ => unimplemented!(),
}
}