fallbacksrc: Differentiate between fallback and restart timeout

This also fixes the bug that the source wouldn't be restarted another
time if we switched to the fallback stream before and didn't at least
shortly switch to the normal stream. There was no timeout for this.

Based on a patch by Mathieu Duponchelle <mathieu@centricular.com>
This commit is contained in:
Sebastian Dröge 2020-07-02 12:23:48 +03:00
parent f7fc5bb0a3
commit 44ad0a2f52

View file

@ -43,6 +43,7 @@ struct Settings {
source: Option<gst::Element>, source: Option<gst::Element>,
fallback_uri: Option<String>, fallback_uri: Option<String>,
timeout: u64, timeout: u64,
restart_timeout: u64,
retry_timeout: u64, retry_timeout: u64,
} }
@ -55,6 +56,7 @@ impl Default for Settings {
source: None, source: None,
fallback_uri: None, fallback_uri: None,
timeout: 5 * gst::SECOND_VAL, timeout: 5 * gst::SECOND_VAL,
restart_timeout: 5 * gst::SECOND_VAL,
retry_timeout: 60 * gst::SECOND_VAL, retry_timeout: 60 * gst::SECOND_VAL,
} }
} }
@ -104,9 +106,8 @@ struct State {
source_is_live: bool, source_is_live: bool,
source_pending_restart: bool, source_pending_restart: bool,
// For timing out the source if we have to wait some additional time // For timing out the source and shutting it down to restart it
// after fallbackswitch switched due to recent buffering activity source_restart_timeout: Option<gst::ClockId>,
source_pending_timeout: Option<gst::ClockId>,
// For restarting the source after shutting it down // For restarting the source after shutting it down
source_pending_restart_timeout: Option<gst::ClockId>, source_pending_restart_timeout: Option<gst::ClockId>,
// For failing completely if we didn't recover after the retry timeout // For failing completely if we didn't recover after the retry timeout
@ -143,7 +144,7 @@ enum Status {
Running, Running,
} }
static PROPERTIES: [subclass::Property; 8] = [ static PROPERTIES: [subclass::Property; 9] = [
subclass::Property("enable-audio", |name| { subclass::Property("enable-audio", |name| {
glib::ParamSpec::boolean( glib::ParamSpec::boolean(
name, name,
@ -194,6 +195,17 @@ static PROPERTIES: [subclass::Property; 8] = [
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
) )
}), }),
subclass::Property("restart-timeout", |name| {
glib::ParamSpec::uint64(
name,
"Timeout",
"Timeout for restarting an active source",
0,
std::u64::MAX,
5 * gst::SECOND_VAL,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("retry-timeout", |name| { subclass::Property("retry-timeout", |name| {
glib::ParamSpec::uint64( glib::ParamSpec::uint64(
name, name,
@ -342,6 +354,18 @@ impl ObjectImpl for FallbackSrc {
); );
settings.timeout = new_value; settings.timeout = new_value;
} }
subclass::Property("restart-timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get_some().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
"Changing Restart Timeout from {:?} to {:?}",
settings.restart_timeout,
new_value,
);
settings.restart_timeout = new_value;
}
subclass::Property("retry-timeout", ..) => { subclass::Property("retry-timeout", ..) => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let new_value = value.get_some().expect("type checked upstream"); let new_value = value.get_some().expect("type checked upstream");
@ -389,6 +413,10 @@ impl ObjectImpl for FallbackSrc {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value()) Ok(settings.timeout.to_value())
} }
subclass::Property("restart-timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.restart_timeout.to_value())
}
subclass::Property("retry-timeout", ..) => { subclass::Property("retry-timeout", ..) => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.retry_timeout.to_value()) Ok(settings.retry_timeout.to_value())
@ -424,7 +452,7 @@ impl ObjectImpl for FallbackSrc {
} }
if state.buffering_percent < 100 if state.buffering_percent < 100
|| state.source_pending_timeout.is_some() || state.source_restart_timeout.is_some()
|| state.streams.is_none() || state.streams.is_none()
|| (have_audio || (have_audio
&& state && state
@ -606,6 +634,8 @@ impl FallbackSrc {
Ok(identity) Ok(identity)
}) })
.expect("No clocksync or identity found"); .expect("No clocksync or identity found");
let queue = gst::ElementFactory::make("queue", Some("fallback_queue"))
.expect("No queue found");
input input
.add_many(&[ .add_many(&[
@ -615,11 +645,18 @@ impl FallbackSrc {
&videoscale, &videoscale,
&imagefreeze, &imagefreeze,
&clocksync, &clocksync,
&queue,
]) ])
.unwrap(); .unwrap();
gst::Element::link_many(&[&filesrc, &typefind]).unwrap(); gst::Element::link_many(&[&filesrc, &typefind]).unwrap();
gst::Element::link_many(&[&videoconvert, &videoscale, &imagefreeze, &clocksync]) gst::Element::link_many(&[
.unwrap(); &videoconvert,
&videoscale,
&imagefreeze,
&clocksync,
&queue,
])
.unwrap();
filesrc filesrc
.dynamic_cast_ref::<gst::URIHandler>() .dynamic_cast_ref::<gst::URIHandler>()
@ -708,7 +745,7 @@ impl FallbackSrc {
}) })
.unwrap(); .unwrap();
clocksync.get_static_pad("src").unwrap() queue.get_static_pad("src").unwrap()
} }
None => { None => {
let videotestsrc = let videotestsrc =
@ -893,7 +930,7 @@ impl FallbackSrc {
source, source,
source_is_live: false, source_is_live: false,
source_pending_restart: false, source_pending_restart: false,
source_pending_timeout: None, source_restart_timeout: None,
source_pending_restart_timeout: None, source_pending_restart_timeout: None,
source_retry_timeout: None, source_retry_timeout: None,
video_stream, video_stream,
@ -954,7 +991,7 @@ impl FallbackSrc {
timeout.unschedule(); timeout.unschedule();
} }
if let Some(timeout) = state.source_pending_timeout.take() { if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule(); timeout.unschedule();
} }
@ -1018,12 +1055,21 @@ impl FallbackSrc {
"Source changed state successfully: {:?}", "Source changed state successfully: {:?}",
res res
); );
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state");
// Remember if the source is live // Remember if the source is live
if transition == gst::StateChange::ReadyToPaused { if transition == gst::StateChange::ReadyToPaused {
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state");
state.source_is_live = res == gst::StateChangeSuccess::NoPreroll; state.source_is_live = res == gst::StateChangeSuccess::NoPreroll;
} }
if (state.source_is_live && transition == gst::StateChange::ReadyToPaused)
|| (!state.source_is_live && transition == gst::StateChange::PausedToPlaying)
{
assert!(state.source_restart_timeout.is_none());
self.schedule_source_restart_timeout(element, state, 0.into());
}
} }
} }
@ -1641,7 +1687,7 @@ impl FallbackSrc {
} }
// Unschedule pending timeout, we're restarting now // Unschedule pending timeout, we're restarting now
if let Some(timeout) = state.source_pending_timeout.take() { if let Some(timeout) = state.source_restart_timeout.take() {
timeout.unschedule(); timeout.unschedule();
} }
@ -1754,6 +1800,11 @@ impl FallbackSrc {
state.source_pending_restart_timeout = None; state.source_pending_restart_timeout = None;
state.buffering_percent = 100; state.buffering_percent = 100;
state.last_buffering_update = None; state.last_buffering_update = None;
if let Some(timeout) = state.source_restart_timeout.take() {
gst_debug!(CAT, obj: element, "Unscheduling restart timeout");
timeout.unschedule();
}
drop(state_guard); drop(state_guard);
if let Some(old_source) = old_source { if let Some(old_source) = old_source {
@ -1768,6 +1819,11 @@ impl FallbackSrc {
let mut state_guard = src.state.lock().unwrap(); let mut state_guard = src.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state"); let state = state_guard.as_mut().expect("no state");
src.handle_source_error(element, state); src.handle_source_error(element, state);
} else {
let mut state_guard = src.state.lock().unwrap();
let state = state_guard.as_mut().expect("no state");
assert!(state.source_restart_timeout.is_none());
src.schedule_source_restart_timeout(element, state, 0.into());
} }
}); });
}) })
@ -1776,6 +1832,122 @@ impl FallbackSrc {
}); });
} }
#[allow(clippy::block_in_if_condition_stmt)]
fn schedule_source_restart_timeout(
&self,
element: &gst::Bin,
state: &mut State,
elapsed: gst::ClockTime,
) {
let clock = gst::SystemClock::obtain();
let wait_time = clock.get_time()
+ gst::ClockTime::from_nseconds(state.settings.restart_timeout)
- elapsed;
assert!(wait_time.is_some());
gst_debug!(
CAT,
obj: element,
"Scheduling source restart timeout for {}",
wait_time,
);
let timeout = clock
.new_single_shot_id(wait_time)
.expect("can't create clock id");
let element_weak = element.downgrade();
timeout
.wait_async(move |_clock, _time, _id| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
element.call_async(move |element| {
let src = FallbackSrc::from_instance(element);
gst_debug!(CAT, obj: element, "Source restart timeout triggered");
let mut state_guard = src.state.lock().unwrap();
let state = match &mut *state_guard {
None => {
gst_debug!(CAT, obj: element, "Restarting source not needed anymore");
return;
}
Some(state) => state,
};
state.source_restart_timeout = None;
let mut have_audio = false;
let mut have_video = false;
if let Some(ref streams) = state.streams {
for stream in streams.iter() {
have_audio = have_audio
|| stream.get_stream_type().contains(gst::StreamType::AUDIO);
have_video = have_video
|| stream.get_stream_type().contains(gst::StreamType::VIDEO);
}
}
// If we have neither audio nor video (no streams yet), or active pad for the ones we have
// is the fallback pad then restart the source now.
if (!have_audio && !have_video)
|| (have_audio
&& state
.audio_stream
.as_ref()
.and_then(|s| {
s.switch
.get_property("active-pad")
.unwrap()
.get::<gst::Pad>()
.unwrap()
})
.map(|p| p.get_name() == "fallback_sink")
.unwrap_or(true))
|| (have_video
&& state
.video_stream
.as_ref()
.and_then(|s| {
s.switch
.get_property("active-pad")
.unwrap()
.get::<gst::Pad>()
.unwrap()
})
.map(|p| p.get_name() == "fallback_sink")
.unwrap_or(true))
{
// If we're not actively buffering right now let's restart the source
if state
.last_buffering_update
.map(|i| {
i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout)
})
.unwrap_or(state.buffering_percent == 100)
{
gst_debug!(CAT, obj: element, "Not buffering, restarting source");
src.handle_source_error(element, state);
} else {
gst_debug!(CAT, obj: element, "Buffering, restarting source later");
let elapsed = state
.last_buffering_update
.map(|i| i.elapsed().as_nanos() as u64)
.unwrap_or(0);
src.schedule_source_restart_timeout(element, state, elapsed.into());
}
} else {
gst_debug!(CAT, obj: element, "Restarting source not needed anymore");
}
});
})
.expect("Failed to wait async");
state.source_restart_timeout = Some(timeout);
}
#[allow(clippy::block_in_if_condition_stmt)] #[allow(clippy::block_in_if_condition_stmt)]
fn handle_switch_active_pad_change(&self, element: &gst::Bin) { fn handle_switch_active_pad_change(&self, element: &gst::Bin) {
let mut state_guard = self.state.lock().unwrap(); let mut state_guard = self.state.lock().unwrap();
@ -1797,12 +1969,6 @@ impl FallbackSrc {
} }
} }
// We will schedule a new one if needed below
if let Some(timeout) = state.source_pending_timeout.take() {
gst_debug!(CAT, obj: element, "Unscheduling pending timeout");
timeout.unschedule();
}
// If we have neither audio nor video (no streams yet), or active pad for the ones we have // If we have neither audio nor video (no streams yet), or active pad for the ones we have
// is the fallback pad then start the retry timeout unless it was started already. // is the fallback pad then start the retry timeout unless it was started already.
// Otherwise cancel the retry timeout. // Otherwise cancel the retry timeout.
@ -1835,114 +2001,20 @@ impl FallbackSrc {
.unwrap_or(true)) .unwrap_or(true))
{ {
gst_warning!(CAT, obj: element, "Switched to fallback stream"); gst_warning!(CAT, obj: element, "Switched to fallback stream");
if state.source_restart_timeout.is_none() {
// If we're not actively buffering right now let's restart the source self.schedule_source_restart_timeout(element, state, 0.into());
if state }
.last_buffering_update } else {
.map(|i| i.elapsed() >= Duration::from_nanos(state.settings.timeout)) gst_debug!(CAT, obj: element, "Switched to main stream");
.unwrap_or(state.buffering_percent == 100) if let Some(timeout) = state.source_retry_timeout.take() {
{ gst_debug!(CAT, obj: element, "Unscheduling retry timeout");
gst_debug!(CAT, obj: element, "Not buffering, restarting source"); timeout.unschedule();
self.handle_source_error(element, state);
} else {
// Schedule another timeout after the last buffering activity
let clock = gst::SystemClock::obtain();
let diff = gst::ClockTime::from(
state.settings.timeout.saturating_sub(
state
.last_buffering_update
.map(|i| i.elapsed().as_nanos() as u64)
.unwrap_or(state.settings.timeout),
),
);
let wait_time = clock.get_time() + diff;
assert!(wait_time.is_some());
gst_debug!(CAT, obj: element, "Starting pending timeout for {}", diff);
let timeout = clock
.new_single_shot_id(wait_time)
.expect("can't create clock id");
let element_weak = element.downgrade();
timeout
.wait_async(move |_clock, _time, _id| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
element.call_async(|element| {
let src = FallbackSrc::from_instance(element);
src.handle_switch_active_pad_change(element);
});
})
.expect("failed to wait async");
state.source_pending_timeout = Some(timeout);
} }
if state.source_retry_timeout.is_none() { if let Some(timeout) = state.source_restart_timeout.take() {
let clock = gst::SystemClock::obtain(); gst_debug!(CAT, obj: element, "Unscheduling restart timeout");
let wait_time = timeout.unschedule();
clock.get_time() + gst::ClockTime::from(state.settings.retry_timeout);
assert!(wait_time.is_some());
gst_debug!(CAT, obj: element, "Starting retry timeout");
let timeout = clock
.new_single_shot_id(wait_time)
.expect("can't create clock id");
let element_weak = element.downgrade();
timeout
.wait_async(move |_clock, _time, _id| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
element.call_async(|element| {
let src = FallbackSrc::from_instance(element);
let mut state_guard = src.state.lock().unwrap();
let state = match &mut *state_guard {
None => return,
Some(ref mut state) => state,
};
if state.source_retry_timeout.take().is_none() {
return;
}
if let Some(timeout) = state.source_pending_restart_timeout.take() {
timeout.unschedule();
}
if let Some(timeout) = state.source_pending_timeout.take() {
timeout.unschedule();
}
state.source_pending_restart = false;
drop(state_guard);
gst_element_warning!(
element,
gst::ResourceError::OpenRead,
["Failed to start playback"]
);
gst_warning!(CAT, obj: element, "Retry timeout, finishing");
for pad in element.get_src_pads() {
element.call_async(move |_element| {
pad.push_event(gst::event::Eos::new());
});
}
});
})
.expect("failed to wait async");
state.source_retry_timeout = Some(timeout);
drop(state_guard);
element.notify("status");
} }
} else if let Some(timeout) = state.source_retry_timeout.take() {
gst_debug!(CAT, obj: element, "Unscheduling retry timeout");
timeout.unschedule();
drop(state_guard); drop(state_guard);
element.notify("status"); element.notify("status");