diff --git a/utils/fallbackswitch/src/fallbacksrc.rs b/utils/fallbackswitch/src/fallbacksrc.rs index 2167223a..26234d1f 100644 --- a/utils/fallbackswitch/src/fallbacksrc.rs +++ b/utils/fallbackswitch/src/fallbacksrc.rs @@ -43,6 +43,7 @@ struct Settings { source: Option, fallback_uri: Option, timeout: u64, + restart_timeout: u64, retry_timeout: u64, } @@ -55,6 +56,7 @@ impl Default for Settings { source: None, fallback_uri: None, timeout: 5 * gst::SECOND_VAL, + restart_timeout: 5 * gst::SECOND_VAL, retry_timeout: 60 * gst::SECOND_VAL, } } @@ -104,9 +106,8 @@ struct State { source_is_live: bool, source_pending_restart: bool, - // For timing out the source if we have to wait some additional time - // after fallbackswitch switched due to recent buffering activity - source_pending_timeout: Option, + // For timing out the source and shutting it down to restart it + source_restart_timeout: Option, // For restarting the source after shutting it down source_pending_restart_timeout: Option, // For failing completely if we didn't recover after the retry timeout @@ -143,7 +144,7 @@ enum Status { Running, } -static PROPERTIES: [subclass::Property; 8] = [ +static PROPERTIES: [subclass::Property; 9] = [ subclass::Property("enable-audio", |name| { glib::ParamSpec::boolean( name, @@ -194,6 +195,17 @@ static PROPERTIES: [subclass::Property; 8] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("restart-timeout", |name| { + glib::ParamSpec::uint64( + name, + "Timeout", + "Timeout for restarting an active source", + 0, + std::u64::MAX, + 5 * gst::SECOND_VAL, + glib::ParamFlags::READWRITE, + ) + }), subclass::Property("retry-timeout", |name| { glib::ParamSpec::uint64( name, @@ -342,6 +354,18 @@ impl ObjectImpl for FallbackSrc { ); settings.timeout = new_value; } + subclass::Property("restart-timeout", ..) => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get_some().expect("type checked upstream"); + gst_info!( + CAT, + obj: element, + "Changing Restart Timeout from {:?} to {:?}", + settings.restart_timeout, + new_value, + ); + settings.restart_timeout = new_value; + } subclass::Property("retry-timeout", ..) => { let mut settings = self.settings.lock().unwrap(); let new_value = value.get_some().expect("type checked upstream"); @@ -389,6 +413,10 @@ impl ObjectImpl for FallbackSrc { let settings = self.settings.lock().unwrap(); Ok(settings.timeout.to_value()) } + subclass::Property("restart-timeout", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.restart_timeout.to_value()) + } subclass::Property("retry-timeout", ..) => { let settings = self.settings.lock().unwrap(); Ok(settings.retry_timeout.to_value()) @@ -424,7 +452,7 @@ impl ObjectImpl for FallbackSrc { } if state.buffering_percent < 100 - || state.source_pending_timeout.is_some() + || state.source_restart_timeout.is_some() || state.streams.is_none() || (have_audio && state @@ -606,6 +634,8 @@ impl FallbackSrc { Ok(identity) }) .expect("No clocksync or identity found"); + let queue = gst::ElementFactory::make("queue", Some("fallback_queue")) + .expect("No queue found"); input .add_many(&[ @@ -615,11 +645,18 @@ impl FallbackSrc { &videoscale, &imagefreeze, &clocksync, + &queue, ]) .unwrap(); gst::Element::link_many(&[&filesrc, &typefind]).unwrap(); - gst::Element::link_many(&[&videoconvert, &videoscale, &imagefreeze, &clocksync]) - .unwrap(); + gst::Element::link_many(&[ + &videoconvert, + &videoscale, + &imagefreeze, + &clocksync, + &queue, + ]) + .unwrap(); filesrc .dynamic_cast_ref::() @@ -708,7 +745,7 @@ impl FallbackSrc { }) .unwrap(); - clocksync.get_static_pad("src").unwrap() + queue.get_static_pad("src").unwrap() } None => { let videotestsrc = @@ -893,7 +930,7 @@ impl FallbackSrc { source, source_is_live: false, source_pending_restart: false, - source_pending_timeout: None, + source_restart_timeout: None, source_pending_restart_timeout: None, source_retry_timeout: None, video_stream, @@ -954,7 +991,7 @@ impl FallbackSrc { timeout.unschedule(); } - if let Some(timeout) = state.source_pending_timeout.take() { + if let Some(timeout) = state.source_restart_timeout.take() { timeout.unschedule(); } @@ -1018,12 +1055,21 @@ impl FallbackSrc { "Source changed state successfully: {:?}", res ); + + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().expect("no state"); + // Remember if the source is live if transition == gst::StateChange::ReadyToPaused { - let mut state_guard = self.state.lock().unwrap(); - let state = state_guard.as_mut().expect("no state"); state.source_is_live = res == gst::StateChangeSuccess::NoPreroll; } + + if (state.source_is_live && transition == gst::StateChange::ReadyToPaused) + || (!state.source_is_live && transition == gst::StateChange::PausedToPlaying) + { + assert!(state.source_restart_timeout.is_none()); + self.schedule_source_restart_timeout(element, state, 0.into()); + } } } @@ -1641,7 +1687,7 @@ impl FallbackSrc { } // Unschedule pending timeout, we're restarting now - if let Some(timeout) = state.source_pending_timeout.take() { + if let Some(timeout) = state.source_restart_timeout.take() { timeout.unschedule(); } @@ -1754,6 +1800,11 @@ impl FallbackSrc { state.source_pending_restart_timeout = None; state.buffering_percent = 100; state.last_buffering_update = None; + + if let Some(timeout) = state.source_restart_timeout.take() { + gst_debug!(CAT, obj: element, "Unscheduling restart timeout"); + timeout.unschedule(); + } drop(state_guard); if let Some(old_source) = old_source { @@ -1768,6 +1819,11 @@ impl FallbackSrc { let mut state_guard = src.state.lock().unwrap(); let state = state_guard.as_mut().expect("no state"); src.handle_source_error(element, state); + } else { + let mut state_guard = src.state.lock().unwrap(); + let state = state_guard.as_mut().expect("no state"); + assert!(state.source_restart_timeout.is_none()); + src.schedule_source_restart_timeout(element, state, 0.into()); } }); }) @@ -1776,6 +1832,122 @@ impl FallbackSrc { }); } + #[allow(clippy::block_in_if_condition_stmt)] + fn schedule_source_restart_timeout( + &self, + element: &gst::Bin, + state: &mut State, + elapsed: gst::ClockTime, + ) { + let clock = gst::SystemClock::obtain(); + let wait_time = clock.get_time() + + gst::ClockTime::from_nseconds(state.settings.restart_timeout) + - elapsed; + assert!(wait_time.is_some()); + gst_debug!( + CAT, + obj: element, + "Scheduling source restart timeout for {}", + wait_time, + ); + + let timeout = clock + .new_single_shot_id(wait_time) + .expect("can't create clock id"); + let element_weak = element.downgrade(); + timeout + .wait_async(move |_clock, _time, _id| { + let element = match element_weak.upgrade() { + None => return, + Some(element) => element, + }; + + element.call_async(move |element| { + let src = FallbackSrc::from_instance(element); + + gst_debug!(CAT, obj: element, "Source restart timeout triggered"); + let mut state_guard = src.state.lock().unwrap(); + let state = match &mut *state_guard { + None => { + gst_debug!(CAT, obj: element, "Restarting source not needed anymore"); + return; + } + Some(state) => state, + }; + + state.source_restart_timeout = None; + + let mut have_audio = false; + let mut have_video = false; + if let Some(ref streams) = state.streams { + for stream in streams.iter() { + have_audio = have_audio + || stream.get_stream_type().contains(gst::StreamType::AUDIO); + have_video = have_video + || stream.get_stream_type().contains(gst::StreamType::VIDEO); + } + } + + // If we have neither audio nor video (no streams yet), or active pad for the ones we have + // is the fallback pad then restart the source now. + if (!have_audio && !have_video) + || (have_audio + && state + .audio_stream + .as_ref() + .and_then(|s| { + s.switch + .get_property("active-pad") + .unwrap() + .get::() + .unwrap() + }) + .map(|p| p.get_name() == "fallback_sink") + .unwrap_or(true)) + || (have_video + && state + .video_stream + .as_ref() + .and_then(|s| { + s.switch + .get_property("active-pad") + .unwrap() + .get::() + .unwrap() + }) + .map(|p| p.get_name() == "fallback_sink") + .unwrap_or(true)) + { + // If we're not actively buffering right now let's restart the source + if state + .last_buffering_update + .map(|i| { + i.elapsed() >= Duration::from_nanos(state.settings.restart_timeout) + }) + .unwrap_or(state.buffering_percent == 100) + { + gst_debug!(CAT, obj: element, "Not buffering, restarting source"); + + src.handle_source_error(element, state); + } else { + gst_debug!(CAT, obj: element, "Buffering, restarting source later"); + let elapsed = state + .last_buffering_update + .map(|i| i.elapsed().as_nanos() as u64) + .unwrap_or(0); + + src.schedule_source_restart_timeout(element, state, elapsed.into()); + } + } else { + gst_debug!(CAT, obj: element, "Restarting source not needed anymore"); + } + }); + }) + .expect("Failed to wait async"); + + state.source_restart_timeout = Some(timeout); + } + #[allow(clippy::block_in_if_condition_stmt)] fn handle_switch_active_pad_change(&self, element: &gst::Bin) { let mut state_guard = self.state.lock().unwrap(); @@ -1797,12 +1969,6 @@ impl FallbackSrc { } } - // We will schedule a new one if needed below - if let Some(timeout) = state.source_pending_timeout.take() { - gst_debug!(CAT, obj: element, "Unscheduling pending timeout"); - timeout.unschedule(); - } - // If we have neither audio nor video (no streams yet), or active pad for the ones we have // is the fallback pad then start the retry timeout unless it was started already. // Otherwise cancel the retry timeout. @@ -1835,114 +2001,20 @@ impl FallbackSrc { .unwrap_or(true)) { gst_warning!(CAT, obj: element, "Switched to fallback stream"); - - // If we're not actively buffering right now let's restart the source - if state - .last_buffering_update - .map(|i| i.elapsed() >= Duration::from_nanos(state.settings.timeout)) - .unwrap_or(state.buffering_percent == 100) - { - gst_debug!(CAT, obj: element, "Not buffering, restarting source"); - self.handle_source_error(element, state); - } else { - // Schedule another timeout after the last buffering activity - let clock = gst::SystemClock::obtain(); - let diff = gst::ClockTime::from( - state.settings.timeout.saturating_sub( - state - .last_buffering_update - .map(|i| i.elapsed().as_nanos() as u64) - .unwrap_or(state.settings.timeout), - ), - ); - let wait_time = clock.get_time() + diff; - assert!(wait_time.is_some()); - - gst_debug!(CAT, obj: element, "Starting pending timeout for {}", diff); - let timeout = clock - .new_single_shot_id(wait_time) - .expect("can't create clock id"); - - let element_weak = element.downgrade(); - timeout - .wait_async(move |_clock, _time, _id| { - let element = match element_weak.upgrade() { - None => return, - Some(element) => element, - }; - - element.call_async(|element| { - let src = FallbackSrc::from_instance(element); - src.handle_switch_active_pad_change(element); - }); - }) - .expect("failed to wait async"); - - state.source_pending_timeout = Some(timeout); + if state.source_restart_timeout.is_none() { + self.schedule_source_restart_timeout(element, state, 0.into()); + } + } else { + gst_debug!(CAT, obj: element, "Switched to main stream"); + if let Some(timeout) = state.source_retry_timeout.take() { + gst_debug!(CAT, obj: element, "Unscheduling retry timeout"); + timeout.unschedule(); } - if state.source_retry_timeout.is_none() { - let clock = gst::SystemClock::obtain(); - let wait_time = - clock.get_time() + gst::ClockTime::from(state.settings.retry_timeout); - assert!(wait_time.is_some()); - - gst_debug!(CAT, obj: element, "Starting retry timeout"); - let timeout = clock - .new_single_shot_id(wait_time) - .expect("can't create clock id"); - - let element_weak = element.downgrade(); - timeout - .wait_async(move |_clock, _time, _id| { - let element = match element_weak.upgrade() { - None => return, - Some(element) => element, - }; - - element.call_async(|element| { - let src = FallbackSrc::from_instance(element); - let mut state_guard = src.state.lock().unwrap(); - let state = match &mut *state_guard { - None => return, - Some(ref mut state) => state, - }; - if state.source_retry_timeout.take().is_none() { - return; - } - if let Some(timeout) = state.source_pending_restart_timeout.take() { - timeout.unschedule(); - } - if let Some(timeout) = state.source_pending_timeout.take() { - timeout.unschedule(); - } - state.source_pending_restart = false; - drop(state_guard); - - gst_element_warning!( - element, - gst::ResourceError::OpenRead, - ["Failed to start playback"] - ); - gst_warning!(CAT, obj: element, "Retry timeout, finishing"); - - for pad in element.get_src_pads() { - element.call_async(move |_element| { - pad.push_event(gst::event::Eos::new()); - }); - } - }); - }) - .expect("failed to wait async"); - - state.source_retry_timeout = Some(timeout); - - drop(state_guard); - element.notify("status"); + if let Some(timeout) = state.source_restart_timeout.take() { + gst_debug!(CAT, obj: element, "Unscheduling restart timeout"); + timeout.unschedule(); } - } else if let Some(timeout) = state.source_retry_timeout.take() { - gst_debug!(CAT, obj: element, "Unscheduling retry timeout"); - timeout.unschedule(); drop(state_guard); element.notify("status");