diff --git a/utils/fallbackswitch/Cargo.toml b/utils/fallbackswitch/Cargo.toml index 835c8b27..e811535f 100644 --- a/utils/fallbackswitch/Cargo.toml +++ b/utils/fallbackswitch/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "gst-plugin-fallbackswitch" version = "0.9.0" -authors = ["Sebastian Dröge "] +authors = ["Sebastian Dröge ", "Jan Schmidt "] repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" license = "MPL-2.0" edition = "2021" @@ -11,12 +11,13 @@ description = "Fallback Switcher Plugin" [dependencies] libc = { version = "0.2", optional = true } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } -gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } gtk = { git = "https://github.com/gtk-rs/gtk3-rs", optional = true } gio = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } once_cell = "1.0" +parking_lot = "0.12" [dev-dependencies] gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]} @@ -37,7 +38,6 @@ gst-plugin-version-helper = { path="../../version-helper" } [features] default = ["libc"] -v1_20 = ["gst/v1_20"] # We already use 1.14 which is new enough for static build static = [] capi = [] diff --git a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs index 3728d7cc..50589c45 100644 --- a/utils/fallbackswitch/examples/gtk_fallbackswitch.rs +++ b/utils/fallbackswitch/examples/gtk_fallbackswitch.rs @@ -74,11 +74,12 @@ fn create_pipeline() -> (gst::Pipeline, gst::Pad, gst::Element, gtk::Widget) { ]) .unwrap(); + /* The first pad requested will be automatically preferred */ video_src - .link_pads(Some("src"), &fallbackswitch, Some("sink")) + .link_pads(Some("src"), &fallbackswitch, Some("sink_%u")) .unwrap(); fallback_video_src - .link_pads(Some("src"), &fallbackswitch, Some("fallback_sink")) + .link_pads(Some("src"), &fallbackswitch, Some("sink_%u")) .unwrap(); fallbackswitch .link_pads(Some("src"), &decodebin, Some("sink")) diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index 8993420b..24f3bc68 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -10,8 +10,8 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; +use parking_lot::Mutex; use std::mem; -use std::sync::Mutex; use std::time::Instant; use once_cell::sync::Lazy; @@ -316,7 +316,7 @@ impl ObjectImpl for FallbackSrc { ) { match pspec.name() { "enable-audio" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -328,7 +328,7 @@ impl ObjectImpl for FallbackSrc { settings.enable_audio = new_value; } "enable-video" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -340,7 +340,7 @@ impl ObjectImpl for FallbackSrc { settings.enable_video = new_value; } "uri" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -352,7 +352,7 @@ impl ObjectImpl for FallbackSrc { settings.uri = new_value; } "source" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -364,7 +364,7 @@ impl ObjectImpl for FallbackSrc { settings.source = new_value; } "fallback-uri" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -376,7 +376,7 @@ impl ObjectImpl for FallbackSrc { settings.fallback_uri = new_value; } "timeout" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -388,7 +388,7 @@ impl ObjectImpl for FallbackSrc { settings.timeout = new_value; } "restart-timeout" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -400,7 +400,7 @@ impl ObjectImpl for FallbackSrc { settings.restart_timeout = new_value; } "retry-timeout" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -412,7 +412,7 @@ impl ObjectImpl for FallbackSrc { settings.retry_timeout = new_value; } "restart-on-eos" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -424,7 +424,7 @@ impl ObjectImpl for FallbackSrc { settings.restart_on_eos = new_value; } "min-latency" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -436,7 +436,7 @@ impl ObjectImpl for FallbackSrc { settings.min_latency = new_value; } "buffer-duration" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -448,7 +448,7 @@ impl ObjectImpl for FallbackSrc { settings.buffer_duration = new_value; } "immediate-fallback" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -460,7 +460,7 @@ impl ObjectImpl for FallbackSrc { settings.immediate_fallback = new_value; } "manual-unblock" => { - let mut settings = self.settings.lock().unwrap(); + let mut settings = self.settings.lock(); let new_value = value.get().expect("type checked upstream"); gst::info!( CAT, @@ -481,43 +481,43 @@ impl ObjectImpl for FallbackSrc { fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "enable-audio" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.enable_audio.to_value() } "enable-video" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.enable_video.to_value() } "uri" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.uri.to_value() } "source" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.source.to_value() } "fallback-uri" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.fallback_uri.to_value() } "timeout" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.timeout.to_value() } "restart-timeout" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.restart_timeout.to_value() } "retry-timeout" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.retry_timeout.to_value() } "restart-on-eos" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.restart_on_eos.to_value() } "status" => { - let state_guard = self.state.lock().unwrap(); + let state_guard = self.state.lock(); // If we have no state then we'r stopped let state = match &*state_guard { @@ -569,20 +569,20 @@ impl ObjectImpl for FallbackSrc { Status::Running.to_value() } "min-latency" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.min_latency.to_value() } "buffer-duration" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.buffer_duration.to_value() } "statistics" => self.stats().to_value(), "immediate-fallback" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.immediate_fallback.to_value() } "manual-unblock" => { - let settings = self.settings.lock().unwrap(); + let settings = self.settings.lock(); settings.manual_unblock.to_value() } _ => unimplemented!(), @@ -612,7 +612,7 @@ impl ObjectImpl for FallbackSrc { .class_handler(|_token, args| { let element = args[0].get::().expect("signal arg"); let src = element.imp(); - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = match &mut *state_guard { None => { return None; @@ -730,7 +730,7 @@ impl ElementImpl for FallbackSrc { gst::EventView::Eos(..) => { gst::debug!(CAT, "Handling element-level EOS, forwarding to all streams"); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return true; @@ -942,10 +942,17 @@ impl FallbackSrc { switch.set_property("min-upstream-latency", min_latency.nseconds()); switch.set_property("immediate-fallback", immediate_fallback); - gst::Element::link_pads(&fallback_input, Some("src"), &switch, Some("fallback_sink")) - .unwrap(); + let fallback_srcpad = fallback_input.static_pad("src").unwrap(); + let switch_fallbacksink = switch.request_pad_simple("sink_%u").unwrap(); + fallback_srcpad.link(&switch_fallbacksink).unwrap(); + switch_fallbacksink.set_property("priority", 1u32); + gst::Element::link_pads(&clocksync_queue, Some("src"), &clocksync, Some("sink")).unwrap(); - gst::Element::link_pads(&clocksync, Some("src"), &switch, Some("sink")).unwrap(); + + let clocksync_srcpad = clocksync.static_pad("src").unwrap(); + let switch_mainsink = switch.request_pad_simple("sink_%u").unwrap(); + clocksync_srcpad.link(&switch_mainsink).unwrap(); + switch_mainsink.set_property("priority", 0u32); // clocksync_queue sink pad is not connected to anything yet at this point! let srcpad = switch.static_pad("src").unwrap(); @@ -984,12 +991,12 @@ impl FallbackSrc { fn start(&self, element: &super::FallbackSrc) -> Result<(), gst::StateChangeError> { gst::debug!(CAT, obj: element, "Starting"); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); if state_guard.is_some() { return Err(gst::StateChangeError); } - let settings = self.settings.lock().unwrap().clone(); + let settings = self.settings.lock().clone(); let configured_source = match settings .uri .as_ref() @@ -1082,7 +1089,7 @@ impl FallbackSrc { fn stop(&self, element: &super::FallbackSrc) { gst::debug!(CAT, obj: element, "Stopping"); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let mut state = match state_guard.take() { Some(state) => state, None => return, @@ -1138,7 +1145,7 @@ impl FallbackSrc { fn change_source_state(&self, element: &super::FallbackSrc, transition: gst::StateChange) { gst::debug!(CAT, obj: element, "Changing source state: {:?}", transition); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { Some(state) => state, None => return, @@ -1174,7 +1181,7 @@ impl FallbackSrc { // Try again later if we're not shutting down if transition != gst::StateChange::ReadyToNull { let _ = source.set_state(gst::State::Null); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = state_guard.as_mut().expect("no state"); self.handle_source_error(element, state, RetryReason::StateChangeFailure); drop(state_guard); @@ -1189,7 +1196,7 @@ impl FallbackSrc { res ); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = state_guard.as_mut().expect("no state"); // Remember if the source is live @@ -1216,7 +1223,7 @@ impl FallbackSrc { ) -> Result { let res = gst::ProxyPad::chain_default(pad, Some(element), buffer); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => return res, Some(state) => state, @@ -1232,7 +1239,7 @@ impl FallbackSrc { ) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Pad {} added to source", pad.name(),); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return Ok(()); @@ -1360,7 +1367,7 @@ impl FallbackSrc { pad.name() ); - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = match &mut *state_guard { None => { return gst::PadProbeReturn::Ok; @@ -1469,7 +1476,7 @@ impl FallbackSrc { pad: &gst::Pad, pts: impl Into>, ) -> Result<(), gst::ErrorMessage> { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return Ok(()); @@ -1810,7 +1817,7 @@ impl FallbackSrc { fn handle_source_pad_removed(&self, element: &super::FallbackSrc, pad: &gst::Pad) { gst::debug!(CAT, obj: element, "Pad {} removed from source", pad.name()); - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return; @@ -1846,7 +1853,7 @@ impl FallbackSrc { } fn handle_buffering(&self, element: &super::FallbackSrc, m: &gst::message::Buffering) { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return; @@ -1890,7 +1897,7 @@ impl FallbackSrc { element: &super::FallbackSrc, m: &gst::message::StreamsSelected, ) { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return; @@ -1950,7 +1957,7 @@ impl FallbackSrc { } fn handle_error(&self, element: &super::FallbackSrc, m: &gst::message::Error) -> bool { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return false; @@ -2074,7 +2081,7 @@ impl FallbackSrc { // Remove blocking pad probes if they are still there as otherwise shutting down the // source will deadlock on the probes. - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = match &mut *state_guard { None | Some(State { @@ -2115,7 +2122,7 @@ impl FallbackSrc { // Sleep for 1s before retrying - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = match &mut *state_guard { None | Some(State { @@ -2154,7 +2161,7 @@ impl FallbackSrc { element.call_async(|element| { let src = element.imp(); - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = match &mut *state_guard { None | Some(State { @@ -2211,7 +2218,7 @@ impl FallbackSrc { if source.sync_state_with_parent().is_err() { gst::error!(CAT, obj: element, "Source failed to change state"); let _ = source.set_state(gst::State::Null); - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = state_guard.as_mut().expect("no state"); src.handle_source_error( element, @@ -2221,7 +2228,7 @@ impl FallbackSrc { drop(state_guard); element.notify("statistics"); } else { - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = state_guard.as_mut().expect("no state"); assert!(state.source_restart_timeout.is_none()); src.schedule_source_restart_timeout( @@ -2293,7 +2300,7 @@ impl FallbackSrc { let src = element.imp(); gst::debug!(CAT, obj: element, "Source restart timeout triggered"); - let mut state_guard = src.state.lock().unwrap(); + let mut state_guard = src.state.lock(); let state = match &mut *state_guard { None => { gst::debug!(CAT, obj: element, "Restarting source not needed anymore"); @@ -2358,7 +2365,7 @@ impl FallbackSrc { .audio_stream .as_ref() .and_then(|s| s.switch.property::>("active-pad")) - .map(|p| p.name() == "fallback_sink") + .map(|p| p.property::("priority") != 0) .unwrap_or(true)) || (have_video && state.video_stream.is_some() @@ -2366,12 +2373,12 @@ impl FallbackSrc { .video_stream .as_ref() .and_then(|s| s.switch.property::>("active-pad")) - .map(|p| p.name() == "fallback_sink") + .map(|p| p.property::("priority") != 0) .unwrap_or(true)) } fn handle_switch_active_pad_change(&self, element: &super::FallbackSrc) { - let mut state_guard = self.state.lock().unwrap(); + let mut state_guard = self.state.lock(); let state = match &mut *state_guard { None => { return; @@ -2404,7 +2411,7 @@ impl FallbackSrc { } fn stats(&self) -> gst::Structure { - let state_guard = self.state.lock().unwrap(); + let state_guard = self.state.lock(); let state = match &*state_guard { None => return Stats::default().to_structure(), diff --git a/utils/fallbackswitch/src/fallbackswitch/imp.rs b/utils/fallbackswitch/src/fallbackswitch/imp.rs index 7b6a0389..677b2b60 100644 --- a/utils/fallbackswitch/src/fallbackswitch/imp.rs +++ b/utils/fallbackswitch/src/fallbackswitch/imp.rs @@ -1,4 +1,5 @@ -// Copyright (C) 2019 Sebastian Dröge +// Copyright (C) 2020 Mathieu Duponchelle +// Copyright (C) 2021 Jan Schmidt // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at @@ -7,687 +8,882 @@ // SPDX-License-Identifier: MPL-2.0 use gst::glib; - -use gst_base::prelude::*; -use gst_base::subclass::prelude::*; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{debug, log, trace}; use once_cell::sync::Lazy; -use std::sync::{Mutex, RwLock}; +use parking_lot::Mutex; +use std::sync::atomic::{AtomicU32, Ordering}; -#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] -#[repr(u32)] -#[enum_type(name = "GstFallbackSwitchStreamHealth")] -pub enum StreamHealth { - #[enum_value(name = "Data flow is inactive or late", nick = "inactive")] - Inactive = 0, - #[enum_value(name = "Data is currently flowing in the stream", nick = "present")] - Present = 1, -} +const PROP_PRIORITY: &str = "priority"; +const PROP_IS_HEALTHY: &str = "is-healthy"; -pub struct FallbackSwitch { - primary_sinkpad: gst_base::AggregatorPad, - primary_state: RwLock, - - fallback_sinkpad: RwLock>, - fallback_state: RwLock, - - active_sinkpad: Mutex>, - output_state: Mutex, - settings: Mutex, -} +const PROP_ACTIVE_PAD: &str = "active-pad"; +const PROP_AUTO_SWITCH: &str = "auto-switch"; +const PROP_IMMEDIATE_FALLBACK: &str = "immediate-fallback"; +const PROP_LATENCY: &str = "latency"; +const PROP_MIN_UPSTREAM_LATENCY: &str = "min-upstream-latency"; +const PROP_TIMEOUT: &str = "timeout"; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( - "fallbackswitch", + "fallback-switch", gst::DebugColorFlags::empty(), - Some("Fallback switch Element"), + Some("Automatic priority-based input selector"), ) }); -#[derive(Debug, Default)] -struct PadOutputState { - last_sinkpad_time: Option, - stream_health: StreamHealth, -} - #[derive(Debug)] -struct OutputState { - last_output_time: Option, - primary: PadOutputState, - fallback: PadOutputState, +#[allow(clippy::large_enum_variant)] +enum CapsInfo { + None, + Audio(gst_audio::AudioInfo), + Video(gst_video::VideoInfo), } -#[derive(Debug, Default)] -struct PadInputState { - caps: Option, - audio_info: Option, - video_info: Option, -} - -const DEFAULT_TIMEOUT: gst::ClockTime = gst::ClockTime::from_seconds(5); -const DEFAULT_AUTO_SWITCH: bool = true; -const DEFAULT_STREAM_HEALTH: StreamHealth = StreamHealth::Inactive; -const DEFAULT_IMMEDIATE_FALLBACK: bool = false; - -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] struct Settings { timeout: gst::ClockTime, - auto_switch: bool, + latency: gst::ClockTime, + min_upstream_latency: gst::ClockTime, immediate_fallback: bool, -} - -impl Default for StreamHealth { - fn default() -> Self { - DEFAULT_STREAM_HEALTH - } -} - -impl Default for OutputState { - fn default() -> Self { - OutputState { - last_output_time: gst::ClockTime::NONE, - primary: PadOutputState::default(), - fallback: PadOutputState::default(), - } - } + auto_switch: bool, } impl Default for Settings { - fn default() -> Self { + fn default() -> Settings { Settings { - timeout: DEFAULT_TIMEOUT, - auto_switch: DEFAULT_AUTO_SWITCH, - immediate_fallback: DEFAULT_IMMEDIATE_FALLBACK, + timeout: gst::ClockTime::SECOND, + latency: gst::ClockTime::ZERO, + min_upstream_latency: gst::ClockTime::ZERO, + immediate_fallback: false, + auto_switch: true, } } } -impl OutputState { - #[allow(clippy::blocks_in_if_conditions)] - fn health( - &self, - settings: &Settings, - check_primary_pad: bool, - cur_running_time: impl Into>, - ) -> StreamHealth { - let last_sinkpad_time = if check_primary_pad { - self.primary.last_sinkpad_time - } else { - self.fallback.last_sinkpad_time - }; +#[derive(Debug)] +struct State { + active_sinkpad: Option, + upstream_latency: gst::ClockTime, + timed_out: bool, + switched_pad: bool, + discont_pending: bool, + first: bool, - if last_sinkpad_time.is_none() { - StreamHealth::Inactive - } else if cur_running_time.into().map_or(false, |cur_running_time| { - cur_running_time < last_sinkpad_time.expect("checked above") + settings.timeout - }) { - StreamHealth::Present - } else { - StreamHealth::Inactive + output_running_time: Option, + + timeout_running_time: gst::ClockTime, + timeout_clock_id: Option, +} + +impl Default for State { + fn default() -> State { + State { + active_sinkpad: None, + upstream_latency: gst::ClockTime::ZERO, + timed_out: false, + switched_pad: false, + discont_pending: true, + first: true, + + output_running_time: None, + + timeout_running_time: gst::ClockTime::ZERO, + timeout_clock_id: None, + } + } +} + +impl State { + fn cancel_timeout(&mut self) { + /* clear any previous timeout */ + if let Some(clock_id) = self.timeout_clock_id.take() { + clock_id.unschedule(); + } + } +} + +impl Drop for State { + fn drop(&mut self) { + self.cancel_timeout(); + } +} + +#[derive(Debug)] +pub struct FallbackSwitchSinkPad { + state: Mutex, + settings: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for FallbackSwitchSinkPad { + const NAME: &'static str = "FallbackSwitchSinkPad"; + type Type = super::FallbackSwitchSinkPad; + type ParentType = gst::Pad; + + fn new() -> Self { + Self { + state: Mutex::new(SinkState::default()), + settings: Mutex::new(SinkSettings::default()), + } + } +} + +impl GstObjectImpl for FallbackSwitchSinkPad {} + +impl ObjectImpl for FallbackSwitchSinkPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecUInt::new( + PROP_PRIORITY, + "Stream Priority", + "Selection priority for this stream", + 0, + std::u32::MAX, + SinkSettings::default().priority, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpecBoolean::new( + PROP_IS_HEALTHY, + "Stream Health", + "Whether this stream is healthy", + false, + glib::ParamFlags::READABLE, + ), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property( + &self, + _obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + PROP_PRIORITY => { + let mut settings = self.settings.lock(); + let priority = value.get().expect("type checked upstream"); + settings.priority = priority; + } + _ => unimplemented!(), } } - fn check_health_changes( + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + PROP_PRIORITY => { + let settings = self.settings.lock(); + settings.priority.to_value() + } + PROP_IS_HEALTHY => { + let state = self.state.lock(); + state.is_healthy.to_value() + } + _ => unimplemented!(), + } + } +} + +impl PadImpl for FallbackSwitchSinkPad {} + +impl FallbackSwitchSinkPad {} + +#[derive(Clone, Debug, Default)] +struct SinkSettings { + priority: u32, +} + +#[derive(Debug)] +struct SinkState { + is_healthy: bool, + + segment: gst::FormattedSegment, + caps_info: CapsInfo, + + current_running_time: Option, + eos: bool, + flushing: bool, + clock_id: Option, +} + +impl Default for SinkState { + fn default() -> Self { + Self { + is_healthy: false, + + segment: gst::FormattedSegment::new(), + caps_info: CapsInfo::None, + + current_running_time: gst::ClockTime::NONE, + eos: false, + flushing: false, + clock_id: None, + } + } +} + +impl SinkState { + fn flush_start(&mut self) { + self.flushing = true; + if let Some(clock_id) = self.clock_id.take() { + clock_id.unschedule(); + } + } + fn cancel_wait(&mut self) { + if let Some(clock_id) = self.clock_id.take() { + clock_id.unschedule(); + } + } + fn reset(&mut self) { + self.flushing = false; + self.eos = false; + self.caps_info = CapsInfo::None; + } + + fn get_sync_time( + &self, + buffer: &gst::Buffer, + ) -> (Option, Option) { + let last_ts = self.current_running_time; + let duration = buffer.duration().unwrap_or(gst::ClockTime::ZERO); + + let start_ts = match buffer.dts_or_pts() { + Some(ts) => ts, + None => return (last_ts, last_ts), + }; + let end_ts = start_ts.saturating_add(duration); + + let (start_ts, end_ts) = match self.segment.clip(start_ts, end_ts) { + Some((start_ts, end_ts)) => (start_ts, end_ts), + None => return (None, None), + }; + let start_ts = self.segment.to_running_time(start_ts); + let end_ts = self.segment.to_running_time(end_ts); + + (start_ts, end_ts) + } + + fn schedule_clock( &mut self, - settings: &Settings, - backup_pad: &Option<&gst_base::AggregatorPad>, - preferred_is_primary: bool, - cur_running_time: impl Into> + Copy, - ) -> (bool, bool) { - let preferred_health = self.health(settings, preferred_is_primary, cur_running_time); - let backup_health = if backup_pad.is_some() { - self.health(settings, !preferred_is_primary, cur_running_time) - } else { - StreamHealth::Inactive + element: &super::FallbackSwitch, + running_time: Option, + extra_time: gst::ClockTime, + ) -> Option { + let clock = match element.clock() { + None => return None, + Some(clock) => clock, }; - if preferred_is_primary { - let primary_changed = preferred_health != self.primary.stream_health; - let fallback_changed = backup_health != self.fallback.stream_health; + let base_time = element.base_time(); + let wait_until = match running_time + .zip(base_time) + .map(|(running_time, base_time)| running_time + base_time) + { + Some(wait_until) => wait_until, + None => return None, + }; + let wait_until = wait_until.saturating_add(extra_time); - self.primary.stream_health = preferred_health; - self.fallback.stream_health = backup_health; + let now = clock.time(); - (primary_changed, fallback_changed) - } else { - let primary_changed = backup_health != self.primary.stream_health; - let fallback_changed = preferred_health != self.fallback.stream_health; - - self.primary.stream_health = backup_health; - self.fallback.stream_health = preferred_health; - - (primary_changed, fallback_changed) - } - } -} - -impl FallbackSwitch { - fn drain_pad_to_time( - &self, - state: &mut OutputState, - pad: &gst_base::AggregatorPad, - target_running_time: impl Into> + Copy, - ) -> Result<(), gst::FlowError> { - let segment = pad.segment(); - - /* No segment yet - no data */ - if segment.format() == gst::Format::Undefined { - return Ok(()); + /* If the buffer is already late, skip the clock wait */ + if now.map_or(true, |now| wait_until < now) { + debug!( + CAT, + obj: element, + "Skipping buffer wait until {} - clock already {:?}", + wait_until, + now + ); + return None; } - let segment = segment.downcast::().map_err(|_| { - gst::error!(CAT, obj: pad, "Only TIME segments supported"); - gst::FlowError::Error - })?; - - let mut running_time = gst::ClockTime::NONE; - - while let Some(buffer) = pad.peek_buffer() { - let pts = buffer.dts_or_pts(); - let new_running_time = segment.to_running_time(pts); - - if pts.is_none() - || new_running_time - .opt_le(target_running_time.into()) - .unwrap_or(false) - { - gst::debug!(CAT, obj: pad, "Dropping trailing buffer {:?}", buffer); - pad.drop_buffer(); - running_time = new_running_time; - } else { - break; - } - } - if running_time.is_some() { - if pad == &self.primary_sinkpad { - state.primary.last_sinkpad_time = running_time; - } else { - state.fallback.last_sinkpad_time = running_time; - } - } - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - fn handle_main_timed_item( - &self, - agg: &super::FallbackSwitch, - state: &mut OutputState, - settings: &Settings, - mut item: TimedItem, - preferred_pad: &gst_base::AggregatorPad, - backup_pad: &Option<&gst_base::AggregatorPad>, - cur_running_time: impl Into>, - ) -> Result, gst::FlowError> { - gst::debug!( + debug!( CAT, - obj: preferred_pad, - "Got {} on pad {} - {:?}", - item.name(), - preferred_pad.name(), - item + obj: element, + "Scheduling buffer wait until {} = {:?} + extra {:?} + base time {:?}", + wait_until, + running_time, + extra_time, + base_time ); - let segment = preferred_pad - .segment() - .downcast::() - .map_err(|_| { - gst::error!(CAT, obj: preferred_pad, "Only TIME segments supported"); - gst::FlowError::Error - })?; - - let running_time = if item.pts().is_none() { - // re-use ts from previous buffer - let running_time = state - .primary - .last_sinkpad_time - .max(state.fallback.last_sinkpad_time); - - gst::debug!( - CAT, - obj: preferred_pad, - "{} does not have PTS, re-use ts from previous buffer: {}", - item.name(), - running_time.display() - ); - - running_time - } else { - segment.to_running_time(item.pts()) - }; - - item.update_ts(running_time, &segment); - - if preferred_pad == &self.primary_sinkpad { - state.primary.last_sinkpad_time = running_time; - } else { - state.fallback.last_sinkpad_time = running_time; - } - - let cur_running_time = cur_running_time.into(); - let (is_late, deadline) = match (cur_running_time, agg.latency(), running_time) { - (Some(cur_running_time), Some(latency), Some(running_time)) => { - let deadline = running_time + latency + 40 * gst::ClockTime::MSECOND; - (cur_running_time > deadline, Some(deadline)) - } - _ => (false, None), - }; - - if is_late { - gst::debug!( - CAT, - obj: preferred_pad, - "{} is too late: {} > {}", - item.name(), - cur_running_time.display(), - deadline.display(), - ); - - let is_late = state - .last_output_time - .opt_add(settings.timeout) - .opt_le(running_time); - - if let Some(true) = is_late { - /* This buffer arrived too late - we either already switched - * to the other pad or there's no point outputting this anyway */ - gst::debug!( - CAT, - obj: preferred_pad, - "{} is too late and timeout reached: {} + {} <= {}", - item.name(), - state.last_output_time.display(), - settings.timeout, - running_time.display(), - ); - - return Ok(None); - } - } - - let mut active_sinkpad = self.active_sinkpad.lock().unwrap(); - let pad_change = settings.auto_switch - && active_sinkpad.as_ref() != Some(preferred_pad.upcast_ref::()); - - if pad_change { - if !item.is_keyframe() { - gst::info!( - CAT, - obj: preferred_pad, - "Can't change back to sinkpad {}, waiting for keyframe", - preferred_pad.name() - ); - preferred_pad.push_event( - gst_video::UpstreamForceKeyUnitEvent::builder() - .all_headers(true) - .build(), - ); - return Ok(None); - } - - gst::info!(CAT, obj: preferred_pad, "Active pad changed to sinkpad"); - *active_sinkpad = Some(preferred_pad.clone().upcast()); - } - drop(active_sinkpad); - - if !is_late || state.last_output_time.is_none() { - state.last_output_time = running_time; - } - - let active_caps = if preferred_pad == &self.primary_sinkpad { - let pad_state = self.primary_state.read().unwrap(); - pad_state.caps.as_ref().unwrap().clone() - } else { - let pad_state = self.fallback_state.read().unwrap(); - pad_state.caps.as_ref().unwrap().clone() - }; - - // Drop all older buffers from the fallback sinkpad - if let Some(backup_pad) = backup_pad { - self.drain_pad_to_time(state, backup_pad, state.last_output_time)?; - } - - Ok(Some((item, active_caps, pad_change))) + let clock_id = clock.new_single_shot_id(wait_until); + self.clock_id = Some(clock_id.clone()); + Some(clock_id) } - #[allow(clippy::too_many_arguments)] - fn handle_main_buffer( + fn is_healthy(&self, state: &State, settings: &Settings) -> bool { + match self.current_running_time { + Some(current_running_time) => { + current_running_time >= state.timeout_running_time.saturating_sub(settings.timeout) + && current_running_time <= state.timeout_running_time + } + None => false, + } + } +} + +#[derive(Debug)] +pub struct FallbackSwitch { + state: Mutex, + settings: Mutex, + src_pad: gst::Pad, + sink_pad_serial: AtomicU32, +} + +impl GstObjectImpl for FallbackSwitch {} + +impl FallbackSwitch { + fn set_active_pad(&self, state: &mut State, pad: &super::FallbackSwitchSinkPad) { + if state.active_sinkpad.as_ref() == Some(pad) { + return; + }; + + state.active_sinkpad = Some(pad.clone()); + state.switched_pad = true; + state.discont_pending = true; + + let mut pad_state = pad.imp().state.lock(); + pad_state.cancel_wait(); + drop(pad_state); + + debug!(CAT, obj: pad, "Now active pad"); + } + + fn handle_timeout( &self, - agg: &super::FallbackSwitch, - state: &mut OutputState, + element: &super::FallbackSwitch, + state: &mut State, settings: &Settings, - buffer: gst::Buffer, - preferred_pad: &gst_base::AggregatorPad, - backup_pad: &Option<&gst_base::AggregatorPad>, - cur_running_time: impl Into>, - ) -> Result, gst::FlowError> { - // If we got a buffer on the sinkpad just handle it - let res = self.handle_main_timed_item( - agg, - state, - settings, - TimedItem::Buffer(buffer), - preferred_pad, - backup_pad, - cur_running_time, - )?; - - Ok(res.map(|res| (res.0.buffer(), res.1, res.2))) - } - - fn backup_buffer( - &self, - state: &mut OutputState, - settings: &Settings, - backup_pad: &gst_base::AggregatorPad, - ) -> Result<(gst::Buffer, gst::Caps, bool), gst::FlowError> { - // If we have a fallback sinkpad and timeout, try to get a fallback buffer from here - // and drop all too old buffers in the process - loop { - let mut buffer = backup_pad - .pop_buffer() - .ok_or(gst_base::AGGREGATOR_FLOW_NEED_DATA)?; - - gst::debug!( - CAT, - obj: backup_pad, - "Got buffer on fallback sinkpad {:?}", - buffer - ); - - let backup_segment = - backup_pad - .segment() - .downcast::() - .map_err(|_| { - gst::error!(CAT, obj: backup_pad, "Only TIME segments supported"); - gst::FlowError::Error - })?; - - let running_time = if buffer.pts().is_none() { - // re-use ts from previous buffer - let running_time = state - .primary - .last_sinkpad_time - .max(state.fallback.last_sinkpad_time); - - gst::debug!( - CAT, - obj: backup_pad, - "Buffer does not have PTS, re-use ts from previous buffer: {}", - running_time.display() - ); - - running_time - } else { - backup_segment.to_running_time(buffer.pts()) - }; - - { - // FIXME: This will not work correctly for negative DTS - let buffer = buffer.make_mut(); - buffer.set_pts(running_time); - buffer.set_dts(backup_segment.to_running_time(buffer.dts())); - } - - // If we never had a real buffer, initialize with the running time of the fallback - // sinkpad so that we still output fallback buffers after the timeout - if state.last_output_time.is_none() { - state.last_output_time = running_time; - } - - // If the other pad never received a buffer, we want to start consuming - // buffers on this pad in order to provide an output at start up - // (for example with a slow primary) - let ignore_timeout = settings.immediate_fallback && { - if backup_pad == &self.primary_sinkpad { - state.primary.last_sinkpad_time = running_time; - state.fallback.last_sinkpad_time.is_none() - } else { - state.fallback.last_sinkpad_time = running_time; - state.primary.last_sinkpad_time.is_none() - } - }; - - if !ignore_timeout { - let timed_out = state - .last_output_time - .opt_add(settings.timeout) - .opt_le(running_time) - .unwrap_or(true); - - // Get the next one if this one is before the timeout - if !timed_out { - gst::debug!( - CAT, - obj: backup_pad, - "Timeout not reached yet: {} + {} > {}", - state.last_output_time.display(), - settings.timeout, - running_time.display(), - ); - continue; - } - gst::debug!( - CAT, - obj: backup_pad, - "Timeout reached: {} + {} <= {}", - state.last_output_time.display(), - settings.timeout, - running_time.display(), - ); - } else { - gst::debug!( - CAT, - obj: backup_pad, - "Consuming buffer as we haven't yet received a buffer on the other pad", - ); - } - - let mut active_sinkpad = self.active_sinkpad.lock().unwrap(); - let pad_change = settings.auto_switch - && active_sinkpad.as_ref() != Some(backup_pad.upcast_ref::()); - if pad_change { - if buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { - gst::info!( - CAT, - obj: backup_pad, - "Can't change to sinkpad {} yet, waiting for keyframe", - backup_pad.name() - ); - backup_pad.push_event( - gst_video::UpstreamForceKeyUnitEvent::builder() - .all_headers(true) - .build(), - ); - continue; - } - - gst::info!( - CAT, - obj: backup_pad, - "Active pad changed to fallback sinkpad" - ); - *active_sinkpad = Some(backup_pad.clone().upcast()); - } - drop(active_sinkpad); - - let active_caps = if backup_pad == &self.primary_sinkpad { - let pad_state = self.primary_state.read().unwrap(); - pad_state.caps.as_ref().unwrap().clone() - } else { - let pad_state = self.fallback_state.read().unwrap(); - pad_state.caps.as_ref().unwrap().clone() - }; - - break Ok((buffer, active_caps, pad_change)); - } - } - - #[allow(clippy::type_complexity)] - fn next_buffer( - &self, - agg: &super::FallbackSwitch, - timeout: bool, - ) -> ( - Result< - ( - gst::Buffer, // Next buffer from the chosen pad - gst::Caps, // Caps for the buffer - bool, // If the input pad changed to/from primary<->fallback - ), - gst::FlowError, - >, - ( - bool, // If the health of the primary pad changed - bool, // If the health of the fallback pad changed - ), ) { - let settings = self.settings.lock().unwrap().clone(); - let mut state = self.output_state.lock().unwrap(); + debug!( + CAT, + obj: element, + "timeout fired - looking for a pad to switch to" + ); - gst::debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout); + /* Advance the output running time to this timeout */ + state.output_running_time = Some(state.timeout_running_time); - if self.primary_sinkpad.is_eos() { - gst::log!(CAT, obj: agg, "Sinkpad is EOS"); - return (Err(gst::FlowError::Eos), (false, false)); + let mut best_priority = 0u32; + let mut best_pad = None; + + for pad in element.sink_pads() { + /* Don't consider the active sinkpad */ + let pad = pad.downcast_ref::().unwrap(); + let pad_imp = FallbackSwitchSinkPad::from_instance(pad); + if Some(pad) == state.active_sinkpad.as_ref() { + continue; + } + let pad_state = pad_imp.state.lock(); + let pad_settings = pad_imp.settings.lock().clone(); + #[allow(clippy::collapsible_if)] + /* If this pad has data that arrived within the 'timeout' window + * before the timeout fired, we can switch to it */ + if pad_state.is_healthy(state, settings) { + if best_pad.is_none() || pad_settings.priority < best_priority { + best_pad = Some(pad.clone()); + best_priority = pad_settings.priority; + } + } } - /* Choose which pad we check first */ - let active_sinkpad = self.active_sinkpad.lock().unwrap(); - let prefer_primary = settings.auto_switch - || active_sinkpad.is_none() - || active_sinkpad.as_ref() == Some(self.primary_sinkpad.upcast_ref::()); - drop(active_sinkpad); - - let fallback_sinkpad = self.fallback_sinkpad.read().unwrap(); - - let (preferred_pad, backup_pad) = if prefer_primary { - (&self.primary_sinkpad, fallback_sinkpad.as_ref()) + if let Some(best_pad) = best_pad { + debug!( + CAT, + obj: element, + "Found viable pad to switch to: {:?}", + best_pad + ); + self.set_active_pad(state, &best_pad) } else { - ( - fallback_sinkpad.as_ref().unwrap(), - Some(&self.primary_sinkpad), + state.timed_out = true; + } + } + + fn on_timeout( + &self, + element: &super::FallbackSwitch, + clock_id: &gst::ClockId, + settings: &Settings, + ) { + let mut state = self.state.lock(); + + if state.timeout_clock_id.as_ref() != Some(clock_id) { + /* Timeout fired late, ignore it. */ + debug!(CAT, obj: element, "Late timeout callback. Ignoring"); + return; + } + self.handle_timeout(element, &mut state, settings); + } + + fn cancel_waits(&self, element: &super::FallbackSwitch) { + for pad in element.sink_pads() { + let sink_pad = FallbackSwitchSinkPad::from_instance(pad.downcast_ref().unwrap()); + let mut pad_state = sink_pad.state.lock(); + pad_state.cancel_wait(); + } + } + + fn schedule_timeout( + &self, + element: &super::FallbackSwitch, + state: &mut State, + settings: &Settings, + running_time: gst::ClockTime, + ) { + /* clear any previous timeout */ + if let Some(clock_id) = state.timeout_clock_id.take() { + clock_id.unschedule(); + } + + let clock = match element.clock() { + None => return, + Some(clock) => clock, + }; + + let timeout_running_time = running_time + .saturating_add(state.upstream_latency + settings.timeout + settings.latency); + + let base_time = element.base_time(); + + let wait_until = match Some(timeout_running_time) + .zip(base_time) + .map(|(running_time, base_time)| running_time + base_time) + { + Some(wait_until) => wait_until, + None => return, + }; + + state.timeout_running_time = timeout_running_time; + + /* If we're already running behind, fire the timeout immediately */ + let now = clock.time(); + if now.map_or(false, |now| wait_until <= now) { + self.handle_timeout(element, state, settings); + return; + } + + debug!(CAT, obj: element, "Scheduling timeout for {}", wait_until); + let timeout_id = clock.new_single_shot_id(wait_until); + + state.timeout_clock_id = Some(timeout_id.clone().into()); + state.timed_out = false; + + let element_weak = element.downgrade(); + timeout_id + .wait_async(move |_clock, _time, clock_id| { + let element = match element_weak.upgrade() { + None => return, + Some(element) => element, + }; + let fallbackswitch = FallbackSwitch::from_instance(&element); + let settings = fallbackswitch.settings.lock().clone(); + fallbackswitch.on_timeout(&element, clock_id, &settings); + }) + .expect("Failed to wait async"); + } + + fn sink_chain( + &self, + pad: &super::FallbackSwitchSinkPad, + element: &super::FallbackSwitch, + mut buffer: gst::Buffer, + ) -> Result { + /* There are 4 cases coming in: + * 1. This is not the active pad but is higher priority: + * - become the active pad, then goto 4. + * 2. This is not the active pad, but the output timed out due to all pads running + * late. + * - become the active pad, then goto 4. + * 3. This is not the active pad, but might become the active pad + * - Wait for the buffer end time (or buffer start time + timeout if there's no + * duration). If we get woken early, and became the active pad, then output the + * buffer. + * 4. This is the active pad: + * - sleep until the buffer running time, then check if we're still active + */ + + /* First see if we should become the active pad */ + let mut state = self.state.lock(); + let settings = self.settings.lock().clone(); + let pad = pad.downcast_ref().unwrap(); + let pad_imp = FallbackSwitchSinkPad::from_instance(pad); + + if state.active_sinkpad.as_ref() != Some(pad) && settings.auto_switch { + let pad_settings = pad_imp.settings.lock().clone(); + let mut switch_to_pad = state.timed_out; + + switch_to_pad |= if let Some(active_sinkpad) = &state.active_sinkpad { + let active_sinkpad_imp = active_sinkpad.imp(); + let active_pad_settings = active_sinkpad_imp.settings.lock().clone(); + (pad_settings.priority < active_pad_settings.priority) + || (state.first && settings.immediate_fallback) + } else { + match settings.immediate_fallback { + true => true, + false => pad_settings.priority == 0, + } + }; + if state.first { + state.first = false; + } + + drop(pad_settings); + + if switch_to_pad { + state.timed_out = false; + self.set_active_pad(&mut state, pad) + } + }; + + /* This might be the active_sinkpad now */ + let is_active = state.active_sinkpad.as_ref() == Some(pad); + + let mut pad_state = pad_imp.state.lock(); + let (start_running_time, end_running_time) = pad_state.get_sync_time(&buffer); + + log!( + CAT, + obj: pad, + "Handling buffer {:?} run ts start {:?} end {:?} pad active {}", + buffer, + start_running_time, + end_running_time, + is_active + ); + + #[allow(clippy::blocks_in_if_conditions)] + let output_clockid = if is_active { + pad_state.schedule_clock( + element, + start_running_time, + state.upstream_latency + settings.latency, ) - }; - - let clock = agg.clock(); - let base_time = agg.base_time(); - - let cur_running_time = if let Some(clock) = clock { - clock.time().opt_checked_sub(base_time).ok().flatten() - } else { - gst::ClockTime::NONE - }; - - /* See if there's a buffer on the preferred pad and output that */ - if let Some(buffer) = preferred_pad.pop_buffer() { - match self.handle_main_buffer( - agg, - &mut *state, - &settings, + } else if end_running_time.map_or(false, |end_running_time| { + end_running_time < state.timeout_running_time + }) { + log!( + CAT, + obj: pad, + "Dropping trailing buffer {:?} before timeout {}", buffer, - preferred_pad, - &backup_pad, - cur_running_time, - ) { - Ok(Some(res)) => { - return ( - Ok(res), - state.check_health_changes( - &settings, - &backup_pad, - prefer_primary, - cur_running_time, - ), - ) - } - Err(e) => { - return ( - Err(e), - state.check_health_changes( - &settings, - &backup_pad, - prefer_primary, - cur_running_time, - ), - ) - } - _ => (), - } - } - - /* If we can't auto-switch, then can't fetch anything from the backup pad */ - if !settings.auto_switch { - /* Not switching, but backup pad needs draining of late buffers still */ - gst::log!( - CAT, - obj: agg, - "No primary buffer, but can't autoswitch - draining backup pad" + state.timeout_running_time ); - if let Some(backup_pad) = &backup_pad { - if let Err(e) = self.drain_pad_to_time(&mut *state, backup_pad, cur_running_time) { - return ( - Err(e), - state.check_health_changes( - &settings, - &Some(backup_pad), - prefer_primary, - cur_running_time, - ), - ); - } - } - return ( - Err(gst_base::AGGREGATOR_FLOW_NEED_DATA), - state.check_health_changes( - &settings, - &backup_pad, - prefer_primary, - cur_running_time, - ), - ); - } - - if let (false, Some(backup_pad)) = (timeout, &backup_pad) { - gst::debug!(CAT, obj: agg, "Have fallback sinkpad but no timeout yet"); - ( - Err(gst_base::AGGREGATOR_FLOW_NEED_DATA), - state.check_health_changes( - &settings, - &Some(backup_pad), - prefer_primary, - cur_running_time, - ), - ) - } else if let (true, Some(backup_pad)) = (timeout, &backup_pad) { - ( - self.backup_buffer(&mut *state, &settings, backup_pad), - state.check_health_changes( - &settings, - &Some(backup_pad), - prefer_primary, - cur_running_time, - ), - ) + return Ok(gst::FlowSuccess::Ok); } else { - // Otherwise there's not much we can do at this point - gst::debug!( - CAT, - obj: agg, - "Got no buffer on sinkpad and have no fallback sinkpad" - ); - ( - Err(gst_base::AGGREGATOR_FLOW_NEED_DATA), - state.check_health_changes( - &settings, - &backup_pad, - prefer_primary, - cur_running_time, - ), + pad_state.schedule_clock( + element, + end_running_time, + state.upstream_latency + settings.timeout + settings.latency, ) + }; + + if let Some(running_time) = start_running_time { + pad_state.current_running_time = Some(running_time); + } + drop(pad_state); + + /* Before sleeping, ensure there is a timeout to switch active pads, + * in case the initial active pad never receives a buffer */ + if let Some(running_time) = start_running_time { + if state.timeout_clock_id.is_none() && !is_active { + self.schedule_timeout(element, &mut state, &settings, running_time); + } + } + + let mut state = if let Some(clock_id) = &output_clockid { + drop(state); + + let (_res, _) = clock_id.wait(); + self.state.lock() + } else { + state + }; + + let pad_state = pad_imp.state.lock(); + if pad_state.flushing { + debug!(CAT, obj: element, "Flushing"); + return Err(gst::FlowError::Flushing); + } + drop(pad_state); + + let is_active = state.active_sinkpad.as_ref() == Some(pad); + let switched_pad = state.switched_pad; + let discont_pending = state.discont_pending; + + if is_active { + if start_running_time < state.output_running_time { + log!( + CAT, + obj: pad, + "Dropping trailing buffer {:?} before timeout {}", + buffer, + state.timeout_running_time + ); + + return Ok(gst::FlowSuccess::Ok); + } + + if start_running_time.is_some() { + state.output_running_time = start_running_time; + } + + if let Some(end_running_time) = end_running_time { + self.schedule_timeout(element, &mut state, &settings, end_running_time); + } else { + state.cancel_timeout(); + } + + state.switched_pad = false; + state.discont_pending = false; + } + + let mut pad_state = pad_imp.state.lock(); + pad_state.is_healthy = pad_state.is_healthy(&state, &settings); + drop(pad_state); + + if !is_active { + log!(CAT, obj: pad, "Dropping {:?}", buffer); + return Ok(gst::FlowSuccess::Ok); + } + let _stream_lock = self.src_pad.stream_lock(); + drop(state); + + if switched_pad { + let _ = pad.push_event(gst::event::Reconfigure::new()); + pad.sticky_events_foreach(|event| { + self.src_pad.push_event(event.clone()); + std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + }); + + element.notify(PROP_ACTIVE_PAD); + } + + if discont_pending && !buffer.flags().contains(gst::BufferFlags::DISCONT) { + let buffer = buffer.make_mut(); + buffer.set_flags(gst::BufferFlags::DISCONT); + } + + /* TODO: Clip raw video and audio buffers to avoid going backward? */ + + log!(CAT, obj: pad, "Forwarding {:?}", buffer); + self.src_pad.push(buffer) + } + + fn sink_chain_list( + &self, + pad: &super::FallbackSwitchSinkPad, + element: &super::FallbackSwitch, + list: gst::BufferList, + ) -> Result { + log!(CAT, obj: pad, "Handling buffer list {:?}", list); + // TODO: Keep the list intact and forward it in one go (or broken into several + // pieces if needed) when outputting to the active pad + for buffer in list.iter_owned() { + self.sink_chain(pad, element, buffer)?; + } + + Ok(gst::FlowSuccess::Ok) + } + + fn sink_event( + &self, + pad: &super::FallbackSwitchSinkPad, + element: &super::FallbackSwitch, + event: gst::Event, + ) -> bool { + let mut state = self.state.lock(); + let forward = state.active_sinkpad.as_ref() == Some(pad); + + let mut pad_state = pad.imp().state.lock(); + + match event.view() { + gst::EventView::Caps(caps) => { + let caps = caps.caps(); + debug!(CAT, obj: pad, "Received caps {}", caps); + + let caps_info = match caps.structure(0).unwrap().name() { + "audio/x-raw" => { + CapsInfo::Audio(gst_audio::AudioInfo::from_caps(caps).unwrap()) + } + "video/x-raw" => { + CapsInfo::Video(gst_video::VideoInfo::from_caps(caps).unwrap()) + } + _ => CapsInfo::None, + }; + + pad_state.caps_info = caps_info; + } + gst::EventView::Segment(e) => { + let segment = match e.segment().clone().downcast::() { + Err(segment) => { + gst::element_error!( + element, + gst::StreamError::Format, + ["Only TIME segments supported, got {:?}", segment.format(),] + ); + return false; + } + Ok(segment) => segment, + }; + + pad_state.segment = segment; + } + gst::EventView::FlushStart(_) => { + pad_state.flush_start(); + } + gst::EventView::FlushStop(_) => { + pad_state.reset(); + state.first = true; + } + _ => {} + } + + let fwd_sticky = if forward && state.switched_pad && event.is_serialized() { + state.switched_pad = false; + true + } else { + false + }; + + drop(pad_state); + let _stream_lock = forward.then(|| self.src_pad.stream_lock()); + drop(state); + + if forward { + if fwd_sticky { + let _ = pad.push_event(gst::event::Reconfigure::new()); + pad.sticky_events_foreach(|event| { + self.src_pad.push_event(event.clone()); + std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + }); + + element.notify(PROP_ACTIVE_PAD); + } + self.src_pad.push_event(event); + } + true + } + + fn sink_query( + &self, + pad: &super::FallbackSwitchSinkPad, + element: &super::FallbackSwitch, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; + + log!(CAT, obj: pad, "Handling query {:?}", query); + + let forward = match query.view() { + QueryView::Context(_) => true, + QueryView::Position(_) => true, + QueryView::Duration(_) => true, + QueryView::Caps(_) => true, + QueryView::Allocation(_) => { + let state = self.state.lock(); + /* Forward allocation only for the active sink pad, + * for others switching will send a reconfigure event upstream + */ + state.active_sinkpad.as_ref() == Some(pad) + } + _ => { + pad.query_default(Some(element), query); + false + } + }; + + if forward { + log!(CAT, obj: pad, "Forwarding query {:?}", query); + self.src_pad.peer_query(query) + } else { + false + } + } + + fn reset(&self, _element: &super::FallbackSwitch) { + let mut state = self.state.lock(); + *state = State::default(); + } + + fn src_query( + &self, + pad: &gst::Pad, + element: &super::FallbackSwitch, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryViewMut; + + log!(CAT, obj: pad, "Handling {:?}", query); + + match query.view_mut() { + QueryViewMut::Latency(ref mut q) => { + let mut ret = true; + let mut min_latency = gst::ClockTime::ZERO; + let mut max_latency = gst::ClockTime::NONE; + + for pad in element.sink_pads() { + let mut peer_query = gst::query::Latency::new(); + + ret = pad.peer_query(&mut peer_query); + + if ret { + let (live, min, max) = peer_query.result(); + if live { + min_latency = min.max(min_latency); + max_latency = max + .zip(max_latency) + .map(|(max, max_latency)| max.min(max_latency)) + .or(max); + } + } + } + + let mut state = self.state.lock(); + let settings = self.settings.lock().clone(); + min_latency = min_latency.max(settings.min_upstream_latency); + state.upstream_latency = min_latency; + log!(CAT, obj: pad, "Upstream latency {}", min_latency); + + q.set(true, min_latency + settings.latency, max_latency); + + ret + } + QueryViewMut::Caps(_) => { + let sinkpad = { + let state = self.state.lock(); + state.active_sinkpad.clone() + }; + + if let Some(sinkpad) = sinkpad { + sinkpad.peer_query(query) + } else { + pad.query_default(Some(element), query) + } + } + _ => { + let sinkpad = { + let state = self.state.lock(); + state.active_sinkpad.clone() + }; + + if let Some(sinkpad) = sinkpad { + sinkpad.peer_query(query) + } else { + true + } + } } } } @@ -696,21 +892,26 @@ impl FallbackSwitch { impl ObjectSubclass for FallbackSwitch { const NAME: &'static str = "FallbackSwitch"; type Type = super::FallbackSwitch; - type ParentType = gst_base::Aggregator; + type ParentType = gst::Element; + type Interfaces = (gst::ChildProxy,); fn with_class(klass: &Self::Class) -> Self { - let templ = klass.pad_template("sink").unwrap(); - let sinkpad = - gst::PadBuilder::::from_template(&templ, Some("sink")).build(); + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .query_function(|pad, parent, query| { + FallbackSwitch::catch_panic_pad_function( + parent, + || false, + |fallbackswitch, element| fallbackswitch.src_query(pad, element, query), + ) + }) + .build(); Self { - primary_sinkpad: sinkpad, - primary_state: RwLock::new(PadInputState::default()), - fallback_sinkpad: RwLock::new(None), - fallback_state: RwLock::new(PadInputState::default()), - active_sinkpad: Mutex::new(None), - output_state: Mutex::new(OutputState::default()), + src_pad: srcpad, + state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), + sink_pad_serial: AtomicU32::new(0), } } } @@ -719,51 +920,53 @@ impl ObjectImpl for FallbackSwitch { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ - glib::ParamSpecUInt64::new( - "timeout", - "Timeout", - "Timeout in nanoseconds", - 0, - std::u64::MAX - 1, - DEFAULT_TIMEOUT.nseconds() as u64, - glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, - ), glib::ParamSpecObject::new( - "active-pad", + PROP_ACTIVE_PAD, "Active Pad", - "Currently active pad. Writes are ignored if auto-switch=true", + "Currently active pad", gst::Pad::static_type(), glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_PLAYING, ), - glib::ParamSpecBoolean::new( - "auto-switch", - "Automatically switch pads", - "Automatically switch pads (If true, prefer primary sink, otherwise manual selection via the active-pad property)", - DEFAULT_AUTO_SWITCH, - glib::ParamFlags::READWRITE| gst::PARAM_FLAG_MUTABLE_READY, + glib::ParamSpecUInt64::new( + PROP_TIMEOUT, + "Input timeout", + "Timeout on an input before switching to a lower priority input.", + 0, + std::u64::MAX - 1, + Settings::default().timeout.nseconds(), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_PLAYING, ), - glib::ParamSpecEnum::new( - "primary-health", - "Primary stream state", - "Reports the health of the primary stream on the sink pad", - StreamHealth::static_type(), - DEFAULT_STREAM_HEALTH as i32, - glib::ParamFlags::READABLE, + glib::ParamSpecUInt64::new( + PROP_LATENCY, + "Latency", + "Additional latency in live mode to allow upstream to take longer to produce buffers for the current position (in nanoseconds)", + 0, + std::u64::MAX - 1, + Settings::default().latency.nseconds(), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), - glib::ParamSpecEnum::new( - "fallback-health", - "Fallback stream state", - "Reports the health of the fallback stream on the fallback_sink pad", - StreamHealth::static_type(), - DEFAULT_STREAM_HEALTH as i32, - glib::ParamFlags::READABLE, + glib::ParamSpecUInt64::new( + PROP_MIN_UPSTREAM_LATENCY, + "Minimum Upstream Latency", + "When sources with a higher latency are expected to be plugged in dynamically after the fallbackswitch has started playing, this allows overriding the minimum latency reported by the initial source(s). This is only taken into account when larger than the actually reported minimum latency. (nanoseconds)", + 0, + std::u64::MAX - 1, + Settings::default().min_upstream_latency.nseconds(), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), glib::ParamSpecBoolean::new( - "immediate-fallback", + PROP_IMMEDIATE_FALLBACK, "Immediate fallback", - "Forward the fallback stream immediately at startup, when the primary stream is slow to start up and immediate output is required", - DEFAULT_IMMEDIATE_FALLBACK, - glib::ParamFlags::READWRITE| gst::PARAM_FLAG_MUTABLE_READY, + "Forward lower-priority streams immediately at startup, when the stream with priority 0 is slow to start up and immediate output is required", + Settings::default().immediate_fallback, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecBoolean::new( + PROP_AUTO_SWITCH, + "Automatically switch pads", + "Automatically switch pads (If true, use the priority pad property, otherwise manual selection via the active-pad property)", + Settings::default().auto_switch, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), ] }); @@ -771,12 +974,6 @@ impl ObjectImpl for FallbackSwitch { PROPERTIES.as_ref() } - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(obj); - - obj.add_pad(&self.primary_sinkpad).unwrap(); - } - fn set_property( &self, obj: &Self::Type, @@ -785,21 +982,8 @@ impl ObjectImpl for FallbackSwitch { pspec: &glib::ParamSpec, ) { match pspec.name() { - "timeout" => { - let mut settings = self.settings.lock().unwrap(); - let timeout = value.get().expect("type checked upstream"); - gst::info!( - CAT, - obj: obj, - "Changing timeout from {} to {}", - settings.timeout, - timeout - ); - settings.timeout = timeout; - drop(settings); - } - "active-pad" => { - let settings = self.settings.lock().unwrap(); + PROP_ACTIVE_PAD => { + let settings = self.settings.lock(); if settings.auto_switch { gst::warning!( CAT, @@ -811,21 +995,51 @@ impl ObjectImpl for FallbackSwitch { .get::>() .expect("type checked upstream"); /* Trigger a pad switch if needed */ - let mut cur_active_pad = self.active_sinkpad.lock().unwrap(); - if *cur_active_pad != active_pad { - *cur_active_pad = active_pad; + if let Some(active_pad) = active_pad { + self.set_active_pad( + &mut self.state.lock(), + active_pad + .downcast_ref::() + .unwrap(), + ); } - drop(cur_active_pad); } drop(settings); } - "auto-switch" => { - let mut settings = self.settings.lock().unwrap(); - settings.auto_switch = value.get().expect("type checked upstream"); + PROP_TIMEOUT => { + let mut settings = self.settings.lock(); + let new_value = value.get().expect("type checked upstream"); + + settings.timeout = new_value; + debug!(CAT, obj: obj, "Timeout now {}", settings.timeout); + drop(settings); + let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); } - "immediate-fallback" => { - let mut settings = self.settings.lock().unwrap(); - settings.immediate_fallback = value.get().expect("type checked upstream"); + PROP_LATENCY => { + let mut settings = self.settings.lock(); + let new_value = value.get().expect("type checked upstream"); + + settings.latency = new_value; + drop(settings); + let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); + } + PROP_MIN_UPSTREAM_LATENCY => { + let mut settings = self.settings.lock(); + let new_value = value.get().expect("type checked upstream"); + + settings.min_upstream_latency = new_value; + drop(settings); + let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); + } + PROP_IMMEDIATE_FALLBACK => { + let mut settings = self.settings.lock(); + let new_value = value.get().expect("type checked upstream"); + settings.immediate_fallback = new_value; + } + PROP_AUTO_SWITCH => { + let mut settings = self.settings.lock(); + let new_value = value.get().expect("type checked upstream"); + settings.auto_switch = new_value; } _ => unimplemented!(), } @@ -833,45 +1047,51 @@ impl ObjectImpl for FallbackSwitch { fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { - "timeout" => { - let settings = self.settings.lock().unwrap(); - settings.timeout.to_value() - } - "active-pad" => { - let active_pad = self.active_sinkpad.lock().unwrap().clone(); + PROP_ACTIVE_PAD => { + let state = self.state.lock(); + let active_pad = state.active_sinkpad.clone(); active_pad.to_value() } - "auto-switch" => { - let settings = self.settings.lock().unwrap(); - settings.auto_switch.to_value() + PROP_TIMEOUT => { + let settings = self.settings.lock(); + settings.timeout.to_value() } - "primary-health" => { - let state = self.output_state.lock().unwrap(); - state.primary.stream_health.to_value() + PROP_LATENCY => { + let settings = self.settings.lock(); + settings.latency.to_value() } - "fallback-health" => { - let state = self.output_state.lock().unwrap(); - state.fallback.stream_health.to_value() + PROP_MIN_UPSTREAM_LATENCY => { + let settings = self.settings.lock(); + settings.min_upstream_latency.to_value() } - "immediate-fallback" => { - let settings = self.settings.lock().unwrap(); + PROP_IMMEDIATE_FALLBACK => { + let settings = self.settings.lock(); settings.immediate_fallback.to_value() } + PROP_AUTO_SWITCH => { + let settings = self.settings.lock(); + settings.auto_switch.to_value() + } _ => unimplemented!(), } } -} -impl GstObjectImpl for FallbackSwitch {} + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.src_pad).unwrap(); + obj.set_element_flags(gst::ElementFlags::REQUIRE_CLOCK); + } +} impl ElementImpl for FallbackSwitch { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( - "Fallback Switch", + "Priority-based input selector", "Generic", - "Allows switching to a fallback input after a given timeout", - "Sebastian Dröge ", + "Priority-based automatic input selector element", + "Jan Schmidt ", ) }); @@ -881,539 +1101,185 @@ impl ElementImpl for FallbackSwitch { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { let caps = gst::Caps::new_any(); - let src_pad_template = gst::PadTemplate::with_gtype( + let sink_pad_template = gst::PadTemplate::with_gtype( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + super::FallbackSwitchSinkPad::static_type(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, gst::PadPresence::Always, &caps, - gst_base::AggregatorPad::static_type(), ) .unwrap(); - let sink_pad_template = gst::PadTemplate::with_gtype( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &caps, - gst_base::AggregatorPad::static_type(), - ) - .unwrap(); - - let fallbacksink_pad_template = gst::PadTemplate::with_gtype( - "fallback_sink", - gst::PadDirection::Sink, - gst::PadPresence::Request, - &caps, - gst_base::AggregatorPad::static_type(), - ) - .unwrap(); - - vec![ - src_pad_template, - sink_pad_template, - fallbacksink_pad_template, - ] + vec![sink_pad_template, src_pad_template] }); PAD_TEMPLATES.as_ref() } + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::PlayingToPaused => { + self.cancel_waits(element); + } + gst::StateChange::ReadyToNull => { + self.reset(element); + } + gst::StateChange::ReadyToPaused => { + let mut state = self.state.lock(); + *state = State::default(); + let pads = element.sink_pads(); + if let Some(pad) = pads.first() { + state.active_sinkpad = Some( + pad.clone() + .downcast::() + .unwrap(), + ); + state.switched_pad = true; + state.discont_pending = true; + drop(state); + element.notify(PROP_ACTIVE_PAD); + } + for pad in pads { + let pad = pad.downcast_ref::().unwrap(); + let pad_imp = pad.imp(); + *pad_imp.state.lock() = SinkState::default(); + } + } + _ => (), + } + + let mut success = self.parent_change_state(element, transition)?; + + match transition { + gst::StateChange::ReadyToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PlayingToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToReady => { + *self.state.lock() = State::default(); + for pad in element.sink_pads() { + let pad = pad.downcast_ref::().unwrap(); + let pad_imp = pad.imp(); + *pad_imp.state.lock() = SinkState::default(); + } + } + _ => (), + } + + Ok(success) + } + fn request_new_pad( &self, element: &Self::Type, templ: &gst::PadTemplate, - name: Option, + _name: Option, _caps: Option<&gst::Caps>, ) -> Option { - let fallback_sink_templ = element.pad_template("fallback_sink").unwrap(); - if templ != &fallback_sink_templ - || (name.is_some() && name.as_deref() != Some("fallback_sink")) - { - gst::error!(CAT, obj: element, "Wrong pad template or name"); - return None; + let mut state = self.state.lock(); + + let pad_serial = self.sink_pad_serial.fetch_add(1, Ordering::SeqCst); + + let pad = gst::PadBuilder::::from_template( + templ, + Some(format!("sink_{}", pad_serial).as_str()), + ) + .chain_function(|pad, parent, buffer| { + FallbackSwitch::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |fallbackswitch, element| fallbackswitch.sink_chain(pad, element, buffer), + ) + }) + .chain_list_function(|pad, parent, bufferlist| { + FallbackSwitch::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |fallbackswitch, element| fallbackswitch.sink_chain_list(pad, element, bufferlist), + ) + }) + .event_function(|pad, parent, event| { + FallbackSwitch::catch_panic_pad_function( + parent, + || false, + |fallbackswitch, element| fallbackswitch.sink_event(pad, element, event), + ) + }) + .query_function(|pad, parent, query| { + FallbackSwitch::catch_panic_pad_function( + parent, + || false, + |fallbackswitch, element| fallbackswitch.sink_query(pad, element, query), + ) + }) + .build(); + + pad.set_active(true).unwrap(); + element.add_pad(&pad).unwrap(); + + if state.active_sinkpad.is_none() { + state.active_sinkpad = Some(pad.clone()); + state.switched_pad = true; + state.discont_pending = true; + element.notify(PROP_ACTIVE_PAD); } - let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap(); - if fallback_sinkpad.is_some() { - gst::error!(CAT, obj: element, "Already have a fallback sinkpad"); - return None; - } - - let sinkpad = - gst::PadBuilder::::from_template(templ, Some("fallback_sink")) - .build(); - - *fallback_sinkpad = Some(sinkpad.clone()); - drop(fallback_sinkpad); - - let mut state = self.output_state.lock().unwrap(); - state.fallback = PadOutputState::default(); + let mut pad_settings = pad.imp().settings.lock(); + pad_settings.priority = pad_serial; + drop(pad_settings); drop(state); - element.add_pad(&sinkpad).unwrap(); + let _ = element.post_message(gst::message::Latency::builder().src(element).build()); - Some(sinkpad.upcast()) + element.child_added(&pad, &pad.name()); + Some(pad.upcast()) } fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) { - let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap(); + element.remove_pad(pad).unwrap(); - if fallback_sinkpad.as_ref().map(|p| p.upcast_ref()) == Some(pad) { - *fallback_sinkpad = None; - drop(fallback_sinkpad); - element.remove_pad(pad).unwrap(); - gst::debug!(CAT, obj: element, "Removed fallback sinkpad {:?}", pad); - } - *self.fallback_state.write().unwrap() = PadInputState::default(); - *self.active_sinkpad.lock().unwrap() = None; + element.child_removed(pad, &pad.name()); + let _ = element.post_message(gst::message::Latency::builder().src(element).build()); } } -impl AggregatorImpl for FallbackSwitch { - fn start(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> { - *self.output_state.lock().unwrap() = OutputState::default(); - - *self.primary_state.write().unwrap() = PadInputState::default(); - *self.fallback_state.write().unwrap() = PadInputState::default(); - - Ok(()) +// Implementation of gst::ChildProxy virtual methods. +// +// This allows accessing the pads and their properties from e.g. gst-launch. +impl ChildProxyImpl for FallbackSwitch { + fn children_count(&self, object: &Self::Type) -> u32 { + object.num_pads() as u32 } - fn stop(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> { - *self.active_sinkpad.lock().unwrap() = None; - - Ok(()) + fn child_by_name(&self, object: &Self::Type, name: &str) -> Option { + object + .pads() + .into_iter() + .find(|p| p.name() == name) + .map(|p| p.upcast()) } - #[cfg(feature = "v1_20")] - fn sink_event_pre_queue( - &self, - agg: &Self::Type, - agg_pad: &gst_base::AggregatorPad, - event: gst::Event, - ) -> Result { - use gst::EventView; - - match event.view() { - EventView::Gap(gap) => { - if gap.gap_flags().contains(gst::GapFlags::DATA) { - let settings = self.settings.lock().unwrap().clone(); - let mut state = self.output_state.lock().unwrap(); - let fallback_sinkpad = self.fallback_sinkpad.read().unwrap(); - - let on_active_sinkpad = { - let active_sinkpad = self.active_sinkpad.lock().unwrap(); - active_sinkpad.as_ref() == Some(agg_pad.upcast_ref::()) - }; - - // handle GAP as timed item - let clock = agg.clock(); - let base_time = agg.base_time(); - - let cur_running_time = if let Some(clock) = clock { - clock.time().opt_checked_sub(base_time).ok().flatten() - } else { - gst::ClockTime::NONE - }; - - let mut forwarded = false; - if let Ok(Some((item, _, _))) = self.handle_main_timed_item( - agg, - &mut *state, - &settings, - TimedItem::GapEvent(event), - agg_pad, - &fallback_sinkpad.as_ref(), - cur_running_time, - ) { - // push GAP downstream only if it's from the active sink pad - if on_active_sinkpad { - let event = item.event(); - // FIXME: API to retrieve src pad from Aggregator? - let src_pad = agg.static_pad("src").unwrap(); - - gst::debug!( - CAT, - obj: agg_pad, - "Forwarding gap event downstream: {:?}", - event - ); - - src_pad.push_event(event); - forwarded = true; - } - } - - if !forwarded { - gst::debug!(CAT, obj: agg_pad, "Dropping gap event"); - } - - Ok(gst::FlowSuccess::Ok) - } else { - self.parent_sink_event_pre_queue(agg, agg_pad, event) - } - } - _ => self.parent_sink_event_pre_queue(agg, agg_pad, event), - } - } - - fn sink_event( - &self, - agg: &Self::Type, - agg_pad: &gst_base::AggregatorPad, - event: gst::Event, - ) -> bool { - use gst::EventView; - - match event.view() { - EventView::Caps(caps) => { - let caps = caps.caps_owned(); - gst::debug!(CAT, obj: agg_pad, "Received caps {}", caps); - - let audio_info; - let video_info; - if caps.structure(0).unwrap().name() == "audio/x-raw" { - audio_info = gst_audio::AudioInfo::from_caps(&caps).ok(); - video_info = None; - } else if caps.structure(0).unwrap().name() == "video/x-raw" { - audio_info = None; - video_info = gst_video::VideoInfo::from_caps(&caps).ok(); - } else { - audio_info = None; - video_info = None; - } - - let new_pad_state = PadInputState { - caps: Some(caps), - audio_info, - video_info, - }; - - if agg_pad == &self.primary_sinkpad { - *self.primary_state.write().unwrap() = new_pad_state; - } else if Some(agg_pad) == self.fallback_sinkpad.read().unwrap().as_ref() { - *self.fallback_state.write().unwrap() = new_pad_state; - } - - self.parent_sink_event(agg, agg_pad, event) - } - _ => self.parent_sink_event(agg, agg_pad, event), - } - } - - fn next_time(&self, agg: &Self::Type) -> Option { - /* At each iteration, we have a preferred pad and a backup pad. If autoswitch is true, - * the sinkpad is always preferred, otherwise it's the active sinkpad as set by the app. - * The backup pad is the other one (may be None if there's no fallback pad yet). - * - * If we have a buffer on the preferred pad then the timeout is always going to be immediately, - * i.e. 0. We want to output that buffer immediately, no matter what. - * - * Otherwise if we have a backup sinkpad and it has a buffer, then the timeout is going - * to be that buffer's running time. We will then either output the buffer or drop it, depending on - * its distance from the last output time - */ - let settings = self.settings.lock().unwrap(); - let active_sinkpad = self.active_sinkpad.lock().unwrap(); - let fallback_sinkpad = self.fallback_sinkpad.read().unwrap(); - - let prefer_primary = settings.auto_switch - || active_sinkpad.is_none() - || active_sinkpad.as_ref() == Some(self.primary_sinkpad.upcast_ref::()); - - let (preferred_pad, backup_pad) = if prefer_primary { - (&self.primary_sinkpad, fallback_sinkpad.as_ref()) - } else { - ( - fallback_sinkpad.as_ref().unwrap(), - Some(&self.primary_sinkpad), - ) - }; - - if preferred_pad.peek_buffer().is_some() { - gst::debug!( - CAT, - obj: agg, - "Have buffer on sinkpad {}, immediate timeout", - preferred_pad.name() - ); - Some(gst::ClockTime::ZERO) - } else if self.primary_sinkpad.is_eos() { - gst::debug!(CAT, obj: agg, "Sinkpad is EOS, immediate timeout"); - Some(gst::ClockTime::ZERO) - } else if let Some((buffer, backup_sinkpad)) = backup_pad - .as_ref() - .and_then(|p| p.peek_buffer().map(|buffer| (buffer, p))) - { - let segment = match backup_sinkpad.segment().downcast::() { - Ok(segment) => segment, - Err(_) => { - gst::error!(CAT, obj: agg, "Only TIME segments supported"); - // Trigger aggregate immediately to error out immediately - return Some(gst::ClockTime::ZERO); - } - }; - - let running_time = if buffer.pts().is_none() { - // re-use ts from previous buffer - let state = self.output_state.lock().unwrap(); - let running_time = state - .primary - .last_sinkpad_time - .max(state.fallback.last_sinkpad_time); - - gst::debug!( - CAT, - obj: agg, - "Buffer does not have PTS, re-use ts from previous buffer: {}", - running_time.display(), - ); - - running_time - } else { - segment.to_running_time(buffer.pts()) - }; - - gst::debug!( - CAT, - obj: agg, - "Have buffer on {} pad, timeout at {}", - backup_sinkpad.name(), - running_time.display(), - ); - running_time - } else { - gst::debug!(CAT, obj: agg, "No buffer available on either input"); - gst::ClockTime::NONE - } - } - - // Clip the raw audio/video buffers we have to the segment boundaries to ensure that - // calculating the running times later works correctly - fn clip( - &self, - agg: &Self::Type, - agg_pad: &gst_base::AggregatorPad, - mut buffer: gst::Buffer, - ) -> Option { - let segment = match agg_pad.segment().downcast::() { - Ok(segment) => segment, - Err(_) => { - gst::error!(CAT, obj: agg, "Only TIME segments supported"); - return Some(buffer); - } - }; - - let pts = buffer.pts(); - if pts.is_none() { - gst::debug!(CAT, obj: agg, "Only clipping buffers with PTS supported"); - return Some(buffer); - } - - let primary_state = self.primary_state.read().unwrap(); - let fallback_state = self.fallback_state.read().unwrap(); - - let pad_state = if agg_pad == &self.primary_sinkpad { - &primary_state - } else if Some(agg_pad) == self.fallback_sinkpad.read().unwrap().as_ref() { - &fallback_state - } else { - unreachable!() - }; - - if pad_state.audio_info.is_none() && pad_state.video_info.is_none() { - // No clipping possible for non-raw formats - return Some(buffer); - } - - let duration = if let Some(duration) = buffer.duration() { - Some(duration) - } else if let Some(ref audio_info) = pad_state.audio_info { - gst::ClockTime::SECOND.mul_div_floor( - buffer.size() as u64, - audio_info.rate() as u64 * audio_info.bpf() as u64, - ) - } else if let Some(ref video_info) = pad_state.video_info { - if video_info.fps().numer() > 0 { - gst::ClockTime::SECOND.mul_div_floor( - video_info.fps().denom() as u64, - video_info.fps().numer() as u64, - ) - } else { - gst::ClockTime::NONE - } - } else { - unreachable!() - }; - - gst::debug!( - CAT, - obj: agg_pad, - "Clipping buffer {:?} with PTS {} and duration {}", - buffer, - pts.display(), - duration.display(), - ); - if let Some(ref audio_info) = pad_state.audio_info { - gst_audio::audio_buffer_clip( - buffer, - segment.upcast_ref(), - audio_info.rate(), - audio_info.bpf(), - ) - } else if pad_state.video_info.is_some() { - let stop = pts.opt_add(duration); - segment.clip(pts, stop).map(|(start, stop)| { - { - let buffer = buffer.make_mut(); - buffer.set_pts(start); - if duration.is_some() { - buffer.set_duration(stop.opt_checked_sub(start).ok().flatten()); - } - } - - buffer - }) - } else { - unreachable!(); - } - } - - fn aggregate( - &self, - agg: &Self::Type, - timeout: bool, - ) -> Result { - gst::debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout); - - let (res, (primary_health_change, fallback_health_change)) = self.next_buffer(agg, timeout); - - if primary_health_change { - gst::debug!( - CAT, - obj: agg, - "Primary pad health now {}", - &primary_health_change - ); - agg.notify("primary-health"); - } - if fallback_health_change { - gst::debug!( - CAT, - obj: agg, - "Fallback pad health now {}", - &fallback_health_change - ); - agg.notify("fallback-health"); - } - - let (mut buffer, active_caps, pad_change) = res?; - - let current_src_caps = agg.static_pad("src").unwrap().current_caps(); - if Some(&active_caps) != current_src_caps.as_ref() { - gst::info!( - CAT, - obj: agg, - "Caps change from {:?} to {:?}", - current_src_caps, - active_caps - ); - agg.set_src_caps(&active_caps); - } - - if pad_change { - agg.notify("active-pad"); - buffer.make_mut().set_flags(gst::BufferFlags::DISCONT); - } - gst::debug!(CAT, obj: agg, "Finishing buffer {:?}", buffer); - agg.finish_buffer(buffer) - } - - fn negotiate(&self, _agg: &Self::Type) -> bool { - true - } -} - -#[derive(Debug)] -enum TimedItem { - Buffer(gst::Buffer), - // only used with feature v1_20 - #[allow(dead_code)] - GapEvent(gst::Event), -} - -impl TimedItem { - fn name(&self) -> &str { - match self { - TimedItem::Buffer(_) => "buffer", - TimedItem::GapEvent(_) => "GAP event", - } - } - - fn pts(&self) -> Option { - match self { - TimedItem::Buffer(buffer) => buffer.pts(), - TimedItem::GapEvent(event) => match event.view() { - gst::EventView::Gap(gap) => { - let (pts, _duration) = gap.get(); - Some(pts) - } - _ => unreachable!(), - }, - } - } - - fn update_ts( - &mut self, - running_time: Option, - segment: &gst::FormattedSegment, - ) { - match self { - TimedItem::Buffer(buffer) => { - // FIXME: This will not work correctly for negative DTS - let buffer = buffer.make_mut(); - buffer.set_pts(running_time); - buffer.set_dts(segment.to_running_time(buffer.dts())); - } - TimedItem::GapEvent(event) => { - let new_event = match event.view() { - gst::EventView::Gap(gap) => { - let (pts, duration) = gap.get(); - let builder = gst::event::Gap::builder(pts) - .duration(duration) - .seqnum(event.seqnum()); - - #[cfg(feature = "v1_20")] - let builder = builder.gap_flags(gap.gap_flags()); - - builder.build() - } - _ => unreachable!(), - }; - *event = new_event; - } - } - } - - fn is_keyframe(&self) -> bool { - match self { - TimedItem::Buffer(buffer) => !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT), - TimedItem::GapEvent(_) => false, - } - } - - fn buffer(self) -> gst::Buffer { - match self { - TimedItem::Buffer(buffer) => buffer, - _ => unreachable!(), - } - } - - #[cfg(feature = "v1_20")] - fn event(self) -> gst::Event { - match self { - TimedItem::GapEvent(event) => event, - _ => unreachable!(), - } + fn child_by_index(&self, object: &Self::Type, index: u32) -> Option { + object + .pads() + .into_iter() + .nth(index as usize) + .map(|p| p.upcast()) } } diff --git a/utils/fallbackswitch/src/fallbackswitch/mod.rs b/utils/fallbackswitch/src/fallbackswitch/mod.rs index 26d3de1b..7a0c8e8d 100644 --- a/utils/fallbackswitch/src/fallbackswitch/mod.rs +++ b/utils/fallbackswitch/src/fallbackswitch/mod.rs @@ -1,4 +1,7 @@ -// Copyright (C) 2019 Sebastian Dröge +// Copyright (C) 2020 Sebastian Dröge +// Copyright (C) 2021 Jan Schmidt +// Copyright (C) 2020 Mathieu Duponchelle +// Copyright (C) 2022 Vivia Nikolaidou // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at @@ -11,12 +14,19 @@ use gst::prelude::*; mod imp; -pub use imp::StreamHealth; - +// The public Rust wrapper type for our element glib::wrapper! { - pub struct FallbackSwitch(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object; + pub struct FallbackSwitch(ObjectSubclass) @extends gst::Element, gst::Object, @implements gst::ChildProxy; } +// The public Rust wrapper type for our sink pad +glib::wrapper! { + pub struct FallbackSwitchSinkPad(ObjectSubclass) @extends gst::Pad, gst::Object; +} + +// Registers the type for our element, and then registers in GStreamer under +// the name "fallbackswitch" for being able to instantiate it via e.g. +// gst::ElementFactory::make(). pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register( Some(plugin), diff --git a/utils/fallbackswitch/src/lib.rs b/utils/fallbackswitch/src/lib.rs index 3aee2407..ca8f5e46 100644 --- a/utils/fallbackswitch/src/lib.rs +++ b/utils/fallbackswitch/src/lib.rs @@ -13,7 +13,6 @@ mod fallbacksrc; mod fallbackswitch; pub use fallbacksrc::{RetryReason, Status}; -pub use fallbackswitch::StreamHealth; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fallbacksrc::register(plugin)?; diff --git a/utils/fallbackswitch/tests/fallbackswitch.rs b/utils/fallbackswitch/tests/fallbackswitch.rs index f91d1919..107853d7 100644 --- a/utils/fallbackswitch/tests/fallbackswitch.rs +++ b/utils/fallbackswitch/tests/fallbackswitch.rs @@ -1,4 +1,5 @@ // Copyright (C) 2019 Sebastian Dröge +// Copyright (C) 2021 Jan Schmidt // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at @@ -6,10 +7,13 @@ // // SPDX-License-Identifier: MPL-2.0 +use gst::debug; use gst::prelude::*; use once_cell::sync::Lazy; +const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(10); + static TEST_CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "fallbackswitch-test", @@ -44,20 +48,20 @@ macro_rules! assert_buffer { #[test] fn test_no_fallback_no_drops() { - let pipeline = setup_pipeline(None); + let pipeline = setup_pipeline(None, None, None); push_buffer(&pipeline, gst::ClockTime::ZERO); - set_time(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); push_buffer(&pipeline, gst::ClockTime::SECOND); - set_time(&pipeline, gst::ClockTime::SECOND); + set_time(&pipeline, gst::ClockTime::SECOND + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::SECOND)); push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); - set_time(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); @@ -78,23 +82,23 @@ fn test_no_drops_not_live() { } fn test_no_drops(live: bool) { - let pipeline = setup_pipeline(Some(live)); + let pipeline = setup_pipeline(Some(live), None, None); push_buffer(&pipeline, gst::ClockTime::ZERO); push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); - set_time(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); push_buffer(&pipeline, gst::ClockTime::SECOND); - set_time(&pipeline, gst::ClockTime::SECOND); + set_time(&pipeline, gst::ClockTime::SECOND + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::SECOND)); push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); - set_time(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); @@ -116,22 +120,22 @@ fn test_no_drops_but_no_fallback_frames_not_live() { } fn test_no_drops_but_no_fallback_frames(live: bool) { - let pipeline = setup_pipeline(Some(live)); + let pipeline = setup_pipeline(Some(live), None, None); push_buffer(&pipeline, gst::ClockTime::ZERO); // +10ms needed here because the immediate timeout will be always at running time 0, but // aggregator also adds the latency to it so we end up at 10ms instead. - set_time(&pipeline, 10 * gst::ClockTime::MSECOND); + set_time(&pipeline, LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); push_buffer(&pipeline, gst::ClockTime::SECOND); - set_time(&pipeline, gst::ClockTime::SECOND); + set_time(&pipeline, gst::ClockTime::SECOND + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::SECOND)); push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); - set_time(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); @@ -153,11 +157,11 @@ fn test_short_drop_not_live() { } fn test_short_drop(live: bool) { - let pipeline = setup_pipeline(Some(live)); + let pipeline = setup_pipeline(Some(live), None, None); push_buffer(&pipeline, gst::ClockTime::ZERO); push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); - set_time(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); @@ -172,7 +176,7 @@ fn test_short_drop(live: bool) { push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); - set_time(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); @@ -194,7 +198,7 @@ fn test_long_drop_and_eos_not_live() { } fn test_long_drop_and_eos(live: bool) { - let pipeline = setup_pipeline(Some(live)); + let pipeline = setup_pipeline(Some(live), None, None); // Produce the first frame push_buffer(&pipeline, gst::ClockTime::ZERO); @@ -256,7 +260,9 @@ fn test_long_drop_and_recover_not_live() { } fn test_long_drop_and_recover(live: bool) { - let pipeline = setup_pipeline(Some(live)); + let pipeline = setup_pipeline(Some(live), None, None); + let switch = pipeline.by_name("switch").unwrap(); + let mainsink = switch.static_pad("sink_0").unwrap(); // Produce the first frame push_buffer(&pipeline, gst::ClockTime::ZERO); @@ -264,6 +270,7 @@ fn test_long_drop_and_recover(live: bool) { set_time(&pipeline, gst::ClockTime::ZERO); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); + assert!(mainsink.property::("is-healthy")); // Produce a second frame but only from the fallback source push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); @@ -301,22 +308,33 @@ fn test_long_drop_and_recover(live: bool) { // Produce a sixth frame from the normal source push_buffer(&pipeline, 5 * gst::ClockTime::SECOND); - push_fallback_buffer(&pipeline, 5 * gst::ClockTime::SECOND); - set_time(&pipeline, 5 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 5 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(5 * gst::ClockTime::SECOND)); + assert!(!mainsink.property::("is-healthy")); + drop(mainsink); + drop(switch); // Produce a seventh frame from the normal source but no fallback. // This should still be output immediately push_buffer(&pipeline, 6 * gst::ClockTime::SECOND); - set_time(&pipeline, 6 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 6 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(6 * gst::ClockTime::SECOND)); // Produce a eight frame from the normal source push_buffer(&pipeline, 7 * gst::ClockTime::SECOND); push_fallback_buffer(&pipeline, 7 * gst::ClockTime::SECOND); - set_time(&pipeline, 7 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 7 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); let buffer = pull_buffer(&pipeline); assert_buffer!(buffer, Some(7 * gst::ClockTime::SECOND)); @@ -328,6 +346,144 @@ fn test_long_drop_and_recover(live: bool) { stop_pipeline(pipeline); } +#[test] +fn test_initial_timeout_live() { + test_initial_timeout(true); +} + +#[test] +fn test_initial_timeout_not_live() { + test_initial_timeout(false); +} + +fn test_initial_timeout(live: bool) { + let pipeline = setup_pipeline(Some(live), None, None); + + // Produce the first frame but only from the fallback source + push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO); + + // Produce a second frame but only from the fallback source + push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); + set_time( + &pipeline, + gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); + + // Produce a third frame but only from the fallback source + push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); + + // Produce a fourth frame but only from the fallback source + // This should be output now + push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); + let buffer = pull_buffer(&pipeline); + assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND)); + + // Produce a fifth frame but only from the fallback source + // This should be output now + push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND); + set_time( + &pipeline, + 4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND, + ); + let buffer = pull_buffer(&pipeline); + assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND)); + + // Wait for EOS to arrive at appsink + push_eos(&pipeline); + push_fallback_eos(&pipeline); + wait_eos(&pipeline); + + stop_pipeline(pipeline); +} + +#[test] +fn test_immediate_fallback_live() { + test_immediate_fallback(true); +} + +#[test] +fn test_immediate_fallback_not_live() { + test_immediate_fallback(false); +} + +fn test_immediate_fallback(live: bool) { + let pipeline = setup_pipeline(Some(live), Some(true), None); + + // Produce the first frame but only from the fallback source + push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO); + + let buffer = pull_buffer(&pipeline); + assert_fallback_buffer!(buffer, Some(gst::ClockTime::ZERO)); + + // Wait for EOS to arrive at appsink + push_eos(&pipeline); + push_fallback_eos(&pipeline); + wait_eos(&pipeline); + + stop_pipeline(pipeline); +} + +#[test] +fn test_manual_switch_live() { + test_manual_switch(true); +} + +#[test] +fn test_manual_switch_not_live() { + test_manual_switch(false); +} + +fn test_manual_switch(live: bool) { + let pipeline = setup_pipeline(Some(live), None, Some(false)); + let switch = pipeline.by_name("switch").unwrap(); + let mainsink = switch.static_pad("sink_0").unwrap(); + let fallbacksink = switch.static_pad("sink_1").unwrap(); + + switch.set_property("active-pad", &mainsink); + push_buffer(&pipeline, gst::ClockTime::ZERO); + push_fallback_buffer(&pipeline, gst::ClockTime::ZERO); + set_time(&pipeline, gst::ClockTime::ZERO + LATENCY); + let buffer = pull_buffer(&pipeline); + assert_buffer!(buffer, Some(gst::ClockTime::ZERO)); + + switch.set_property("active-pad", &fallbacksink); + push_fallback_buffer(&pipeline, gst::ClockTime::SECOND); + push_buffer(&pipeline, gst::ClockTime::SECOND); + set_time(&pipeline, gst::ClockTime::SECOND + LATENCY); + let mut buffer = pull_buffer(&pipeline); + // FIXME: Sometimes we get the ZERO buffer from the fallback sink instead + if buffer.pts() == Some(gst::ClockTime::ZERO) { + buffer = pull_buffer(&pipeline); + } + assert_fallback_buffer!(buffer, Some(gst::ClockTime::SECOND)); + + switch.set_property("active-pad", &mainsink); + push_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND); + set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY); + let buffer = pull_buffer(&pipeline); + assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND)); + + drop(mainsink); + drop(fallbacksink); + drop(switch); + // EOS on the fallback should not be required + push_eos(&pipeline); + wait_eos(&pipeline); + + stop_pipeline(pipeline); +} + struct Pipeline { pipeline: gst::Pipeline, clock_join_handle: Option>, @@ -341,10 +497,14 @@ impl std::ops::Deref for Pipeline { } } -fn setup_pipeline(with_live_fallback: Option) -> Pipeline { +fn setup_pipeline( + with_live_fallback: Option, + immediate_fallback: Option, + auto_switch: Option, +) -> Pipeline { init(); - gst::debug!(TEST_CAT, "Setting up pipeline"); + debug!(TEST_CAT, "Setting up pipeline"); let clock = gst_check::TestClock::new(); clock.set_time(gst::ClockTime::ZERO); @@ -363,19 +523,25 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { .unwrap(); src.set_property("is-live", true); src.set_property("format", gst::Format::Time); - src.set_property("min-latency", 10i64); + src.set_property("min-latency", LATENCY.nseconds() as i64); src.set_property( "caps", - &gst::Caps::builder("video/x-raw") + gst::Caps::builder("video/x-raw") .field("format", "ARGB") - .field("width", 320) - .field("height", 240) + .field("width", 320i32) + .field("height", 240i32) .field("framerate", gst::Fraction::new(1, 1)) .build(), ); let switch = gst::ElementFactory::make("fallbackswitch", Some("switch")).unwrap(); switch.set_property("timeout", 3 * gst::ClockTime::SECOND); + if let Some(imm) = immediate_fallback { + switch.set_property("immediate-fallback", imm); + } + if let Some(auto_switch) = auto_switch { + switch.set_property("auto-switch", auto_switch); + } let sink = gst::ElementFactory::make("appsink", Some("sink")) .unwrap() @@ -388,7 +554,7 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { pipeline .add_many(&[src.upcast_ref(), &switch, &queue, sink.upcast_ref()]) .unwrap(); - src.link_pads(Some("src"), &switch, Some("sink")).unwrap(); + src.link_pads(Some("src"), &switch, Some("sink_0")).unwrap(); switch.link_pads(Some("src"), &queue, Some("sink")).unwrap(); queue.link_pads(Some("src"), &sink, Some("sink")).unwrap(); @@ -399,13 +565,13 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { .unwrap(); fallback_src.set_property("is-live", live); fallback_src.set_property("format", gst::Format::Time); - fallback_src.set_property("min-latency", 10i64); + fallback_src.set_property("min-latency", LATENCY.nseconds() as i64); fallback_src.set_property( "caps", - &gst::Caps::builder("video/x-raw") + gst::Caps::builder("video/x-raw") .field("format", "ARGB") - .field("width", 160) - .field("height", 120) + .field("width", 160i32) + .field("height", 120i32) .field("framerate", gst::Fraction::new(1, 1)) .build(), ); @@ -413,7 +579,7 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { pipeline.add(&fallback_src).unwrap(); fallback_src - .link_pads(Some("src"), &switch, Some("fallback_sink")) + .link_pads(Some("src"), &switch, Some("sink_1")) .unwrap(); } @@ -429,11 +595,16 @@ fn setup_pipeline(with_live_fallback: Option) -> Pipeline { None } }) { - gst::debug!(TEST_CAT, "Processing clock ID at {}", clock_id.time()); + debug!( + TEST_CAT, + "Processing clock ID {} at {:?}", + clock_id.time(), + clock.time() + ); if let Some(clock_id) = clock.process_next_clock_id() { - gst::debug!(TEST_CAT, "Processed clock ID at {}", clock_id.time()); + debug!(TEST_CAT, "Processed clock ID {}", clock_id.time()); if clock_id.time().is_zero() { - gst::debug!(TEST_CAT, "Stopping clock thread"); + debug!(TEST_CAT, "Stopping clock thread"); return; } } @@ -528,7 +699,7 @@ fn set_time(pipeline: &Pipeline, time: gst::ClockTime) { .downcast::() .unwrap(); - gst::debug!(TEST_CAT, "Setting time to {}", time); + debug!(TEST_CAT, "Setting time to {}", time); clock.set_time(gst::ClockTime::SECOND + time); } @@ -543,7 +714,7 @@ fn wait_eos(pipeline: &Pipeline) { use std::{thread, time}; if sink.is_eos() { - gst::debug!(TEST_CAT, "Waited for EOS"); + debug!(TEST_CAT, "Waited for EOS"); break; } thread::sleep(time::Duration::from_millis(10));