From 38753b08acc0fe045997cf1deede912e8c839bcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 19 Aug 2022 17:34:17 +0300 Subject: [PATCH] fallbacksrc: Implement support for fallback streams --- docs/plugins/gst_plugins_cache.json | 4 +- utils/fallbackswitch/src/fallbacksrc/imp.rs | 2046 ++++++++++++----- utils/fallbackswitch/src/fallbacksrc/mod.rs | 1 - .../src/fallbacksrc/video_fallback/imp.rs | 464 ---- .../src/fallbacksrc/video_fallback/mod.rs | 22 - 5 files changed, 1472 insertions(+), 1065 deletions(-) delete mode 100644 utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs delete mode 100644 utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 79c119d4..0bc054f6 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -1017,7 +1017,7 @@ "elements": { "fallbacksrc": { "author": "Sebastian Dröge ", - "description": "Live source with uridecodebin3 or custom source, and fallback image stream", + "description": "Live source with uridecodebin3 or custom source, and fallback stream", "hierarchy": [ "FallbackSrc", "GstBin", @@ -1213,7 +1213,7 @@ "construct": false, "construct-only": false, "controllable": false, - "default": "application/x-fallbacksrc-stats, num-retry=(guint64)0, last-retry-reason=(GstFallbackSourceRetryReason)none, buffering-percent=(int)100;", + "default": "application/x-fallbacksrc-stats, num-retry=(guint64)0, num-fallback-retry=(guint64)0, last-retry-reason=(GstFallbackSourceRetryReason)none, last-fallback-retry-reason=(GstFallbackSourceRetryReason)none, buffering-percent=(int)100, fallback-buffering-percent=(int)100;", "mutable": "null", "readable": true, "type": "GstStructure", diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index 907c007e..0dc74ff8 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -11,13 +11,12 @@ use gst::prelude::*; use gst::subclass::prelude::*; use parking_lot::Mutex; -use std::mem; use std::time::Instant; +use std::{cmp, mem}; use once_cell::sync::Lazy; use super::custom_source::CustomSource; -use super::video_fallback::VideoFallbackSource; use super::{RetryReason, Status}; static CAT: Lazy = Lazy::new(|| { @@ -31,16 +30,22 @@ static CAT: Lazy = Lazy::new(|| { #[derive(Debug, Clone)] struct Stats { num_retry: u64, + num_fallback_retry: u64, last_retry_reason: RetryReason, + last_fallback_retry_reason: RetryReason, buffering_percent: i32, + fallback_buffering_percent: i32, } impl Default for Stats { fn default() -> Self { Self { num_retry: 0, + num_fallback_retry: 0, last_retry_reason: RetryReason::None, + last_fallback_retry_reason: RetryReason::None, buffering_percent: 100, + fallback_buffering_percent: 100, } } } @@ -49,8 +54,17 @@ impl Stats { fn to_structure(&self) -> gst::Structure { gst::Structure::builder("application/x-fallbacksrc-stats") .field("num-retry", self.num_retry) + .field("num-fallback-retry", &self.num_fallback_retry) .field("last-retry-reason", self.last_retry_reason) + .field( + "last-fallback-retry-reason", + self.last_fallback_retry_reason, + ) .field("buffering-percent", self.buffering_percent) + .field( + "fallback-buffering-percent", + self.fallback_buffering_percent, + ) .build() } } @@ -113,47 +127,72 @@ struct Block { running_time: Option, } -// Connects one source pad with fallbackswitch and the corresponding fallback input -struct Stream { - // Fallback input stream - // for video: filesrc, decoder, converters, imagefreeze - // for audio: live audiotestsrc, converters - fallback_input: gst::Element, - - fallback_capsfilter: gst::Element, - - // source pad from source - source_srcpad: Option, +struct StreamBranch { + // source pad from actual source inside the source bin + source_srcpad: gst::Pad, + // blocking pad probe on the source pad of the source queue source_srcpad_block: Option, - // clocksync for source source pad - clocksync: gst::Element, - - // imagefreeze if this is an image stream + // other elements in the source bin before the ghostpad + // imagefreeze before the clocksync if this is a stillframe stream imagefreeze: Option, + clocksync: gst::Element, + converters: gst::Element, + queue: gst::Element, + // queue source pad, target pad of the source ghost pad + queue_srcpad: gst::Pad, - clocksync_queue: gst::Element, - clocksync_queue_srcpad: gst::Pad, + // Request pad on the fallbackswitch + switch_pad: gst::Pad, +} + +// Connects one source pad with fallbackswitch and the corresponding fallback input +struct Stream { + // Main stream and fallback stream branches to the fallback switch + main_branch: Option, + // If this does not exist then the fallbackswitch is connected directly to the dummy + // audio/video sources + fallback_branch: Option, // fallbackswitch + // fallbackswitch in the main bin, linked to the ghostpads above switch: gst::Element, - // output source pad, connected to switch + // output source pad on the main bin, switch source pad is ghostpad target srcpad: gst::GhostPad, + + // filter caps for the fallback/dummy streams + filter_caps: gst::Caps, +} + +struct SourceBin { + // uridecodebin3 or custom source element inside a bin. + // + // This bin would also contain imagefreeze, clocksync and queue elements as needed for the + // outputs and would be connected via ghost pads to the fallbackswitch elements. + source: gst::Bin, + pending_restart: bool, + is_live: bool, + is_image: bool, + + // For timing out the source and shutting it down to restart it + restart_timeout: Option, + // For restarting the source after shutting it down + pending_restart_timeout: Option, + // For failing completely if we didn't recover after the retry timeout + retry_timeout: Option, + + // Stream collection posted by source + streams: Option, } struct State { - // uridecodebin3 or custom source element - source: gst::Element, - source_is_live: bool, - source_pending_restart: bool, + source: SourceBin, + fallback_source: Option, - // For timing out the source and shutting it down to restart it - source_restart_timeout: Option, - // For restarting the source after shutting it down - source_pending_restart_timeout: Option, - // For failing completely if we didn't recover after the retry timeout - source_retry_timeout: Option, + // audio/video dummy source if the fallback source fails or is not started yet + audio_dummy_source: Option, + video_dummy_source: Option, // All our output streams, selected by properties video_stream: Option, @@ -161,9 +200,7 @@ struct State { flow_combiner: gst_base::UniqueFlowCombiner, last_buffering_update: Option, - - // Stream collection posted by source - streams: Option, + fallback_last_buffering_update: Option, // Configure settings settings: Settings, @@ -177,8 +214,6 @@ struct State { // So that we don't schedule a restart when manually unblocking // and our source hasn't reached the required state schedule_restart_on_unblock: bool, - - is_image: bool, } #[derive(Default)] @@ -555,9 +590,9 @@ impl ObjectImpl for FallbackSrc { }; // If any restarts/retries are pending, we're retrying - if state.source_pending_restart - || state.source_pending_restart_timeout.is_some() - || state.source_retry_timeout.is_some() + if state.source.pending_restart + || state.source.pending_restart_timeout.is_some() + || state.source.retry_timeout.is_some() { return Status::Retrying.to_value(); } @@ -566,7 +601,7 @@ impl ObjectImpl for FallbackSrc { // streams there is no source pad yet, we're buffering let mut have_audio = false; let mut have_video = false; - if let Some(ref streams) = state.streams { + if let Some(ref streams) = state.source.streams { for stream in streams.iter() { have_audio = have_audio || stream.stream_type().contains(gst::StreamType::AUDIO); @@ -576,19 +611,21 @@ impl ObjectImpl for FallbackSrc { } if state.stats.buffering_percent < 100 - || state.source_restart_timeout.is_some() - || state.streams.is_none() + || state.source.restart_timeout.is_some() + || state.source.streams.is_none() || (have_audio && state .audio_stream .as_ref() - .map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some()) + .and_then(|s| s.main_branch.as_ref()) + .map(|b| b.source_srcpad_block.is_some()) .unwrap_or(true)) || (have_video && state .video_stream .as_ref() - .map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some()) + .and_then(|s| s.main_branch.as_ref()) + .map(|b| b.source_srcpad_block.is_some()) .unwrap_or(true)) { return Status::Buffering.to_value(); @@ -664,10 +701,11 @@ impl ObjectImpl for FallbackSrc { &element, state, gst::ClockTime::ZERO, + false, ); } - src.unblock_pads(&element, state); + src.unblock_pads(&element, state, false); None }) @@ -697,7 +735,7 @@ impl ElementImpl for FallbackSrc { gst::subclass::ElementMetadata::new( "Fallback Source", "Generic/Source", - "Live source with uridecodebin3 or custom source, and fallback image stream", + "Live source with uridecodebin3 or custom source, and fallback stream", "Sebastian Dröge ", ) }); @@ -735,6 +773,8 @@ impl ElementImpl for FallbackSrc { element: &Self::Type, transition: gst::StateChange, ) -> Result { + gst::debug!(CAT, obj: element, "Changing state {:?}", transition); + match transition { gst::StateChange::NullToReady => { self.start(element)?; @@ -742,11 +782,25 @@ impl ElementImpl for FallbackSrc { _ => (), } - self.parent_change_state(element, transition)?; + self.parent_change_state(element, transition) + .map_err(|err| { + gst::error!( + CAT, + obj: element, + "Parent state change transition {:?} failed", + transition + ); + + err + })?; // Change the source state manually here to be able to catch errors. State changes always // happen from sink to source, so we do this after chaining up. - self.change_source_state(element, transition); + self.change_source_state(element, transition, false); + + // Change the fallback source state manually here to be able to catch errors. State changes always + // happen from sink to source, so we do this after chaining up. + self.change_source_state(element, transition, true); // Ignore parent state change return to prevent spurious async/no-preroll return values // due to core state change bugs @@ -762,10 +816,14 @@ impl ElementImpl for FallbackSrc { } } - fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { + fn send_event(&self, element: &Self::Type, event: gst::Event) -> bool { match event.view() { gst::EventView::Eos(..) => { - gst::debug!(CAT, "Handling element-level EOS, forwarding to all streams"); + gst::debug!( + CAT, + obj: element, + "Handling element-level EOS, forwarding to all streams" + ); let mut state_guard = self.state.lock(); let state = match &mut *state_guard { @@ -776,34 +834,42 @@ impl ElementImpl for FallbackSrc { }; // We don't want to hold the state lock while pushing out EOS - let mut send_eos_elements: Vec = vec![]; - let mut send_eos_pads: Vec = vec![]; + let mut send_eos_elements = vec![]; + let mut send_eos_pads = vec![]; - send_eos_elements.push(state.source.clone()); + send_eos_elements.push(state.source.source.clone()); - for stream in [&mut state.video_stream, &mut state.audio_stream] + // Not strictly necessary as the switch will EOS when receiving + // EOS on its primary pad, just good form. + if let Some(ref source) = state.fallback_source { + send_eos_elements.push(source.source.clone()); + } + if let Some(ref source) = state.audio_dummy_source { + send_eos_elements.push(source.clone()); + } + if let Some(ref source) = state.video_dummy_source { + send_eos_elements.push(source.clone()); + } + + for branch in [&mut state.video_stream, &mut state.audio_stream] .iter_mut() .filter_map(|v| v.as_mut()) + .flat_map(|s| [s.main_branch.as_mut(), s.fallback_branch.as_mut()]) + .flatten() { - // Not strictly necessary as the switch will EOS when receiving - // EOS on its primary pad, just good form. - send_eos_elements.push(stream.fallback_input.clone()); // If our source hadn't been connected to the switch as a primary // stream, we need to send EOS there ourselves - if stream.source_srcpad.is_none() { - let clocksync_queue_sinkpad = - stream.clocksync_queue.static_pad("sink").unwrap(); - send_eos_pads.push(clocksync_queue_sinkpad.clone()); - } + let queue_sinkpad = branch.queue.static_pad("sink").unwrap(); + send_eos_pads.push(queue_sinkpad.clone()); } drop(state_guard); - for elem in send_eos_elements.drain(..) { + for elem in send_eos_elements { elem.send_event(event.clone()); } - for pad in send_eos_pads.drain(..) { + for pad in send_eos_pads { pad.send_event(event.clone()); } @@ -841,12 +907,128 @@ impl BinImpl for FallbackSrc { } impl FallbackSrc { + fn create_dummy_audio_source(filter_caps: &gst::Caps, min_latency: gst::ClockTime) -> gst::Bin { + let bin = gst::Bin::new(None); + + let audiotestsrc = gst::ElementFactory::make("audiotestsrc", Some("audiosrc")) + .expect("No audiotestsrc found"); + + let audioconvert = gst::ElementFactory::make("audioconvert", Some("audio_audioconvert")) + .expect("No audioconvert found"); + + let audioresample = gst::ElementFactory::make("audioresample", Some("audio_audioresample")) + .expect("No audioresample found"); + + let capsfilter = gst::ElementFactory::make("capsfilter", Some("audio_capsfilter")) + .expect("No capsfilter found"); + + let queue = gst::ElementFactory::make("queue", None).expect("No queue found"); + + audiotestsrc.set_property_from_str("wave", "silence"); + audiotestsrc.set_property("is-live", true); + + capsfilter.set_property("caps", filter_caps); + + queue.set_properties(&[ + ("max-size-bytes", &0u32), + ("max-size-buffers", &0u32), + ( + "max-size-time", + &(cmp::max(min_latency, gst::ClockTime::from_seconds(1))), + ), + ]); + + bin.add_many(&[ + &audiotestsrc, + &audioconvert, + &audioresample, + &capsfilter, + &queue, + ]) + .unwrap(); + + gst::Element::link_many(&[ + &audiotestsrc, + &audioconvert, + &audioresample, + &capsfilter, + &queue, + ]) + .unwrap(); + + let ghostpad = + gst::GhostPad::with_target(Some("src"), &queue.static_pad("src").unwrap()).unwrap(); + ghostpad.set_active(true).unwrap(); + bin.add_pad(&ghostpad).unwrap(); + + bin + } + + fn create_dummy_video_source(filter_caps: &gst::Caps, min_latency: gst::ClockTime) -> gst::Bin { + let bin = gst::Bin::new(None); + + let videotestsrc = gst::ElementFactory::make("videotestsrc", Some("videosrc")) + .expect("No videotestsrc found"); + + let videoconvert = gst::ElementFactory::make("videoconvert", Some("video_videoconvert")) + .expect("No videoconvert found"); + + let videoscale = gst::ElementFactory::make("videoscale", Some("video_videoscale")) + .expect("No videoscale found"); + + let capsfilter = gst::ElementFactory::make("capsfilter", Some("video_capsfilter")) + .expect("No capsfilter found"); + + let queue = gst::ElementFactory::make("queue", None).expect("No queue found"); + + videotestsrc.set_property_from_str("pattern", "black"); + videotestsrc.set_property("is-live", true); + + capsfilter.set_property("caps", filter_caps); + + queue.set_properties(&[ + ("max-size-bytes", &0u32), + ("max-size-buffers", &0u32), + ( + "max-size-time", + &(cmp::max(min_latency, gst::ClockTime::from_seconds(1))), + ), + ]); + + bin.add_many(&[ + &videotestsrc, + &videoconvert, + &videoscale, + &capsfilter, + &queue, + ]) + .unwrap(); + + gst::Element::link_many(&[ + &videotestsrc, + &videoconvert, + &videoscale, + &capsfilter, + &queue, + ]) + .unwrap(); + + let ghostpad = + gst::GhostPad::with_target(Some("src"), &queue.static_pad("src").unwrap()).unwrap(); + ghostpad.set_active(true).unwrap(); + bin.add_pad(&ghostpad).unwrap(); + + bin + } + fn create_main_input( &self, element: &super::FallbackSrc, source: &Source, buffer_duration: i64, - ) -> gst::Element { + ) -> SourceBin { + let bin = gst::Bin::new(None); + let source = match source { Source::Uri(ref uri) => { let source = gst::ElementFactory::make("uridecodebin3", Some("uridecodebin")) @@ -863,12 +1045,14 @@ impl FallbackSrc { Source::Element(ref source) => CustomSource::new(source).upcast(), }; + bin.add(&source).unwrap(); + // Handle any async state changes internally, they don't affect the pipeline because we // convert everything to a live stream - source.set_property("async-handling", true); + bin.set_property("async-handling", true); // Don't let the bin handle state changes of the source. We want to do it manually to catch // possible errors and retry, without causing the whole bin state change to fail - source.set_locked_state(true); + bin.set_locked_state(true); let element_weak = element.downgrade(); source.connect_pad_added(move |_, pad| { @@ -878,7 +1062,7 @@ impl FallbackSrc { }; let src = element.imp(); - if let Err(msg) = src.handle_source_pad_added(&element, pad) { + if let Err(msg) = src.handle_source_pad_added(&element, pad, false) { element.post_error_message(msg); } }); @@ -890,42 +1074,88 @@ impl FallbackSrc { }; let src = element.imp(); - src.handle_source_pad_removed(&element, pad); + src.handle_source_pad_removed(&element, pad, false); }); - element.add_many(&[&source]).unwrap(); + element.add(&bin).unwrap(); - source + SourceBin { + source: bin, + pending_restart: false, + is_live: false, + is_image: false, + restart_timeout: None, + pending_restart_timeout: None, + retry_timeout: None, + streams: None, + } } - fn create_fallback_video_input( + fn create_fallback_input( &self, - _element: &super::FallbackSrc, - min_latency: gst::ClockTime, + element: &super::FallbackSrc, fallback_uri: Option<&str>, - ) -> gst::Element { - VideoFallbackSource::new(fallback_uri, min_latency).upcast() - } + buffer_duration: i64, + ) -> Option { + let source: gst::Element = match fallback_uri { + Some(uri) => { + let dbin = gst::ElementFactory::make("uridecodebin3", Some("uridecodebin")) + .expect("No uridecodebin3 found"); + dbin.set_property("uri", uri); + dbin.set_property("use-buffering", true); + dbin.set_property("buffer-duration", buffer_duration); - fn create_fallback_audio_input(&self, _element: &super::FallbackSrc) -> gst::Element { - let input = gst::Bin::new(Some("fallback_audio")); - let audiotestsrc = gst::ElementFactory::make("audiotestsrc", Some("fallback_audiosrc")) - .expect("No audiotestsrc found"); - input.add_many(&[&audiotestsrc]).unwrap(); + dbin + } + None => return None, + }; - audiotestsrc.set_property_from_str("wave", "silence"); - audiotestsrc.set_property("is-live", true); + let bin = gst::Bin::new(None); - let srcpad = audiotestsrc.static_pad("src").unwrap(); - input - .add_pad( - &gst::GhostPad::builder(Some("src"), gst::PadDirection::Src) - .build_with_target(&srcpad) - .unwrap(), - ) - .unwrap(); + bin.add(&source).unwrap(); - input.upcast() + let element_weak = element.downgrade(); + source.connect_pad_added(move |_, pad| { + let element = match element_weak.upgrade() { + None => return, + Some(element) => element, + }; + let src = FallbackSrc::from_instance(&element); + + if let Err(msg) = src.handle_source_pad_added(&element, pad, true) { + element.post_error_message(msg); + } + }); + let element_weak = element.downgrade(); + source.connect_pad_removed(move |_, pad| { + let element = match element_weak.upgrade() { + None => return, + Some(element) => element, + }; + let src = FallbackSrc::from_instance(&element); + + src.handle_source_pad_removed(&element, pad, true); + }); + + // Handle any async state changes internally, they don't affect the pipeline because we + // convert everything to a live stream + bin.set_property("async-handling", true); + // Don't let the bin handle state changes of the dbin. We want to do it manually to catch + // possible errors and retry, without causing the whole bin state change to fail + bin.set_locked_state(true); + + element.add(&bin).unwrap(); + + Some(SourceBin { + source: bin, + pending_restart: false, + is_live: false, + is_image: false, + restart_timeout: None, + pending_restart_timeout: None, + retry_timeout: None, + streams: None, + }) } #[allow(clippy::too_many_arguments)] @@ -935,65 +1165,23 @@ impl FallbackSrc { timeout: gst::ClockTime, min_latency: gst::ClockTime, is_audio: bool, - fallback_uri: Option<&str>, immediate_fallback: bool, - fallback_caps: &gst::Caps, + dummy_source: &gst::Bin, + filter_caps: &gst::Caps, ) -> Stream { - let fallback_input = if is_audio { - self.create_fallback_audio_input(element) - } else { - self.create_fallback_video_input(element, min_latency, fallback_uri) - }; - - let fallback_capsfilter = - gst::ElementFactory::make("capsfilter", None).expect("No capsfilter found"); - fallback_capsfilter.set_property("caps", fallback_caps); - let switch = gst::ElementFactory::make("fallbackswitch", None).expect("No fallbackswitch found"); - let clocksync = gst::ElementFactory::make("clocksync", None) - .or_else(|_| -> Result<_, glib::BoolError> { - let identity = gst::ElementFactory::make("identity", None)?; - identity.set_property("sync", true); - Ok(identity) - }) - .expect("No clocksync or identity found"); - // Workaround for issues caused by https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800 - let clocksync_queue = gst::ElementFactory::make("queue", None).expect("No queue found"); - clocksync_queue.set_properties(&[ - ("max-size-buffers", &0u32), - ("max-size-bytes", &0u32), - ("max-size-time", &gst::ClockTime::SECOND), - ]); - - element - .add_many(&[ - &fallback_input, - &fallback_capsfilter, - &switch, - &clocksync_queue, - &clocksync, - ]) - .unwrap(); - fallback_input.link(&fallback_capsfilter).unwrap(); + element.add(&switch).unwrap(); switch.set_property("timeout", timeout.nseconds()); switch.set_property("min-upstream-latency", min_latency.nseconds()); switch.set_property("immediate-fallback", immediate_fallback); - gst::Element::link_pads(&clocksync_queue, Some("src"), &clocksync, Some("sink")).unwrap(); - - let clocksync_srcpad = clocksync.static_pad("src").unwrap(); - let switch_mainsink = switch.request_pad_simple("sink_%u").unwrap(); - clocksync_srcpad.link(&switch_mainsink).unwrap(); - switch_mainsink.set_property("priority", 0u32); - // clocksync_queue sink pad is not connected to anything yet at this point! - - let fallback_srcpad = fallback_capsfilter.static_pad("src").unwrap(); - let switch_fallbacksink = switch.request_pad_simple("sink_%u").unwrap(); - fallback_srcpad.link(&switch_fallbacksink).unwrap(); - switch_fallbacksink.set_property("priority", 1u32); + let dummy_srcpad = dummy_source.static_pad("src").unwrap(); + let dummy_sinkpad = switch.request_pad_simple("sink_%u").unwrap(); + dummy_srcpad.link(&dummy_sinkpad).unwrap(); + dummy_sinkpad.set_property("priority", 2u32); let element_weak = element.downgrade(); switch.connect_notify(Some("active-pad"), move |_switch, _pspec| { @@ -1003,7 +1191,7 @@ impl FallbackSrc { }; let src = element.imp(); - src.handle_switch_active_pad_change(&element); + src.handle_switch_active_pad_change(&element, is_audio); }); let srcpad = switch.static_pad("src").unwrap(); @@ -1026,19 +1214,16 @@ impl FallbackSrc { .build_with_target(&srcpad) .unwrap(); + let _ = ghostpad.set_active(true); + element.add_pad(&ghostpad).unwrap(); Stream { - fallback_input, - fallback_capsfilter, - source_srcpad: None, - source_srcpad_block: None, - clocksync, - imagefreeze: None, - clocksync_queue_srcpad: clocksync_queue.static_pad("src").unwrap(), - clocksync_queue, + main_branch: None, + fallback_branch: None, switch, srcpad: ghostpad.upcast(), + filter_caps: filter_caps.clone(), } } @@ -1074,62 +1259,77 @@ impl FallbackSrc { // Create main input let source = self.create_main_input(element, &configured_source, settings.buffer_duration); + // Create fallback input + let fallback_source = + self.create_fallback_input(element, fallback_uri.as_deref(), settings.buffer_duration); + let mut flow_combiner = gst_base::UniqueFlowCombiner::new(); - // Create video stream - let video_stream = if settings.enable_video { + // Create video stream and video dummy input + let (video_stream, video_dummy_source) = if settings.enable_video { + let video_dummy_source = Self::create_dummy_video_source( + &settings.fallback_video_caps, + settings.min_latency, + ); + element.add(&video_dummy_source).unwrap(); + let stream = self.create_stream( element, settings.timeout, settings.min_latency, false, - fallback_uri.as_deref(), settings.immediate_fallback, + &video_dummy_source, &settings.fallback_video_caps, ); flow_combiner.add_pad(&stream.srcpad); - Some(stream) + + (Some(stream), Some(video_dummy_source)) } else { - None + (None, None) }; - // Create audio stream - let audio_stream = if settings.enable_audio { + // Create audio stream and out dummy input + let (audio_stream, audio_dummy_source) = if settings.enable_audio { + let audio_dummy_source = Self::create_dummy_audio_source( + &settings.fallback_audio_caps, + settings.min_latency, + ); + element.add(&audio_dummy_source).unwrap(); + let stream = self.create_stream( element, settings.timeout, settings.min_latency, true, - None, settings.immediate_fallback, + &audio_dummy_source, &settings.fallback_audio_caps, ); flow_combiner.add_pad(&stream.srcpad); - Some(stream) + + (Some(stream), Some(audio_dummy_source)) } else { - None + (None, None) }; let manually_blocked = settings.manual_unblock; *state_guard = Some(State { source, - source_is_live: false, - source_pending_restart: false, - source_restart_timeout: None, - source_pending_restart_timeout: None, - source_retry_timeout: None, + fallback_source, video_stream, audio_stream, + audio_dummy_source, + video_dummy_source, flow_combiner, last_buffering_update: None, - streams: None, + fallback_last_buffering_update: None, settings, configured_source, stats: Stats::default(), manually_blocked, schedule_restart_on_unblock: false, - is_image: false, }); drop(state_guard); @@ -1159,11 +1359,21 @@ impl FallbackSrc { .iter() .filter_map(|v| v.as_ref()) { + for branch in [&stream.main_branch, &stream.fallback_branch] + .iter() + .filter_map(|v| v.as_ref()) + { + element.remove(&branch.queue).unwrap(); + element.remove(&branch.converters).unwrap(); + element.remove(&branch.clocksync).unwrap(); + if let Some(ref imagefreeze) = branch.imagefreeze { + element.remove(imagefreeze).unwrap(); + } + if branch.switch_pad.parent().as_ref() == Some(stream.switch.upcast_ref()) { + stream.switch.release_request_pad(&branch.switch_pad); + } + } element.remove(&stream.switch).unwrap(); - element.remove(&stream.clocksync_queue).unwrap(); - element.remove(&stream.clocksync).unwrap(); - element.remove(&stream.fallback_capsfilter).unwrap(); - element.remove(&stream.fallback_input).unwrap(); let _ = stream.srcpad.set_target(None::<&gst::Pad>); let _ = element.remove_pad(&stream.srcpad); } @@ -1173,59 +1383,100 @@ impl FallbackSrc { if let Source::Element(ref source) = state.configured_source { // Explicitly remove the source element from the CustomSource so that we can // later create a new CustomSource and add it again there. - if source.has_as_parent(&state.source) { + if source.has_as_parent(&state.source.source) { let _ = source.set_state(gst::State::Null); let _ = state + .source .source .downcast_ref::() .unwrap() .remove(source); } } - element.remove(&state.source).unwrap(); - if let Some(timeout) = state.source_pending_restart_timeout.take() { - timeout.unschedule(); + for source in [Some(&mut state.source), state.fallback_source.as_mut()] + .iter_mut() + .flatten() + { + element.remove(&source.source).unwrap(); + + if let Some(timeout) = source.pending_restart_timeout.take() { + timeout.unschedule(); + } + + if let Some(timeout) = source.retry_timeout.take() { + timeout.unschedule(); + } + + if let Some(timeout) = source.restart_timeout.take() { + timeout.unschedule(); + } } - if let Some(timeout) = state.source_retry_timeout.take() { - timeout.unschedule(); - } - - if let Some(timeout) = state.source_restart_timeout.take() { - timeout.unschedule(); + for source in [ + state.video_dummy_source.take(), + state.audio_dummy_source.take(), + ] + .iter() + .flatten() + { + let _ = source.set_state(gst::State::Null); + element.remove(source).unwrap(); } gst::debug!(CAT, obj: element, "Stopped"); } - fn change_source_state(&self, element: &super::FallbackSrc, transition: gst::StateChange) { - gst::debug!(CAT, obj: element, "Changing source state: {:?}", transition); + fn change_source_state( + &self, + element: &super::FallbackSrc, + transition: gst::StateChange, + fallback_source: bool, + ) { + gst::debug!( + CAT, + obj: element, + "Changing {}source state: {:?}", + if fallback_source { "fallback " } else { "" }, + transition + ); let mut state_guard = self.state.lock(); let state = match &mut *state_guard { Some(state) => state, None => return, }; - if transition.current() <= transition.next() && state.source_pending_restart { + let mut source = if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source + } else { + return; + } + } else { + &mut state.source + }; + + if transition.current() <= transition.next() && source.pending_restart { gst::debug!( CAT, obj: element, - "Not starting source because pending restart" + "Not starting {}source because pending restart", + if fallback_source { "fallback " } else { "" } ); return; - } else if transition.next() <= gst::State::Ready && state.source_pending_restart { + } else if transition.next() <= gst::State::Ready && source.pending_restart { gst::debug!( CAT, obj: element, - "Unsetting pending restart because shutting down" + "Unsetting pending {}restart because shutting down", + if fallback_source { "fallback " } else { "" } ); - state.source_pending_restart = false; - if let Some(timeout) = state.source_pending_restart_timeout.take() { + source.pending_restart = false; + if let Some(timeout) = source.pending_restart_timeout.take() { timeout.unschedule(); } } - let source = state.source.clone(); + let source = source.source.clone(); drop(state_guard); element.notify("status"); @@ -1233,13 +1484,23 @@ impl FallbackSrc { let res = source.set_state(transition.next()); match res { Err(_) => { - gst::error!(CAT, obj: element, "Source failed to change state"); + gst::error!( + CAT, + obj: element, + "{}source failed to change state", + if fallback_source { "fallback " } else { "" } + ); // Try again later if we're not shutting down if transition != gst::StateChange::ReadyToNull { let _ = source.set_state(gst::State::Null); let mut state_guard = self.state.lock(); let state = state_guard.as_mut().expect("no state"); - self.handle_source_error(element, state, RetryReason::StateChangeFailure); + self.handle_source_error( + element, + state, + RetryReason::StateChangeFailure, + fallback_source, + ); drop(state_guard); element.notify("statistics"); } @@ -1248,24 +1509,57 @@ impl FallbackSrc { gst::debug!( CAT, obj: element, - "Source changed state successfully: {:?}", + "{}source changed state successfully: {:?}", + if fallback_source { "fallback " } else { "" }, res ); let mut state_guard = self.state.lock(); let state = state_guard.as_mut().expect("no state"); + let source = if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source + } else { + return; + } + } else { + &mut state.source + }; + // Remember if the source is live if transition == gst::StateChange::ReadyToPaused { - state.source_is_live = res == gst::StateChangeSuccess::NoPreroll; + source.is_live = res == gst::StateChangeSuccess::NoPreroll; } - if (state.source_is_live && transition == gst::StateChange::ReadyToPaused) - || (!state.source_is_live && transition == gst::StateChange::PausedToPlaying) + if (!source.is_live && transition == gst::StateChange::ReadyToPaused) + || (source.is_live && transition == gst::StateChange::PausedToPlaying) { - assert!(state.source_restart_timeout.is_none()); - state.schedule_restart_on_unblock = true; - self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO); + if !fallback_source { + state.schedule_restart_on_unblock = true; + } + if source.restart_timeout.is_none() { + self.schedule_source_restart_timeout( + element, + state, + gst::ClockTime::ZERO, + fallback_source, + ); + } + } else if (!source.is_live && transition == gst::StateChange::PausedToReady) + || (source.is_live && transition == gst::StateChange::PlayingToPaused) + { + if let Some(timeout) = source.pending_restart_timeout.take() { + timeout.unschedule(); + } + + if let Some(timeout) = source.retry_timeout.take() { + timeout.unschedule(); + } + + if let Some(timeout) = source.restart_timeout.take() { + timeout.unschedule(); + } } } } @@ -1292,8 +1586,27 @@ impl FallbackSrc { &self, element: &super::FallbackSrc, pad: &gst::Pad, + fallback_source: bool, ) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, obj: element, "Pad {} added to source", pad.name(),); + gst::debug!( + CAT, + obj: element, + "Pad {} added to {}source", + pad.name(), + if fallback_source { "fallback " } else { "" } + ); + + let mut is_image = false; + + if let Some(ev) = pad.sticky_event::(0) { + let stream = ev.stream(); + + if let Some(caps) = stream.and_then(|s| s.caps()) { + if let Some(s) = caps.structure(0) { + is_image = s.name().starts_with("image/"); + } + } + } let mut state_guard = self.state.lock(); let state = match &mut *state_guard { @@ -1303,39 +1616,35 @@ impl FallbackSrc { Some(state) => state, }; - let mut is_image = false; - - if let Some(ev) = pad.sticky_event::(0) { - let stream = ev.stream(); - - if let Some(stream) = stream { - if let Some(caps) = stream.caps() { - if let Some(s) = caps.structure(0) { - is_image = s.name().starts_with("image/"); - } - } + let source = if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source + } else { + return Ok(()); } - } + } else { + &mut state.source + }; if is_image { - if let Some(timeout) = state.source_pending_restart_timeout.take() { + if let Some(timeout) = source.pending_restart_timeout.take() { timeout.unschedule(); } - if let Some(timeout) = state.source_retry_timeout.take() { + if let Some(timeout) = source.retry_timeout.take() { timeout.unschedule(); } - if let Some(timeout) = state.source_restart_timeout.take() { + if let Some(timeout) = source.restart_timeout.take() { timeout.unschedule(); } } - state.is_image |= is_image; + source.is_image |= is_image; let (is_video, stream) = match pad.name() { - x if x.starts_with("audio_") => (false, &mut state.audio_stream), - x if x.starts_with("video_") => (true, &mut state.video_stream), + x if x.starts_with("audio") => (false, &mut state.audio_stream), + x if x.starts_with("video") => (true, &mut state.video_stream), _ => { let caps = match pad.current_caps().unwrap_or_else(|| pad.query_caps(None)) { caps if !caps.is_any() && !caps.is_empty() => caps, @@ -1357,34 +1666,151 @@ impl FallbackSrc { let type_ = if is_video { "video" } else { "audio" }; - let stream = match stream { + let (branch_storage, filter_caps, switch) = match stream { None => { gst::debug!(CAT, obj: element, "No {} stream enabled", type_); return Ok(()); } Some(Stream { - source_srcpad: Some(_), + ref mut main_branch, + ref switch, + ref filter_caps, + .. + }) if !fallback_source => { + if main_branch.is_some() { + gst::debug!(CAT, obj: element, "Already configured a {} stream", type_); + return Ok(()); + } + + (main_branch, filter_caps, switch) + } + Some(Stream { + ref mut fallback_branch, + ref switch, + ref filter_caps, .. }) => { - gst::debug!(CAT, obj: element, "Already configured a {} stream", type_); - return Ok(()); + if fallback_branch.is_some() { + gst::debug!( + CAT, + obj: element, + "Already configured a {} fallback stream", + type_ + ); + return Ok(()); + } + + (fallback_branch, filter_caps, switch) } - Some(ref mut stream) => stream, }; - let sinkpad = if is_image { - let imagefreeze = if let Some(ref imagefreeze) = stream.imagefreeze { - imagefreeze - } else { - let imagefreeze = - gst::ElementFactory::make("imagefreeze", None).expect("no imagefreeze found"); + let converters = if is_video { + let bin = gst::Bin::new(None); - gst::debug!(CAT, "image stream, inserting imagefreeze"); - element.add(&imagefreeze).unwrap(); - imagefreeze.set_property("is-live", true); - stream.imagefreeze = Some(imagefreeze); - stream.imagefreeze.as_ref().unwrap() - }; + let videoconvert = + gst::ElementFactory::make("videoconvert", Some("video_videoconvert")) + .expect("No videoconvert found"); + + let videoscale = gst::ElementFactory::make("videoscale", Some("video_videoscale")) + .expect("No videoscale found"); + + let capsfilter = gst::ElementFactory::make("capsfilter", Some("video_capsfilter")) + .expect("No capsfilter found"); + + capsfilter.set_property("caps", filter_caps); + + bin.add_many(&[&videoconvert, &videoscale, &capsfilter]) + .unwrap(); + + gst::Element::link_many(&[&videoconvert, &videoscale, &capsfilter]).unwrap(); + + let ghostpad = + gst::GhostPad::with_target(Some("sink"), &videoconvert.static_pad("sink").unwrap()) + .unwrap(); + ghostpad.set_active(true).unwrap(); + bin.add_pad(&ghostpad).unwrap(); + + let ghostpad = + gst::GhostPad::with_target(Some("src"), &capsfilter.static_pad("src").unwrap()) + .unwrap(); + ghostpad.set_active(true).unwrap(); + bin.add_pad(&ghostpad).unwrap(); + + bin.upcast() + } else { + let bin = gst::Bin::new(None); + + let audioconvert = + gst::ElementFactory::make("audioconvert", Some("audio_audioconvert")) + .expect("No audioconvert found"); + + let audioresample = + gst::ElementFactory::make("audioresample", Some("audio_audioresample")) + .expect("No audioresample found"); + + let capsfilter = gst::ElementFactory::make("capsfilter", Some("audio_capsfilter")) + .expect("No capsfilter found"); + + capsfilter.set_property("caps", filter_caps); + + bin.add_many(&[&audioconvert, &audioresample, &capsfilter]) + .unwrap(); + + gst::Element::link_many(&[&audioconvert, &audioresample, &capsfilter]).unwrap(); + + let ghostpad = + gst::GhostPad::with_target(Some("sink"), &audioconvert.static_pad("sink").unwrap()) + .unwrap(); + ghostpad.set_active(true).unwrap(); + bin.add_pad(&ghostpad).unwrap(); + + let ghostpad = + gst::GhostPad::with_target(Some("src"), &capsfilter.static_pad("src").unwrap()) + .unwrap(); + ghostpad.set_active(true).unwrap(); + bin.add_pad(&ghostpad).unwrap(); + + bin.upcast() + }; + + let queue = gst::ElementFactory::make("queue", None).unwrap(); + queue.set_properties(&[ + ("max-size-bytes", &0u32), + ("max-size-buffers", &0u32), + ( + "max-size-time", + &(cmp::max(state.settings.min_latency, gst::ClockTime::from_seconds(1))), + ), + ]); + let clocksync = gst::ElementFactory::make("clocksync", None).unwrap_or_else(|_| { + let identity = gst::ElementFactory::make("identity", None).unwrap(); + identity.set_property("sync", true); + identity + }); + + source + .source + .add_many(&[&converters, &queue, &clocksync]) + .unwrap(); + converters.sync_state_with_parent().unwrap(); + queue.sync_state_with_parent().unwrap(); + clocksync.sync_state_with_parent().unwrap(); + + let sinkpad = converters.static_pad("sink").unwrap(); + pad.link(&sinkpad).map_err(|err| { + gst::error!(CAT, obj: element, "Failed to link new source pad: {}", err); + gst::error_msg!( + gst::CoreError::Negotiation, + ["Failed to link new source pad: {}", err] + ) + })?; + + let imagefreeze = if is_image { + gst::debug!(CAT, obj: element, "Image stream, inserting imagefreeze"); + let imagefreeze = + gst::ElementFactory::make("imagefreeze", None).expect("no imagefreeze found"); + source.source.add(&imagefreeze).unwrap(); + imagefreeze.set_property("is-live", true); if imagefreeze.sync_state_with_parent().is_err() { gst::error!(CAT, obj: element, "imagefreeze failed to change state",); @@ -1393,30 +1819,23 @@ impl FallbackSrc { ["Failed to change imagefreeze state"] )); } - imagefreeze.link(&stream.clocksync_queue).unwrap(); - imagefreeze.static_pad("sink").unwrap() + converters.link(&imagefreeze).unwrap(); + imagefreeze.link(&queue).unwrap(); + Some(imagefreeze) } else { - if let Some(imagefreeze) = stream.imagefreeze.take() { - imagefreeze.set_locked_state(true); - let _ = imagefreeze.set_state(gst::State::Null); - element.remove(&imagefreeze).unwrap(); - } - - stream.clocksync_queue.static_pad("sink").unwrap() + converters.link(&queue).unwrap(); + None }; - pad.link(&sinkpad).map_err(|err| { - gst::error!( - CAT, - obj: element, - "Failed to link source pad to clocksync: {}", - err - ); - gst::error_msg!( - gst::CoreError::Negotiation, - ["Failed to link source pad to clocksync: {}", err] - ) - })?; + let ghostpad = + gst::GhostPad::with_target(Some(type_), &queue.static_pad("src").unwrap()).unwrap(); + let _ = ghostpad.set_active(true); + source.source.add_pad(&ghostpad).unwrap(); + + // Link the new source pad in + let switch_pad = switch.request_pad_simple("sink_%u").unwrap(); + switch_pad.set_property("priority", if fallback_source { 1u32 } else { 0u32 }); + ghostpad.link(&switch_pad).unwrap(); let element_weak = element.downgrade(); pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |pad, info| { @@ -1432,7 +1851,8 @@ impl FallbackSrc { gst::debug!( CAT, obj: &element, - "Received EOS from source on pad {}", + "Received EOS from {}source on pad {}", + if fallback_source { "fallback " } else { "" }, pad.name() ); @@ -1447,7 +1867,7 @@ impl FallbackSrc { if is_image { gst::PadProbeReturn::Ok } else if state.settings.restart_on_eos { - src.handle_source_error(&element, state, RetryReason::Eos); + src.handle_source_error(&element, state, RetryReason::Eos, fallback_source); drop(state_guard); element.notify("statistics"); @@ -1460,13 +1880,10 @@ impl FallbackSrc { state.video_stream.as_ref() } } { - if other_stream.source_srcpad.is_none() { - let fallback_input = &other_stream.fallback_input; - let clocksync_queue_sinkpad = - other_stream.clocksync_queue.static_pad("sink").unwrap(); - fallback_input.call_async(move |fallback_input| { - fallback_input.send_event(gst::event::Eos::new()); - clocksync_queue_sinkpad.send_event(gst::event::Eos::new()); + if other_stream.main_branch.is_none() { + let sinkpad = other_stream.switch.static_pad("sink").unwrap(); + element.call_async(move |_| { + sinkpad.send_event(gst::event::Eos::new()); }); } } @@ -1478,9 +1895,20 @@ impl FallbackSrc { } }); - assert!(stream.source_srcpad_block.is_none()); - stream.source_srcpad = Some(pad.clone()); - stream.source_srcpad_block = Some(self.add_pad_probe(element, stream)); + let queue_srcpad = queue.static_pad("src").unwrap(); + let source_srcpad_block = + Some(self.add_pad_probe(element, pad, &queue_srcpad, fallback_source)); + + *branch_storage = Some(StreamBranch { + source_srcpad: pad.clone(), + source_srcpad_block, + imagefreeze, + clocksync, + converters, + queue, + queue_srcpad, + switch_pad, + }); drop(state_guard); element.notify("status"); @@ -1488,7 +1916,13 @@ impl FallbackSrc { Ok(()) } - fn add_pad_probe(&self, element: &super::FallbackSrc, stream: &mut Stream) -> Block { + fn add_pad_probe( + &self, + element: &super::FallbackSrc, + pad: &gst::Pad, + block_pad: &gst::Pad, + fallback_source: bool, + ) -> Block { // FIXME: Not literally correct as we add the probe to the queue source pad but that's only // a workaround until // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800 @@ -1496,13 +1930,14 @@ impl FallbackSrc { gst::debug!( CAT, obj: element, - "Adding probe to pad {}", - stream.source_srcpad.as_ref().unwrap().name() + "Adding blocking probe to pad {} for pad {} (fallback: {})", + block_pad.name(), + pad.name(), + fallback_source, ); let element_weak = element.downgrade(); - let probe_id = stream - .clocksync_queue_srcpad + let probe_id = block_pad .add_probe( gst::PadProbeType::BLOCK | gst::PadProbeType::BUFFER @@ -1523,7 +1958,7 @@ impl FallbackSrc { let src = element.imp(); - if let Err(msg) = src.handle_pad_blocked(&element, pad, pts) { + if let Err(msg) = src.handle_pad_blocked(&element, pad, pts, fallback_source) { element.post_error_message(msg); } @@ -1533,7 +1968,7 @@ impl FallbackSrc { .unwrap(); Block { - pad: stream.clocksync_queue_srcpad.clone(), + pad: block_pad.clone(), probe_id, running_time: gst::ClockTime::NONE, } @@ -1544,6 +1979,7 @@ impl FallbackSrc { element: &super::FallbackSrc, pad: &gst::Pad, pts: impl Into>, + fallback_source: bool, ) -> Result<(), gst::ErrorMessage> { let mut state_guard = self.state.lock(); let state = match &mut *state_guard { @@ -1553,57 +1989,100 @@ impl FallbackSrc { Some(state) => state, }; - // FIXME: Not literally correct as we added the probe to the queue source pad but that's only - // a workaround until - // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800 - // is fixed. - - let stream = if let Some(stream) = state - .audio_stream - .as_mut() - .filter(|s| &s.clocksync_queue_srcpad == pad) - { - gst::debug!( - CAT, - obj: element, - "Called probe on pad {}", - stream.source_srcpad.as_ref().unwrap().name() - ); - stream - } else if let Some(stream) = state - .video_stream - .as_mut() - .filter(|s| &s.clocksync_queue_srcpad == pad) - { - gst::debug!( - CAT, - obj: element, - "Called probe on pad {}", - stream.source_srcpad.as_ref().unwrap().name() - ); - stream - } else { - unreachable!(); - }; - - // Directly unblock for live streams - if state.source_is_live { - for (source_srcpad, block) in [state.video_stream.as_mut(), state.audio_stream.as_mut()] - .iter_mut() - .filter_map(|s| s.as_mut()) - .filter_map(|s| { - if let Some(block) = s.source_srcpad_block.take() { - Some((s.source_srcpad.as_ref().unwrap(), block)) - } else { - None - } - }) - { + let (branch, source) = match &mut *state { + State { + audio_stream: + Some(Stream { + main_branch: Some(ref mut branch), + .. + }), + ref source, + .. + } if !fallback_source && &branch.queue_srcpad == pad => { gst::debug!( CAT, obj: element, - "Removing pad probe for pad {}", - source_srcpad.name() + "Called probe on pad {} for pad {} (fallback: {})", + pad.name(), + branch.source_srcpad.name(), + fallback_source + ); + + (branch, source) + } + State { + audio_stream: + Some(Stream { + fallback_branch: Some(ref mut branch), + .. + }), + fallback_source: Some(ref source), + .. + } if fallback_source && &branch.queue_srcpad == pad => { + gst::debug!( + CAT, + obj: element, + "Called probe on pad {} for pad {} (fallback: {})", + pad.name(), + branch.source_srcpad.name(), + fallback_source + ); + + (branch, source) + } + State { + video_stream: + Some(Stream { + main_branch: Some(ref mut branch), + .. + }), + ref source, + .. + } if !fallback_source && &branch.queue_srcpad == pad => { + gst::debug!( + CAT, + obj: element, + "Called probe on pad {} for pad {} (fallback: {})", + pad.name(), + branch.source_srcpad.name(), + fallback_source, + ); + + (branch, source) + } + State { + video_stream: + Some(Stream { + fallback_branch: Some(ref mut branch), + .. + }), + fallback_source: Some(ref source), + .. + } if fallback_source && &branch.queue_srcpad == pad => { + gst::debug!( + CAT, + obj: element, + "Called probe on pad {} for pad {} (fallback: {})", + pad.name(), + branch.source_srcpad.name(), + fallback_source + ); + + (branch, source) + } + _ => unreachable!(), + }; + + // Directly unblock for live streams + if source.is_live { + if let Some(block) = branch.source_srcpad_block.take() { + gst::debug!( + CAT, + obj: element, + "Removing pad probe on pad {} for pad {} (fallback: {})", + pad.name(), + branch.source_srcpad.name(), + fallback_source, ); block.pad.remove_probe(block.probe_id); } @@ -1617,7 +2096,7 @@ impl FallbackSrc { } // Update running time for this block - let block = match stream.source_srcpad_block { + let block = match branch.source_srcpad_block { Some(ref mut block) => block, None => return Ok(()), }; @@ -1655,7 +2134,7 @@ impl FallbackSrc { block.running_time = running_time; - self.unblock_pads(element, state); + self.unblock_pads(element, state, fallback_source); drop(state_guard); element.notify("status"); @@ -1663,15 +2142,25 @@ impl FallbackSrc { Ok(()) } - fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State) { - if state.manually_blocked { + fn unblock_pads(&self, element: &super::FallbackSrc, state: &mut State, fallback_source: bool) { + let current_running_time = match element.current_running_time() { + Some(current_running_time) => current_running_time, + None => { + gst::debug!(CAT, obj: element, "Waiting for current_running_time"); + return; + } + }; + + if !fallback_source && state.manually_blocked { gst::debug!(CAT, obj: element, "Not unblocking yet: manual unblock",); return; } // Check if all streams are blocked and have a running time and we have // 100% buffering - if state.stats.buffering_percent < 100 { + if (fallback_source && state.stats.fallback_buffering_percent < 100) + || (!fallback_source && state.stats.buffering_percent < 100) + { gst::debug!( CAT, obj: element, @@ -1681,7 +2170,18 @@ impl FallbackSrc { return; } - let streams = match state.streams { + let source = if fallback_source { + if let Some(ref source) = state.fallback_source { + source + } else { + // There are no blocked pads if there is no fallback source + return; + } + } else { + &state.source + }; + + let streams = match source.streams { None => { gst::debug!(CAT, obj: element, "Have no stream collection yet"); return; @@ -1695,28 +2195,46 @@ impl FallbackSrc { have_video = have_video || stream.stream_type().contains(gst::StreamType::VIDEO); } - let want_audio = state.settings.enable_audio; - let want_video = state.settings.enable_video; + // For the fallback source, if we have no audio/video then that's OK and we would continue + // using the corresponding dummy source + let want_audio = if fallback_source { + have_audio + } else { + state.settings.enable_audio + }; + let want_video = if fallback_source { + have_video + } else { + state.settings.enable_video + }; - let audio_running_time = state - .audio_stream + // FIXME: All this surely can be simplified somehow + let mut audio_branch = state.audio_stream.as_mut().and_then(|s| { + if fallback_source { + s.fallback_branch.as_mut() + } else { + s.main_branch.as_mut() + } + }); + let mut video_branch = state.video_stream.as_mut().and_then(|s| { + if fallback_source { + s.fallback_branch.as_mut() + } else { + s.main_branch.as_mut() + } + }); + + let audio_running_time = audio_branch .as_ref() - .and_then(|s| s.source_srcpad_block.as_ref()) + .and_then(|b| b.source_srcpad_block.as_ref()) .and_then(|b| b.running_time); - let video_running_time = state - .video_stream + let video_running_time = video_branch .as_ref() - .and_then(|s| s.source_srcpad_block.as_ref()) + .and_then(|b| b.source_srcpad_block.as_ref()) .and_then(|b| b.running_time); - let audio_srcpad = state - .audio_stream - .as_ref() - .and_then(|s| s.source_srcpad.as_ref().cloned()); - let video_srcpad = state - .video_stream - .as_ref() - .and_then(|s| s.source_srcpad.as_ref().cloned()); + let audio_srcpad = audio_branch.as_ref().map(|b| b.source_srcpad.clone()); + let video_srcpad = video_branch.as_ref().map(|b| b.source_srcpad.clone()); let audio_is_eos = audio_srcpad .as_ref() @@ -1731,15 +2249,6 @@ impl FallbackSrc { // Also consider EOS, we'd never get a new running time after EOS so don't need to wait. // FIXME: All this surely can be simplified somehow - // FIXME I guess this could be moved up - let current_running_time = match element.current_running_time() { - Some(current_running_time) => current_running_time, - None => { - gst::debug!(CAT, obj: element, "Waiting for current_running_time"); - return; - } - }; - if have_audio && want_audio && have_video && want_video { if audio_running_time.is_none() && !audio_is_eos @@ -1789,10 +2298,9 @@ impl FallbackSrc { video_is_eos, ); - if let Some(block) = state - .audio_stream + if let Some(block) = audio_branch .as_mut() - .and_then(|s| s.source_srcpad_block.take()) + .and_then(|b| b.source_srcpad_block.take()) { if !audio_is_eos { block.pad.set_offset(offset); @@ -1800,10 +2308,9 @@ impl FallbackSrc { block.pad.remove_probe(block.probe_id); } - if let Some(block) = state - .video_stream + if let Some(block) = video_branch .as_mut() - .and_then(|s| s.source_srcpad_block.take()) + .and_then(|b| b.source_srcpad_block.take()) { if !video_is_eos { block.pad.set_offset(offset); @@ -1835,10 +2342,9 @@ impl FallbackSrc { audio_is_eos ); - if let Some(block) = state - .audio_stream + if let Some(block) = audio_branch .as_mut() - .and_then(|s| s.source_srcpad_block.take()) + .and_then(|b| b.source_srcpad_block.take()) { if !audio_is_eos { block.pad.set_offset(offset); @@ -1870,10 +2376,9 @@ impl FallbackSrc { video_is_eos ); - if let Some(block) = state - .video_stream + if let Some(block) = video_branch .as_mut() - .and_then(|s| s.source_srcpad_block.take()) + .and_then(|b| b.source_srcpad_block.take()) { if !video_is_eos { block.pad.set_offset(offset); @@ -1883,8 +2388,19 @@ impl FallbackSrc { } } - fn handle_source_pad_removed(&self, element: &super::FallbackSrc, pad: &gst::Pad) { - gst::debug!(CAT, obj: element, "Pad {} removed from source", pad.name()); + fn handle_source_pad_removed( + &self, + element: &super::FallbackSrc, + pad: &gst::Pad, + fallback_source: bool, + ) { + gst::debug!( + CAT, + obj: element, + "Pad {} removed from {}source", + pad.name(), + if fallback_source { "fallback " } else { "" } + ); let mut state_guard = self.state.lock(); let state = match &mut *state_guard { @@ -1894,28 +2410,96 @@ impl FallbackSrc { Some(state) => state, }; - // Don't have to do anything here other than forgetting about the pad. Unlinking will - // automatically happen while the pad is being removed from source and thus leaves the - // bin hierarchy - let stream = if let Some(stream) = state - .audio_stream - .as_mut() - .filter(|s| s.source_srcpad.as_ref() == Some(pad)) - { - stream - } else if let Some(stream) = state - .video_stream - .as_mut() - .filter(|s| s.source_srcpad.as_ref() == Some(pad)) - { - stream - } else { - return; + let (mut branch, is_video, source, switch) = match &mut *state { + State { + audio_stream: + Some(Stream { + ref mut main_branch, + ref switch, + .. + }), + ref source, + .. + } if !fallback_source + && main_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) => + { + (main_branch.take().unwrap(), false, source, switch) + } + State { + audio_stream: + Some(Stream { + ref mut fallback_branch, + ref switch, + .. + }), + fallback_source: Some(ref source), + .. + } if fallback_source + && fallback_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) => + { + (fallback_branch.take().unwrap(), false, source, switch) + } + State { + video_stream: + Some(Stream { + ref mut main_branch, + ref switch, + .. + }), + ref source, + .. + } if !fallback_source + && main_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) => + { + (main_branch.take().unwrap(), true, source, switch) + } + State { + video_stream: + Some(Stream { + ref mut fallback_branch, + ref switch, + .. + }), + fallback_source: Some(ref source), + .. + } if fallback_source + && fallback_branch.as_ref().map(|b| &b.source_srcpad) == Some(pad) => + { + (fallback_branch.take().unwrap(), true, source, switch) + } + _ => return, }; - stream.source_srcpad = None; + branch.queue.set_locked_state(true); + let _ = branch.queue.set_state(gst::State::Null); + source.source.remove(&branch.queue).unwrap(); - self.unblock_pads(element, state); + branch.converters.set_locked_state(true); + let _ = branch.converters.set_state(gst::State::Null); + source.source.remove(&branch.converters).unwrap(); + + branch.clocksync.set_locked_state(true); + let _ = branch.clocksync.set_state(gst::State::Null); + source.source.remove(&branch.clocksync).unwrap(); + + if let Some(imagefreeze) = branch.imagefreeze.take() { + imagefreeze.set_locked_state(true); + let _ = imagefreeze.set_state(gst::State::Null); + source.source.remove(&imagefreeze).unwrap(); + } + + if branch.switch_pad.parent().as_ref() == Some(switch.upcast_ref()) { + switch.release_request_pad(&branch.switch_pad); + } + + let ghostpad = source + .source + .static_pad(if is_video { "video" } else { "audio" }) + .unwrap(); + let _ = ghostpad.set_active(false); + source.source.remove_pad(&ghostpad).unwrap(); + + self.unblock_pads(element, state, fallback_source); drop(state_guard); element.notify("status"); @@ -1930,30 +2514,85 @@ impl FallbackSrc { Some(state) => state, }; - if state.source_pending_restart { + let src = match m.src() { + Some(src) => src, + None => return, + }; + + let fallback_source = if let Some(ref source) = state.fallback_source { + src.has_as_ancestor(&source.source) + } else if src.has_as_ancestor(&state.source.source) { + false + } else { + return; + }; + + let source = if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source + } else { + return; + } + } else { + &mut state.source + }; + + if source.pending_restart { gst::debug!(CAT, obj: element, "Has pending restart"); return; } - gst::debug!(CAT, obj: element, "Got buffering {}%", m.percent()); + gst::debug!( + CAT, + obj: element, + "Got buffering {}% (fallback: {})", + m.percent(), + fallback_source + ); - state.stats.buffering_percent = m.percent(); - if state.stats.buffering_percent < 100 { - state.last_buffering_update = Some(Instant::now()); + let buffering_percent = if fallback_source { + &mut state.stats.fallback_buffering_percent + } else { + &mut state.stats.buffering_percent + }; + let last_buffering_update = if fallback_source { + &mut state.fallback_last_buffering_update + } else { + &mut state.last_buffering_update + }; + + *buffering_percent = m.percent(); + if *buffering_percent < 100 { + *last_buffering_update = Some(Instant::now()); // Block source pads if needed to pause - if let Some(ref mut stream) = state.audio_stream { - if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() { - stream.source_srcpad_block = Some(self.add_pad_probe(element, stream)); - } - } - if let Some(ref mut stream) = state.video_stream { - if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() { - stream.source_srcpad_block = Some(self.add_pad_probe(element, stream)); + for stream in [state.audio_stream.as_mut(), state.video_stream.as_mut()] + .iter_mut() + .flatten() + { + let branch = match stream { + Stream { + main_branch: Some(ref mut branch), + .. + } if !fallback_source => branch, + Stream { + fallback_branch: Some(ref mut branch), + .. + } if fallback_source => branch, + _ => continue, + }; + + if branch.source_srcpad_block.is_none() { + branch.source_srcpad_block = Some(self.add_pad_probe( + element, + &branch.source_srcpad, + &branch.queue_srcpad, + fallback_source, + )); } } } else { // Check if we can unblock now - self.unblock_pads(element, state); + self.unblock_pads(element, state, fallback_source); } drop(state_guard); @@ -1974,13 +2613,27 @@ impl FallbackSrc { Some(state) => state, }; + let src = match m.src() { + Some(src) => src, + None => return, + }; + + let fallback_source = if let Some(ref source) = state.fallback_source { + src.has_as_ancestor(&source.source) + } else if src.has_as_ancestor(&state.source.source) { + false + } else { + return; + }; + let streams = m.stream_collection(); gst::debug!( CAT, obj: element, - "Got stream collection {:?}", - streams.debug() + "Got stream collection {:?} (fallback: {})", + streams.debug(), + fallback_source, ); let mut have_audio = false; @@ -2006,20 +2659,38 @@ impl FallbackSrc { ); } - state.streams = Some(streams); + if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source.streams = Some(streams); + } + } else { + state.source.streams = Some(streams); + } // This might not be the first stream collection and we might have some unblocked pads from // before already, which would need to be blocked again now for keeping things in sync - for stream in [&mut state.video_stream, &mut state.audio_stream] + for branch in [state.video_stream.as_mut(), state.audio_stream.as_mut()] .iter_mut() - .filter_map(|v| v.as_mut()) + .flatten() + .filter_map(|s| { + if fallback_source { + s.fallback_branch.as_mut() + } else { + s.main_branch.as_mut() + } + }) { - if stream.source_srcpad.is_some() && stream.source_srcpad_block.is_none() { - stream.source_srcpad_block = Some(self.add_pad_probe(element, stream)); + if branch.source_srcpad_block.is_none() { + branch.source_srcpad_block = Some(self.add_pad_probe( + element, + &branch.source_srcpad, + &branch.queue_srcpad, + fallback_source, + )); } } - self.unblock_pads(element, state); + self.unblock_pads(element, state, fallback_source); drop(state_guard); element.notify("status"); @@ -2046,41 +2717,21 @@ impl FallbackSrc { src.path_string() ); - if src == state.source || src.has_as_ancestor(&state.source) { - self.handle_source_error(element, state, RetryReason::Error); + if src == state.source.source || src.has_as_ancestor(&state.source.source) { + self.handle_source_error(element, state, RetryReason::Error, false); drop(state_guard); element.notify("status"); element.notify("statistics"); return true; } - // Check if error is from video fallback input and if so, try another - // fallback to videotestsrc - if let Some(ref mut video_stream) = state.video_stream { - if src == video_stream.fallback_input - || src.has_as_ancestor(&video_stream.fallback_input) - { - gst::debug!(CAT, obj: element, "Got error from video fallback input"); - - let prev_fallback_uri = video_stream - .fallback_input - .property::>("uri"); - - // This means previously videotestsrc was configured - // Something went wrong and there is no other way than to error out - if prev_fallback_uri.is_none() { - return false; - } - - let fallback_input = &video_stream.fallback_input; - fallback_input.call_async(|fallback_input| { - // Re-run video fallback input with videotestsrc - let _ = fallback_input.set_state(gst::State::Null); - fallback_input.set_property("uri", None::<&str>); - let _ = fallback_input.sync_state_with_parent(); - }); - - return true; + // Check if error is from fallback input and if so, use a dummy fallback + if let Some(ref source) = state.fallback_source { + if src == source.source || src.has_as_ancestor(&source.source) { + self.handle_source_error(element, state, RetryReason::Error, true); + drop(state_guard); + element.notify("status"); + element.notify("statistics"); } } @@ -2099,30 +2750,57 @@ impl FallbackSrc { element: &super::FallbackSrc, state: &mut State, reason: RetryReason, + fallback_source: bool, ) { - gst::debug!(CAT, obj: element, "Handling source error: {:?}", reason); + gst::debug!( + CAT, + obj: element, + "Handling source error (fallback: {}): {:?}", + fallback_source, + reason + ); - state.stats.last_retry_reason = reason; - if state.source_pending_restart { - gst::debug!(CAT, obj: element, "Source is already pending restart"); + if fallback_source { + state.stats.last_fallback_retry_reason = reason; + } else { + state.stats.last_retry_reason = reason; + } + + let source = if fallback_source { + state.fallback_source.as_mut().unwrap() + } else { + &mut state.source + }; + + if source.pending_restart { + gst::debug!( + CAT, + obj: element, + "{}source is already pending restart", + if fallback_source { "fallback " } else { "" } + ); return; } // Increase retry count only if there was no pending restart - state.stats.num_retry += 1; + if fallback_source { + state.stats.num_fallback_retry += 1; + } else { + state.stats.num_retry += 1; + } // Unschedule pending timeout, we're restarting now - if let Some(timeout) = state.source_restart_timeout.take() { + if let Some(timeout) = source.restart_timeout.take() { timeout.unschedule(); } // Prevent state changes from changing the state in an uncoordinated way - state.source_pending_restart = true; + source.pending_restart = true; // Drop any EOS events from any source pads of the source that might happen because of the // error. We don't need to remove these pad probes because restarting the source will also // remove/add the pads again. - for pad in state.source.src_pads() { + for pad in source.source.src_pads() { pad.add_probe( gst::PadProbeType::EVENT_DOWNSTREAM, |_pad, info| match info.data { @@ -2139,7 +2817,7 @@ impl FallbackSrc { .unwrap(); } - let source_weak = state.source.downgrade(); + let source_weak = source.source.downgrade(); element.call_async(move |element| { let src = element.imp(); @@ -2152,113 +2830,179 @@ impl FallbackSrc { // source will deadlock on the probes. let mut state_guard = src.state.lock(); let state = match &mut *state_guard { - None - | Some(State { - source_pending_restart: false, + None => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); + return; + } + Some(State { + source: + SourceBin { + pending_restart: false, + .. + }, .. - }) => { - gst::debug!(CAT, obj: element, "Restarting source not needed anymore"); + }) if !fallback_source => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); + return; + } + Some(State { + fallback_source: + Some(SourceBin { + pending_restart: false, + .. + }), + .. + }) if fallback_source => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); return; } Some(state) => state, }; - for (source_srcpad_name, block) in - [state.video_stream.as_mut(), state.audio_stream.as_mut()] - .iter_mut() - .filter_map(|s| s.as_mut()) - .filter_map(|s| { - if let Some(block) = s.source_srcpad_block.take() { - Some((s.source_srcpad.as_ref().map(|pad| pad.name()), block)) - } else { - None - } - }) + for (source_srcpad, block) in [state.video_stream.as_mut(), state.audio_stream.as_mut()] + .iter_mut() + .flatten() + .filter_map(|s| { + if fallback_source { + s.fallback_branch.as_mut() + } else { + s.main_branch.as_mut() + } + }) + .filter_map(|branch| { + if let Some(block) = branch.source_srcpad_block.take() { + Some((&branch.source_srcpad, block)) + } else { + None + } + }) { gst::debug!( CAT, obj: element, "Removing pad probe for pad {}", - source_srcpad_name.as_deref().unwrap_or("UNKNOWN") + source_srcpad.name() ); block.pad.remove_probe(block.probe_id); } - let stream_sinkpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()] + let switch_sinkpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()] .into_iter() .flatten() - .map(|s| { - if let Some(ref imagefreeze) = s.imagefreeze { - imagefreeze.static_pad("sink").unwrap() + .filter_map(|s| { + if fallback_source { + s.fallback_branch.as_ref() } else { - s.clocksync_queue.static_pad("sink").unwrap() + s.main_branch.as_ref() } }) - .collect::>(); - - let stream_srcpads = [state.audio_stream.as_ref(), state.video_stream.as_ref()] - .into_iter() - .flatten() - .map(|s| { - let srcpad = s.srcpad.clone(); - let probe_id = srcpad - .add_probe( - gst::PadProbeType::EVENT_DOWNSTREAM | gst::PadProbeType::EVENT_FLUSH, - move |_pad, info| match info.data { - Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { - gst::EventView::FlushStart(_) => gst::PadProbeReturn::Drop, - gst::EventView::FlushStop(_) => gst::PadProbeReturn::Drop, - _ => gst::PadProbeReturn::Ok, - }, - _ => gst::PadProbeReturn::Ok, - }, - ) - .unwrap(); - (probe_id, srcpad) - }) + .map(|branch| branch.switch_pad.clone()) .collect::>(); drop(state_guard); gst::debug!(CAT, obj: element, "Flushing source"); - let _ = source.send_event(gst::event::FlushStart::builder().build()); + for pad in switch_sinkpads { + let _ = pad.push_event(gst::event::FlushStart::builder().build()); + if let Some(switch) = pad.parent().map(|p| p.downcast::().unwrap()) { + switch.release_request_pad(&pad); + } + } - gst::debug!(CAT, obj: element, "Shutting down source"); + gst::debug!( + CAT, + obj: element, + "Shutting down {}source", + if fallback_source { "fallback " } else { "" } + ); let _ = source.set_state(gst::State::Null); - gst::debug!(CAT, obj: element, "Stop flushing downstream of source"); - for pad in stream_sinkpads { - let _ = pad.send_event(gst::event::FlushStop::builder(true).build()); - } - - for (probe_id, pad) in stream_srcpads { - pad.remove_probe(probe_id); - } - // Sleep for 1s before retrying let mut state_guard = src.state.lock(); let state = match &mut *state_guard { - None - | Some(State { - source_pending_restart: false, + None => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); + return; + } + Some(State { + source: + SourceBin { + pending_restart: false, + .. + }, .. - }) => { - gst::debug!(CAT, obj: element, "Restarting source not needed anymore"); + }) if !fallback_source => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); + return; + } + Some(State { + fallback_source: + Some(SourceBin { + pending_restart: false, + .. + }), + .. + }) if fallback_source => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); return; } Some(state) => state, }; - for stream in [state.video_stream.as_mut(), state.audio_stream.as_mut()] + for branch in [state.video_stream.as_mut(), state.audio_stream.as_mut()] .iter_mut() - .filter_map(|s| s.as_mut()) + .flatten() + .filter_map(|s| { + if fallback_source { + s.fallback_branch.as_mut() + } else { + s.main_branch.as_mut() + } + }) { - stream.source_srcpad_block = None; - stream.source_srcpad = None; + branch.source_srcpad_block = None; } gst::debug!(CAT, obj: element, "Waiting for 1s before retrying"); let clock = gst::SystemClock::obtain(); let wait_time = clock.time().unwrap() + gst::ClockTime::SECOND; - assert!(state.source_pending_restart_timeout.is_none()); + if fallback_source { + assert!(state + .fallback_source + .as_ref() + .map(|s| s.pending_restart_timeout.is_none()) + .unwrap_or(true)); + } else { + assert!(state.source.pending_restart_timeout.is_none()); + } let timeout = clock.new_single_shot_id(wait_time); let element_weak = element.downgrade(); @@ -2270,55 +3014,100 @@ impl FallbackSrc { }; gst::debug!(CAT, obj: &element, "Woke up, retrying"); - element.call_async(|element| { + element.call_async(move |element| { let src = element.imp(); let mut state_guard = src.state.lock(); let state = match &mut *state_guard { - None - | Some(State { - source_pending_restart: false, - .. - }) => { + None => { gst::debug!( CAT, obj: element, - "Restarting source not needed anymore" + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); + return; + } + Some(State { + source: + SourceBin { + pending_restart: false, + .. + }, + .. + }) if !fallback_source => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); + return; + } + Some(State { + fallback_source: + Some(SourceBin { + pending_restart: false, + .. + }), + .. + }) if fallback_source => { + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } ); return; } Some(state) => state, }; - let (source, old_source) = if let Source::Uri(..) = state.configured_source - { - // FIXME: Create a new uridecodebin3 because it currently is not reusable - // See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/746 - element.remove(&state.source).unwrap(); + let (source, old_source) = if !fallback_source { + if let Source::Uri(..) = state.configured_source { + // FIXME: Create a new uridecodebin3 because it currently is not reusable + // See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/746 + element.remove(&state.source.source).unwrap(); - let source = src.create_main_input( - element, - &state.configured_source, - state.settings.buffer_duration, - ); + let source = src.create_main_input( + element, + &state.configured_source, + state.settings.buffer_duration, + ); - ( - source.clone(), - Some(mem::replace(&mut state.source, source)), - ) + ( + source.source.clone(), + Some(mem::replace(&mut state.source, source)), + ) + } else { + state.source.pending_restart = false; + state.source.pending_restart_timeout = None; + state.stats.buffering_percent = 100; + state.last_buffering_update = None; + + if let Some(timeout) = state.source.restart_timeout.take() { + gst::debug!(CAT, obj: element, "Unscheduling restart timeout"); + timeout.unschedule(); + } + + (state.source.source.clone(), None) + } + } else if let Some(ref mut source) = state.fallback_source { + source.pending_restart = false; + source.pending_restart_timeout = None; + state.stats.fallback_buffering_percent = 100; + state.fallback_last_buffering_update = None; + + if let Some(timeout) = source.restart_timeout.take() { + gst::debug!(CAT, obj: element, "Unscheduling restart timeout"); + timeout.unschedule(); + } + + (source.source.clone(), None) } else { - (state.source.clone(), None) + return; }; - state.source_pending_restart = false; - state.source_pending_restart_timeout = None; - state.stats.buffering_percent = 100; - state.last_buffering_update = None; - - if let Some(timeout) = state.source_restart_timeout.take() { - gst::debug!(CAT, obj: element, "Unscheduling restart timeout"); - timeout.unschedule(); - } drop(state_guard); if let Some(old_source) = old_source { @@ -2328,7 +3117,12 @@ impl FallbackSrc { } if source.sync_state_with_parent().is_err() { - gst::error!(CAT, obj: element, "Source failed to change state"); + gst::error!( + CAT, + obj: element, + "{}source failed to change state", + if fallback_source { "fallback " } else { "" } + ); let _ = source.set_state(gst::State::Null); let mut state_guard = src.state.lock(); let state = state_guard.as_mut().expect("no state"); @@ -2336,23 +3130,39 @@ impl FallbackSrc { element, state, RetryReason::StateChangeFailure, + fallback_source, ); drop(state_guard); element.notify("statistics"); } else { let mut state_guard = src.state.lock(); let state = state_guard.as_mut().expect("no state"); - assert!(state.source_restart_timeout.is_none()); + if fallback_source { + assert!(state + .fallback_source + .as_ref() + .map(|s| s.restart_timeout.is_none()) + .unwrap_or(true)); + } else { + assert!(state.source.restart_timeout.is_none()); + } src.schedule_source_restart_timeout( element, state, gst::ClockTime::ZERO, + fallback_source, ); } }); }) .expect("Failed to wait async"); - state.source_pending_restart_timeout = Some(timeout); + if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source.pending_restart_timeout = Some(timeout); + } + } else { + state.source.pending_restart_timeout = Some(timeout); + } }); } @@ -2362,26 +3172,48 @@ impl FallbackSrc { element: &super::FallbackSrc, state: &mut State, elapsed: gst::ClockTime, + fallback_source: bool, ) { - if state.source_pending_restart { - gst::debug!( + if fallback_source { + gst::fixme!( CAT, obj: element, - "Not scheduling source restart timeout because source is pending restart already", + "Restart timeout not implemented for fallback source" ); return; } - if state.is_image { + let source = if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source + } else { + return; + } + } else { + &mut state.source + }; + + if source.pending_restart { gst::debug!( CAT, obj: element, - "Not scheduling source restart timeout because we are playing back an image", + "Not scheduling {}source restart timeout because source is pending restart already", + if fallback_source { "fallback " } else { "" }, ); return; } - if state.manually_blocked { + if source.is_image { + gst::debug!( + CAT, + obj: element, + "Not scheduling {}source restart timeout because we are playing back an image", + if fallback_source { "fallback " } else { "" }, + ); + return; + } + + if !fallback_source && state.manually_blocked { gst::debug!( CAT, obj: element, @@ -2395,7 +3227,8 @@ impl FallbackSrc { gst::debug!( CAT, obj: element, - "Scheduling source restart timeout for {}", + "Scheduling {}source restart timeout for {}", + if fallback_source { "fallback " } else { "" }, wait_time, ); @@ -2411,57 +3244,108 @@ impl FallbackSrc { element.call_async(move |element| { let src = element.imp(); - gst::debug!(CAT, obj: element, "Source restart timeout triggered"); + gst::debug!( + CAT, + obj: element, + "{}source restart timeout triggered", + if fallback_source { "fallback " } else { "" } + ); let mut state_guard = src.state.lock(); let state = match &mut *state_guard { None => { - gst::debug!(CAT, obj: element, "Restarting source not needed anymore"); + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); return; } Some(state) => state, }; - state.source_restart_timeout = None; + let source = if fallback_source { + if let Some(ref mut source) = state.fallback_source { + source + } else { + return; + } + } else { + &mut state.source + }; + + source.restart_timeout = None; // If we have the fallback activated then restart the source now. - if src.have_fallback_activated(element, state) { + if fallback_source || src.have_fallback_activated(element, state) { + let (last_buffering_update, buffering_percent) = if fallback_source { + ( + state.fallback_last_buffering_update, + state.stats.fallback_buffering_percent, + ) + } else { + (state.last_buffering_update, state.stats.buffering_percent) + }; // If we're not actively buffering right now let's restart the source - if state - .last_buffering_update + if last_buffering_update .map(|i| i.elapsed() >= state.settings.restart_timeout.into()) - .unwrap_or(state.stats.buffering_percent == 100) + .unwrap_or(buffering_percent == 100) { - gst::debug!(CAT, obj: element, "Not buffering, restarting source"); + gst::debug!( + CAT, + obj: element, + "Not buffering, restarting {}source", + if fallback_source { "fallback " } else { "" } + ); - src.handle_source_error(element, state, RetryReason::Timeout); + src.handle_source_error( + element, + state, + RetryReason::Timeout, + fallback_source, + ); drop(state_guard); element.notify("statistics"); } else { - gst::debug!(CAT, obj: element, "Buffering, restarting source later"); - let elapsed = state - .last_buffering_update + gst::debug!( + CAT, + obj: element, + "Buffering, restarting {}source later", + if fallback_source { "fallback " } else { "" } + ); + let elapsed = last_buffering_update .and_then(|last_buffering_update| { gst::ClockTime::try_from(last_buffering_update.elapsed()).ok() }) .unwrap_or(gst::ClockTime::ZERO); - src.schedule_source_restart_timeout(element, state, elapsed); + src.schedule_source_restart_timeout( + element, + state, + elapsed, + fallback_source, + ); } } else { - gst::debug!(CAT, obj: element, "Restarting source not needed anymore"); + gst::debug!( + CAT, + obj: element, + "Restarting {}source not needed anymore", + if fallback_source { "fallback " } else { "" } + ); } }); }) .expect("Failed to wait async"); - state.source_restart_timeout = Some(timeout); + source.restart_timeout = Some(timeout); } #[allow(clippy::blocks_in_if_conditions)] fn have_fallback_activated(&self, _element: &super::FallbackSrc, state: &State) -> bool { let mut have_audio = false; let mut have_video = false; - if let Some(ref streams) = state.streams { + if let Some(ref streams) = state.source.streams { for stream in streams.iter() { have_audio = have_audio || stream.stream_type().contains(gst::StreamType::AUDIO); have_video = have_video || stream.stream_type().contains(gst::StreamType::VIDEO); @@ -2489,7 +3373,7 @@ impl FallbackSrc { .unwrap_or(true)) } - fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc) { + fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc, is_audio: bool) { let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { @@ -2501,18 +3385,28 @@ impl FallbackSrc { // If we have the fallback activated then start the retry timeout unless it was started // already. Otherwise cancel the retry timeout. if self.have_fallback_activated(element, state) { - gst::warning!(CAT, obj: element, "Switched to fallback stream"); - if state.source_restart_timeout.is_none() { - self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO); + gst::warning!( + CAT, + obj: element, + "Switched to {} fallback stream", + if is_audio { "audio" } else { "video " } + ); + if state.source.restart_timeout.is_none() { + self.schedule_source_restart_timeout(element, state, gst::ClockTime::ZERO, false); } } else { - gst::debug!(CAT, obj: element, "Switched to main stream"); - if let Some(timeout) = state.source_retry_timeout.take() { + gst::debug!( + CAT, + obj: element, + "Switched to {} main stream", + if is_audio { "audio" } else { "video" } + ); + if let Some(timeout) = state.source.retry_timeout.take() { gst::debug!(CAT, obj: element, "Unscheduling retry timeout"); timeout.unschedule(); } - if let Some(timeout) = state.source_restart_timeout.take() { + if let Some(timeout) = state.source.restart_timeout.take() { gst::debug!(CAT, obj: element, "Unscheduling restart timeout"); timeout.unschedule(); } diff --git a/utils/fallbackswitch/src/fallbacksrc/mod.rs b/utils/fallbackswitch/src/fallbacksrc/mod.rs index 7bb80af8..bbc2380a 100644 --- a/utils/fallbackswitch/src/fallbacksrc/mod.rs +++ b/utils/fallbackswitch/src/fallbacksrc/mod.rs @@ -11,7 +11,6 @@ use gst::prelude::*; mod custom_source; mod imp; -mod video_fallback; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[repr(u32)] diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs deleted file mode 100644 index 30eec5b8..00000000 --- a/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs +++ /dev/null @@ -1,464 +0,0 @@ -// Copyright (C) 2020 Sebastian Dröge -// Copyright (C) 2020 Seungha Yang -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// . -// -// SPDX-License-Identifier: MPL-2.0 - -use gst::glib; -use gst::prelude::*; -use gst::subclass::prelude::*; - -use std::sync::{atomic::AtomicBool, atomic::Ordering, Mutex}; - -use once_cell::sync::Lazy; - -static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - "fallbacksrc-video-source", - gst::DebugColorFlags::empty(), - Some("Fallback Video Source Bin"), - ) -}); - -#[derive(Debug, Clone)] -struct Settings { - uri: Option, - min_latency: gst::ClockTime, -} - -impl Default for Settings { - fn default() -> Self { - Settings { - uri: None, - min_latency: gst::ClockTime::ZERO, - } - } -} - -struct State { - source: gst::Element, -} - -pub struct VideoFallbackSource { - srcpad: gst::GhostPad, - got_error: AtomicBool, - - state: Mutex>, - settings: Mutex, -} - -#[glib::object_subclass] -impl ObjectSubclass for VideoFallbackSource { - const NAME: &'static str = "FallbackSrcVideoFallbackSource"; - type Type = super::VideoFallbackSource; - type ParentType = gst::Bin; - - fn with_class(klass: &Self::Class) -> Self { - let templ = klass.pad_template("src").unwrap(); - let srcpad = gst::GhostPad::builder_with_template(&templ, Some(&templ.name())).build(); - - Self { - srcpad, - got_error: AtomicBool::new(false), - state: Mutex::new(None), - settings: Mutex::new(Settings::default()), - } - } -} - -impl ObjectImpl for VideoFallbackSource { - fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy> = Lazy::new(|| { - vec![ - glib::ParamSpecString::builder("uri") - .nick("URI") - .blurb("URI to use for video in case the main stream doesn't work") - .build(), - glib::ParamSpecUInt64::builder("min-latency") - .nick("Minimum Latency") - .blurb("Minimum Latency") - .build(), - ] - }); - - PROPERTIES.as_ref() - } - - fn set_property( - &self, - obj: &Self::Type, - _id: usize, - value: &glib::Value, - pspec: &glib::ParamSpec, - ) { - match pspec.name() { - "uri" => { - let mut settings = self.settings.lock().unwrap(); - let new_value = value.get().expect("type checked upstream"); - gst::info!( - CAT, - obj: obj, - "Changing URI from {:?} to {:?}", - settings.uri, - new_value, - ); - settings.uri = new_value; - } - "min-latency" => { - let mut settings = self.settings.lock().unwrap(); - let new_value = value.get().expect("type checked upstream"); - gst::info!( - CAT, - obj: obj, - "Changing Minimum Latency from {} to {}", - settings.min_latency, - new_value, - ); - settings.min_latency = new_value; - } - _ => unreachable!(), - } - } - - fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - match pspec.name() { - "uri" => { - let settings = self.settings.lock().unwrap(); - settings.uri.to_value() - } - "min-latency" => { - let settings = self.settings.lock().unwrap(); - settings.min_latency.to_value() - } - _ => unimplemented!(), - } - } - - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); - - obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK); - obj.set_element_flags(gst::ElementFlags::SOURCE); - obj.add_pad(&self.srcpad).unwrap(); - } -} - -impl GstObjectImpl for VideoFallbackSource {} - -impl ElementImpl for VideoFallbackSource { - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: Lazy> = Lazy::new(|| { - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &gst::Caps::new_any(), - ) - .unwrap(); - - vec![src_pad_template] - }); - - PAD_TEMPLATES.as_ref() - } - - #[allow(clippy::single_match)] - fn change_state( - &self, - element: &Self::Type, - transition: gst::StateChange, - ) -> Result { - match transition { - gst::StateChange::NullToReady => { - self.start(element)?; - } - _ => (), - } - - self.parent_change_state(element, transition)?; - - match transition { - gst::StateChange::ReadyToNull => { - self.stop(element); - } - _ => (), - } - - Ok(gst::StateChangeSuccess::Success) - } -} - -impl BinImpl for VideoFallbackSource { - #[allow(clippy::single_match)] - fn handle_message(&self, bin: &Self::Type, msg: gst::Message) { - use gst::MessageView; - - match msg.view() { - MessageView::Error(err) => { - if self - .got_error - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - gst::warning!(CAT, obj: bin, "Got error {:?}", err); - self.parent_handle_message(bin, msg) - } else { - // Suppress error message if we posted error previously. - // Otherwise parent fallbacksrc would be confused by - // multiple error message. - gst::debug!(CAT, obj: bin, "Ignore error {:?}", err); - } - } - _ => self.parent_handle_message(bin, msg), - } - } -} - -impl VideoFallbackSource { - fn file_src_for_uri( - &self, - element: &super::VideoFallbackSource, - uri: Option<&str>, - ) -> Option { - uri?; - - let uri = uri.unwrap(); - let filesrc = gst::ElementFactory::make("filesrc", Some("fallback_filesrc")) - .expect("No filesrc found"); - - if let Err(err) = filesrc - .dynamic_cast_ref::() - .unwrap() - .set_uri(uri) - { - gst::warning!(CAT, obj: element, "Failed to set URI: {}", err); - return None; - } - - if filesrc.set_state(gst::State::Ready).is_err() { - gst::warning!(CAT, obj: element, "Couldn't set state READY"); - let _ = filesrc.set_state(gst::State::Null); - return None; - } - - // To invoke GstBaseSrc::start() method, activate pad manually. - // filesrc will check whether given file is readable or not - // via open() and fstat() in there. - let pad = filesrc.static_pad("src").unwrap(); - if pad.set_active(true).is_err() { - gst::warning!(CAT, obj: element, "Couldn't active pad"); - let _ = filesrc.set_state(gst::State::Null); - return None; - } - - Some(filesrc) - } - - fn create_source( - &self, - element: &super::VideoFallbackSource, - min_latency: gst::ClockTime, - uri: Option<&str>, - ) -> gst::Element { - gst::debug!(CAT, obj: element, "Creating source with uri {:?}", uri); - - let source = gst::Bin::new(None); - let filesrc = self.file_src_for_uri(element, uri); - - let srcpad = match filesrc { - Some(filesrc) => { - let typefind = gst::ElementFactory::make("typefind", Some("fallback_typefind")) - .expect("No typefind found"); - let videoconvert = - gst::ElementFactory::make("videoconvert", Some("fallback_videoconvert")) - .expect("No videoconvert found"); - let videoscale = - gst::ElementFactory::make("videoscale", Some("fallback_videoscale")) - .expect("No videoscale found"); - let imagefreeze = - gst::ElementFactory::make("imagefreeze", Some("fallback_imagefreeze")) - .expect("No imagefreeze found"); - let clocksync = gst::ElementFactory::make("clocksync", Some("fallback_clocksync")) - .or_else(|_| -> Result<_, glib::BoolError> { - let identity = - gst::ElementFactory::make("identity", Some("fallback_clocksync"))?; - identity.set_property("sync", true); - Ok(identity) - }) - .expect("No clocksync or identity found"); - let queue = gst::ElementFactory::make("queue", Some("fallback_queue")) - .expect("No queue found"); - queue.set_properties(&[ - ("max-size-buffers", &0u32), - ("max-size-bytes", &0u32), - ( - "max-size-time", - &min_latency.max(5 * gst::ClockTime::SECOND).nseconds(), - ), - ]); - - source - .add_many(&[ - &filesrc, - &typefind, - &videoconvert, - &videoscale, - &imagefreeze, - &clocksync, - &queue, - ]) - .unwrap(); - gst::Element::link_many(&[&filesrc, &typefind]).unwrap(); - gst::Element::link_many(&[ - &videoconvert, - &videoscale, - &imagefreeze, - &clocksync, - &queue, - ]) - .unwrap(); - - if imagefreeze.try_set_property("is-live", true).is_err() { - gst::error!( - CAT, - obj: element, - "imagefreeze does not support live mode, this will probably misbehave" - ); - gst::element_warning!( - element, - gst::LibraryError::Settings, - ["imagefreeze does not support live mode, this will probably misbehave"] - ); - } - - let element_weak = element.downgrade(); - let source_weak = source.downgrade(); - let videoconvert_weak = videoconvert.downgrade(); - typefind.connect("have-type", false, move |args| { - let typefind = args[0].get::().unwrap(); - let _probability = args[1].get::().unwrap(); - let caps = args[2].get::().unwrap(); - - let element = match element_weak.upgrade() { - Some(element) => element, - None => return None, - }; - - let source = match source_weak.upgrade() { - Some(element) => element, - None => return None, - }; - - let videoconvert = match videoconvert_weak.upgrade() { - Some(element) => element, - None => return None, - }; - - let s = caps.structure(0).unwrap(); - let decoder; - if s.name() == "image/jpeg" { - decoder = gst::ElementFactory::make("jpegdec", Some("decoder")) - .expect("jpegdec not found"); - } else if s.name() == "image/png" { - decoder = gst::ElementFactory::make("pngdec", Some("decoder")) - .expect("pngdec not found"); - } else { - gst::error!(CAT, obj: &element, "Unsupported caps {}", caps); - gst::element_error!( - element, - gst::StreamError::Format, - ["Unsupported caps {}", caps] - ); - return None; - } - - source.add(&decoder).unwrap(); - decoder.sync_state_with_parent().unwrap(); - if let Err(_err) = - gst::Element::link_many(&[&typefind, &decoder, &videoconvert]) - { - gst::error!(CAT, obj: &element, "Can't link fallback image decoder"); - gst::element_error!( - element, - gst::StreamError::Format, - ["Can't link fallback image decoder"] - ); - return None; - } - - None - }); - - queue.static_pad("src").unwrap() - } - None => { - let videotestsrc = - gst::ElementFactory::make("videotestsrc", Some("fallback_videosrc")) - .expect("No videotestsrc found"); - source.add_many(&[&videotestsrc]).unwrap(); - - videotestsrc.set_property_from_str("pattern", "black"); - videotestsrc.set_property("is-live", true); - - videotestsrc.static_pad("src").unwrap() - } - }; - - source - .add_pad( - &gst::GhostPad::builder(Some("src"), gst::PadDirection::Src) - .build_with_target(&srcpad) - .unwrap(), - ) - .unwrap(); - - source.upcast() - } - - fn start( - &self, - element: &super::VideoFallbackSource, - ) -> Result { - gst::debug!(CAT, obj: element, "Starting"); - - let mut state_guard = self.state.lock().unwrap(); - if state_guard.is_some() { - gst::error!(CAT, obj: element, "State struct wasn't cleared"); - return Err(gst::StateChangeError); - } - - let settings = self.settings.lock().unwrap().clone(); - let uri = &settings.uri; - let source = self.create_source(element, settings.min_latency, uri.as_deref()); - - element.add(&source).unwrap(); - - let srcpad = source.static_pad("src").unwrap(); - let _ = self.srcpad.set_target(Some(&srcpad)); - - *state_guard = Some(State { source }); - - Ok(gst::StateChangeSuccess::Success) - } - - fn stop(&self, element: &super::VideoFallbackSource) { - gst::debug!(CAT, obj: element, "Stopping"); - - let mut state_guard = self.state.lock().unwrap(); - let state = match state_guard.take() { - Some(state) => state, - None => return, - }; - - drop(state_guard); - - let _ = state.source.set_state(gst::State::Null); - let _ = self.srcpad.set_target(None::<&gst::Pad>); - element.remove(&state.source).unwrap(); - self.got_error.store(false, Ordering::Relaxed); - gst::debug!(CAT, obj: element, "Stopped"); - } -} diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs deleted file mode 100644 index 0dad207c..00000000 --- a/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (C) 2020 Sebastian Dröge -// Copyright (C) 2020 Seungha Yang -// -// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. -// If a copy of the MPL was not distributed with this file, You can obtain one at -// . -// -// SPDX-License-Identifier: MPL-2.0 - -use gst::glib; - -mod imp; - -glib::wrapper! { - pub struct VideoFallbackSource(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; -} - -impl VideoFallbackSource { - pub fn new(uri: Option<&str>, min_latency: gst::ClockTime) -> VideoFallbackSource { - glib::Object::new(&[("uri", &uri), ("min-latency", &min_latency.nseconds())]).unwrap() - } -}