diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index cdaad4cc..f1ce0d35 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -20,10 +20,7 @@ use glib::subclass; use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{ - gst_debug, gst_element_error, gst_element_warning, gst_error, gst_error_msg, gst_info, - gst_warning, -}; +use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_warning}; use std::mem; use std::sync::Mutex; @@ -32,6 +29,7 @@ use std::time::{Duration, Instant}; use once_cell::sync::Lazy; use super::custom_source::CustomSource; +use super::video_fallback::VideoFallbackSource; use super::{RetryReason, Status}; static CAT: Lazy = Lazy::new(|| { @@ -756,180 +754,11 @@ impl FallbackSrc { fn create_fallback_video_input( &self, - element: &super::FallbackSrc, + _element: &super::FallbackSrc, min_latency: u64, fallback_uri: Option<&str>, ) -> Result { - let input = gst::Bin::new(Some("fallback_video")); - - let srcpad = match fallback_uri { - Some(fallback_uri) => { - let filesrc = gst::ElementFactory::make("filesrc", Some("fallback_filesrc")) - .expect("No filesrc found"); - let typefind = gst::ElementFactory::make("typefind", Some("fallback_typefind")) - .expect("No typefind found"); - let videoconvert = - gst::ElementFactory::make("videoconvert", Some("fallback_videoconvert")) - .expect("No videoconvert found"); - let videoscale = - gst::ElementFactory::make("videoscale", Some("fallback_videoscale")) - .expect("No videoscale found"); - let imagefreeze = - gst::ElementFactory::make("imagefreeze", Some("fallback_imagefreeze")) - .expect("No imagefreeze found"); - let clocksync = gst::ElementFactory::make("clocksync", Some("fallback_clocksync")) - .or_else(|_| -> Result<_, glib::BoolError> { - let identity = - gst::ElementFactory::make("identity", Some("fallback_clocksync"))?; - identity.set_property("sync", &true).unwrap(); - Ok(identity) - }) - .expect("No clocksync or identity found"); - let queue = gst::ElementFactory::make("queue", Some("fallback_queue")) - .expect("No queue found"); - queue - .set_properties(&[ - ("max-size-buffers", &0u32), - ("max-size-bytes", &0u32), - ( - "max-size-time", - &gst::ClockTime::max(5 * gst::SECOND, min_latency.into()).unwrap(), - ), - ]) - .unwrap(); - - input - .add_many(&[ - &filesrc, - &typefind, - &videoconvert, - &videoscale, - &imagefreeze, - &clocksync, - &queue, - ]) - .unwrap(); - gst::Element::link_many(&[&filesrc, &typefind]).unwrap(); - gst::Element::link_many(&[ - &videoconvert, - &videoscale, - &imagefreeze, - &clocksync, - &queue, - ]) - .unwrap(); - - filesrc - .dynamic_cast_ref::() - .unwrap() - .set_uri(fallback_uri) - .map_err(|err| { - gst_error!(CAT, obj: element, "Failed to set fallback URI: {}", err); - gst_element_error!( - element, - gst::LibraryError::Settings, - ["Failed to set fallback URI: {}", err] - ); - gst::StateChangeError - })?; - - if imagefreeze.set_property("is-live", &true).is_err() { - gst_error!( - CAT, - obj: element, - "imagefreeze does not support live mode, this will probably misbehave" - ); - gst_element_warning!( - element, - gst::LibraryError::Settings, - ["imagefreeze does not support live mode, this will probably misbehave"] - ); - } - - let element_weak = element.downgrade(); - let input_weak = input.downgrade(); - let videoconvert_weak = videoconvert.downgrade(); - typefind - .connect("have-type", false, move |args| { - let typefind = args[0].get::().unwrap().unwrap(); - let _probability = args[1].get_some::().unwrap(); - let caps = args[2].get::().unwrap().unwrap(); - - let element = match element_weak.upgrade() { - Some(element) => element, - None => return None, - }; - - let input = match input_weak.upgrade() { - Some(element) => element, - None => return None, - }; - - let videoconvert = match videoconvert_weak.upgrade() { - Some(element) => element, - None => return None, - }; - - let s = caps.get_structure(0).unwrap(); - let decoder; - if s.get_name() == "image/jpeg" { - decoder = gst::ElementFactory::make("jpegdec", Some("decoder")) - .expect("jpegdec not found"); - } else if s.get_name() == "image/png" { - decoder = gst::ElementFactory::make("pngdec", Some("decoder")) - .expect("pngdec not found"); - } else { - gst_error!(CAT, obj: &element, "Unsupported caps {}", caps); - gst_element_error!( - element, - gst::StreamError::Format, - ["Unsupported caps {}", caps] - ); - return None; - } - - input.add(&decoder).unwrap(); - decoder.sync_state_with_parent().unwrap(); - if let Err(_err) = - gst::Element::link_many(&[&typefind, &decoder, &videoconvert]) - { - gst_error!(CAT, obj: &element, "Can't link fallback image decoder"); - gst_element_error!( - element, - gst::StreamError::Format, - ["Can't link fallback image decoder"] - ); - return None; - } - - None - }) - .unwrap(); - - queue.get_static_pad("src").unwrap() - } - None => { - let videotestsrc = - gst::ElementFactory::make("videotestsrc", Some("fallback_videosrc")) - .expect("No videotestsrc found"); - input.add_many(&[&videotestsrc]).unwrap(); - - videotestsrc.set_property_from_str("pattern", "black"); - videotestsrc.set_property("is-live", &true).unwrap(); - - videotestsrc.get_static_pad("src").unwrap() - } - }; - - input - .add_pad( - &gst::GhostPad::builder(Some("src"), gst::PadDirection::Src) - .build_with_target(&srcpad) - .unwrap(), - ) - .unwrap(); - - Ok(input.upcast()) + Ok(VideoFallbackSource::new(fallback_uri, min_latency).upcast()) } fn create_fallback_audio_input( @@ -1964,6 +1793,46 @@ impl FallbackSrc { return true; } + // Check if error is from video fallback input and if so, try another + // fallback to videotestsrc + if let Some(ref mut video_stream) = state.video_stream { + if src == video_stream.fallback_input + || src.has_as_ancestor(&video_stream.fallback_input) + { + gst_debug!(CAT, obj: element, "Got error from video fallback input"); + + let prev_fallback_uri = video_stream + .fallback_input + .get_property("uri") + .unwrap() + .get::() + .unwrap(); + + // This means previously videotestsrc was configured + // Something went wrong and there is no other way than to error out + if prev_fallback_uri.is_none() { + return false; + } + + let fallback_input = &video_stream.fallback_input; + fallback_input.call_async(|fallback_input| { + // Re-run video fallback input with videotestsrc + let _ = fallback_input.set_state(gst::State::Null); + let _ = fallback_input.set_property("uri", &None::<&str>); + let _ = fallback_input.sync_state_with_parent(); + }); + + return true; + } + } + + gst_error!( + CAT, + obj: element, + "Give up for error message from {}", + src.get_path_string() + ); + false } diff --git a/utils/fallbackswitch/src/fallbacksrc/mod.rs b/utils/fallbackswitch/src/fallbacksrc/mod.rs index e004b755..4c2f8afc 100644 --- a/utils/fallbackswitch/src/fallbacksrc/mod.rs +++ b/utils/fallbackswitch/src/fallbacksrc/mod.rs @@ -19,6 +19,7 @@ use glib::prelude::*; mod custom_source; mod imp; +mod video_fallback; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::GEnum)] #[repr(u32)] diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs new file mode 100644 index 00000000..65ae1f56 --- /dev/null +++ b/utils/fallbackswitch/src/fallbacksrc/video_fallback/imp.rs @@ -0,0 +1,485 @@ +// Copyright (C) 2020 Sebastian Dröge +// Copyright (C) 2020 Seungha Yang +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_element_error, gst_element_warning, gst_error, gst_info, gst_warning}; + +use std::sync::{atomic::AtomicBool, atomic::Ordering, Mutex}; + +use once_cell::sync::Lazy; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "fallbacksrc-video-source", + gst::DebugColorFlags::empty(), + Some("Fallback Video Source Bin"), + ) +}); + +static PROPERTIES: [subclass::Property; 2] = [ + subclass::Property("uri", |name| { + glib::ParamSpec::string( + name, + "URI", + "URI to use for video in case the main stream doesn't work", + None, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("min-latency", |name| { + glib::ParamSpec::uint64( + name, + "Minimum Latency", + "Minimum Latency", + 0, + std::u64::MAX, + 0, + glib::ParamFlags::READWRITE, + ) + }), +]; + +#[derive(Debug, Clone)] +struct Settings { + uri: Option, + min_latency: u64, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + uri: None, + min_latency: 0, + } + } +} + +struct State { + source: gst::Element, +} + +pub struct VideoFallbackSource { + srcpad: gst::GhostPad, + got_error: AtomicBool, + + state: Mutex>, + settings: Mutex, +} + +impl ObjectSubclass for VideoFallbackSource { + const NAME: &'static str = "FallbackSrcVideoFallbackSource"; + type Type = super::VideoFallbackSource; + type ParentType = gst::Bin; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib::glib_object_subclass!(); + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.get_pad_template("src").unwrap(); + let srcpad = gst::GhostPad::builder_with_template(&templ, Some(&templ.get_name())).build(); + + Self { + srcpad, + got_error: AtomicBool::new(false), + state: Mutex::new(None), + settings: Mutex::new(Settings::default()), + } + } + + fn class_init(klass: &mut Self::Class) { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::new_any(), + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + klass.install_properties(&PROPERTIES); + } +} + +impl ObjectImpl for VideoFallbackSource { + fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("uri", ..) => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get().expect("type checked upstream"); + gst_info!( + CAT, + obj: obj, + "Changing URI from {:?} to {:?}", + settings.uri, + new_value, + ); + settings.uri = new_value; + } + subclass::Property("min-latency", ..) => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get_some().expect("type checked upstream"); + gst_info!( + CAT, + obj: obj, + "Changing Minimum Latency from {:?} to {:?}", + settings.min_latency, + new_value, + ); + settings.min_latency = new_value; + } + _ => unreachable!(), + } + } + + fn get_property(&self, _obj: &Self::Type, id: usize) -> glib::Value { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("uri", ..) => { + let settings = self.settings.lock().unwrap(); + settings.uri.to_value() + } + subclass::Property("min-latency", ..) => { + let settings = self.settings.lock().unwrap(); + settings.min_latency.to_value() + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK); + obj.set_element_flags(gst::ElementFlags::SOURCE); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl ElementImpl for VideoFallbackSource { + #[allow(clippy::single_match)] + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + match transition { + gst::StateChange::NullToReady => { + self.start(element)?; + } + _ => (), + } + + self.parent_change_state(element, transition)?; + + match transition { + gst::StateChange::ReadyToNull => { + self.stop(element)?; + } + _ => (), + } + + Ok(gst::StateChangeSuccess::Success) + } +} + +impl BinImpl for VideoFallbackSource { + #[allow(clippy::single_match)] + fn handle_message(&self, bin: &Self::Type, msg: gst::Message) { + use gst::MessageView; + + match msg.view() { + MessageView::Error(err) => { + if !self + .got_error + .compare_and_swap(false, true, Ordering::SeqCst) + { + gst_warning!(CAT, obj: bin, "Got error {:?}", err); + self.parent_handle_message(bin, msg) + } else { + // Suppress error message if we posted error previously. + // Otherwise parent fallbacksrc would be confused by + // multiple error message. + gst_debug!(CAT, obj: bin, "Ignore error {:?}", err); + } + } + _ => self.parent_handle_message(bin, msg), + } + } +} + +impl VideoFallbackSource { + fn get_file_src_for_uri( + &self, + element: &super::VideoFallbackSource, + uri: Option<&str>, + ) -> Option { + uri?; + + let uri = uri.unwrap(); + let filesrc = gst::ElementFactory::make("filesrc", Some("fallback_filesrc")) + .expect("No filesrc found"); + + if let Err(err) = filesrc + .dynamic_cast_ref::() + .unwrap() + .set_uri(uri) + { + gst_warning!(CAT, obj: element, "Failed to set URI: {}", err); + return None; + } + + if filesrc.set_state(gst::State::Ready).is_err() { + gst_warning!(CAT, obj: element, "Couldn't set state READY"); + let _ = filesrc.set_state(gst::State::Null); + return None; + } + + // To invoke GstBaseSrc::start() method, activate pad manually. + // filesrc will check whether given file is readable or not + // via open() and fstat() in there. + let pad = filesrc.get_static_pad("src").unwrap(); + if pad.set_active(true).is_err() { + gst_warning!(CAT, obj: element, "Couldn't active pad"); + let _ = filesrc.set_state(gst::State::Null); + return None; + } + + Some(filesrc) + } + + fn create_source( + &self, + element: &super::VideoFallbackSource, + min_latency: u64, + uri: Option<&str>, + ) -> Result { + gst_debug!(CAT, obj: element, "Creating source with uri {:?}", uri); + + let source = gst::Bin::new(None); + let filesrc = self.get_file_src_for_uri(element, uri); + + let srcpad = match filesrc { + Some(filesrc) => { + let typefind = gst::ElementFactory::make("typefind", Some("fallback_typefind")) + .expect("No typefind found"); + let videoconvert = + gst::ElementFactory::make("videoconvert", Some("fallback_videoconvert")) + .expect("No videoconvert found"); + let videoscale = + gst::ElementFactory::make("videoscale", Some("fallback_videoscale")) + .expect("No videoscale found"); + let imagefreeze = + gst::ElementFactory::make("imagefreeze", Some("fallback_imagefreeze")) + .expect("No imagefreeze found"); + let clocksync = gst::ElementFactory::make("clocksync", Some("fallback_clocksync")) + .or_else(|_| -> Result<_, glib::BoolError> { + let identity = + gst::ElementFactory::make("identity", Some("fallback_clocksync"))?; + identity.set_property("sync", &true).unwrap(); + Ok(identity) + }) + .expect("No clocksync or identity found"); + let queue = gst::ElementFactory::make("queue", Some("fallback_queue")) + .expect("No queue found"); + queue + .set_properties(&[ + ("max-size-buffers", &0u32), + ("max-size-bytes", &0u32), + ( + "max-size-time", + &gst::ClockTime::max(5 * gst::SECOND, min_latency.into()).unwrap(), + ), + ]) + .unwrap(); + + source + .add_many(&[ + &filesrc, + &typefind, + &videoconvert, + &videoscale, + &imagefreeze, + &clocksync, + &queue, + ]) + .unwrap(); + gst::Element::link_many(&[&filesrc, &typefind]).unwrap(); + gst::Element::link_many(&[ + &videoconvert, + &videoscale, + &imagefreeze, + &clocksync, + &queue, + ]) + .unwrap(); + + if imagefreeze.set_property("is-live", &true).is_err() { + gst_error!( + CAT, + obj: element, + "imagefreeze does not support live mode, this will probably misbehave" + ); + gst_element_warning!( + element, + gst::LibraryError::Settings, + ["imagefreeze does not support live mode, this will probably misbehave"] + ); + } + + let element_weak = element.downgrade(); + let source_weak = source.downgrade(); + let videoconvert_weak = videoconvert.downgrade(); + typefind + .connect("have-type", false, move |args| { + let typefind = args[0].get::().unwrap().unwrap(); + let _probability = args[1].get_some::().unwrap(); + let caps = args[2].get::().unwrap().unwrap(); + + let element = match element_weak.upgrade() { + Some(element) => element, + None => return None, + }; + + let source = match source_weak.upgrade() { + Some(element) => element, + None => return None, + }; + + let videoconvert = match videoconvert_weak.upgrade() { + Some(element) => element, + None => return None, + }; + + let s = caps.get_structure(0).unwrap(); + let decoder; + if s.get_name() == "image/jpeg" { + decoder = gst::ElementFactory::make("jpegdec", Some("decoder")) + .expect("jpegdec not found"); + } else if s.get_name() == "image/png" { + decoder = gst::ElementFactory::make("pngdec", Some("decoder")) + .expect("pngdec not found"); + } else { + gst_error!(CAT, obj: &element, "Unsupported caps {}", caps); + gst_element_error!( + element, + gst::StreamError::Format, + ["Unsupported caps {}", caps] + ); + return None; + } + + source.add(&decoder).unwrap(); + decoder.sync_state_with_parent().unwrap(); + if let Err(_err) = + gst::Element::link_many(&[&typefind, &decoder, &videoconvert]) + { + gst_error!(CAT, obj: &element, "Can't link fallback image decoder"); + gst_element_error!( + element, + gst::StreamError::Format, + ["Can't link fallback image decoder"] + ); + return None; + } + + None + }) + .unwrap(); + + queue.get_static_pad("src").unwrap() + } + None => { + let videotestsrc = + gst::ElementFactory::make("videotestsrc", Some("fallback_videosrc")) + .expect("No videotestsrc found"); + source.add_many(&[&videotestsrc]).unwrap(); + + videotestsrc.set_property_from_str("pattern", "black"); + videotestsrc.set_property("is-live", &true).unwrap(); + + videotestsrc.get_static_pad("src").unwrap() + } + }; + + source + .add_pad( + &gst::GhostPad::builder(Some("src"), gst::PadDirection::Src) + .build_with_target(&srcpad) + .unwrap(), + ) + .unwrap(); + + Ok(source.upcast()) + } + + fn start( + &self, + element: &super::VideoFallbackSource, + ) -> Result { + gst_debug!(CAT, obj: element, "Starting"); + + let mut state_guard = self.state.lock().unwrap(); + if state_guard.is_some() { + gst_error!(CAT, obj: element, "State struct wasn't cleared"); + return Err(gst::StateChangeError); + } + + let settings = self.settings.lock().unwrap().clone(); + let uri = &settings.uri; + let source = self.create_source(element, settings.min_latency, uri.as_deref())?; + + element.add(&source).unwrap(); + + let srcpad = source.get_static_pad("src").unwrap(); + let _ = self.srcpad.set_target(Some(&srcpad)); + + *state_guard = Some(State { source }); + + Ok(gst::StateChangeSuccess::Success) + } + + fn stop( + &self, + element: &super::VideoFallbackSource, + ) -> Result { + gst_debug!(CAT, obj: element, "Stopping"); + + let mut state_guard = self.state.lock().unwrap(); + let state = match state_guard.take() { + Some(state) => state, + None => return Ok(gst::StateChangeSuccess::Success), + }; + + drop(state_guard); + + let _ = state.source.set_state(gst::State::Null); + let _ = self.srcpad.set_target(None::<&gst::Pad>); + element.remove(&state.source).unwrap(); + self.got_error.store(false, Ordering::Relaxed); + gst_debug!(CAT, obj: element, "Stopped"); + + Ok(gst::StateChangeSuccess::Success) + } +} diff --git a/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs b/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs new file mode 100644 index 00000000..ed2d5b72 --- /dev/null +++ b/utils/fallbackswitch/src/fallbacksrc/video_fallback/mod.rs @@ -0,0 +1,42 @@ +// Copyright (C) 2020 Sebastian Dröge +// Copyright (C) 2020 Seungha Yang +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +mod imp; + +glib::glib_wrapper! { + pub struct VideoFallbackSource(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; +} + +// GStreamer elements need to be thread-safe. For the private implementation this is automatically +// enforced but for the public wrapper type we need to specify this manually. +unsafe impl Send for VideoFallbackSource {} +unsafe impl Sync for VideoFallbackSource {} + +impl VideoFallbackSource { + pub fn new(uri: Option<&str>, min_latency: u64) -> VideoFallbackSource { + glib::Object::new( + VideoFallbackSource::static_type(), + &[("uri", &uri), ("min-latency", &min_latency)], + ) + .unwrap() + .downcast() + .unwrap() + } +}