mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-04-30 21:44:58 +00:00
fallbackswitch: Add manual stream control mode
Add properties to report and notify on stream health changes, and a mode where the app can control the stream switching by setting the active-pad property manually. This is useful for modifying the policy of fallbackswitch stream choices, and to synchronise switching of multiple fallbackswitches
This commit is contained in:
parent
c7fe08bf6d
commit
d5b648921c
1 changed files with 498 additions and 150 deletions
|
@ -36,12 +36,25 @@ use once_cell::sync::Lazy;
|
|||
|
||||
use std::sync::{Mutex, RwLock};
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, GEnum)]
|
||||
#[repr(u32)]
|
||||
#[genum(type_name = "GstFallbackSwitchStreamHealth")]
|
||||
pub(crate) enum StreamHealth {
|
||||
#[genum(name = "Data flow is inactive or late", nick = "inactive")]
|
||||
Inactive = 0,
|
||||
#[genum(name = "Data is currently flowing in the stream", nick = "present")]
|
||||
Present = 1,
|
||||
}
|
||||
|
||||
pub struct FallbackSwitch {
|
||||
sinkpad: gst_base::AggregatorPad,
|
||||
primary_sinkpad: gst_base::AggregatorPad,
|
||||
primary_state: RwLock<PadInputState>,
|
||||
|
||||
fallback_sinkpad: RwLock<Option<gst_base::AggregatorPad>>,
|
||||
fallback_state: RwLock<PadInputState>,
|
||||
|
||||
active_sinkpad: Mutex<Option<gst::Pad>>,
|
||||
output_state: Mutex<OutputState>,
|
||||
pad_states: RwLock<PadStates>,
|
||||
settings: Mutex<Settings>,
|
||||
}
|
||||
|
||||
|
@ -53,35 +66,48 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|||
)
|
||||
});
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct PadOutputState {
|
||||
last_sinkpad_time: gst::ClockTime,
|
||||
stream_health: StreamHealth,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct OutputState {
|
||||
last_sinkpad_time: gst::ClockTime,
|
||||
last_output_time: gst::ClockTime,
|
||||
primary: PadOutputState,
|
||||
fallback: PadOutputState,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct PadStates {
|
||||
sinkpad: PadState,
|
||||
fallback_sinkpad: Option<PadState>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct PadState {
|
||||
struct PadInputState {
|
||||
caps: Option<gst::Caps>,
|
||||
audio_info: Option<gst_audio::AudioInfo>,
|
||||
video_info: Option<gst_video::VideoInfo>,
|
||||
}
|
||||
|
||||
const DEFAULT_TIMEOUT: u64 = 5 * gst::SECOND_VAL;
|
||||
const DEFAULT_AUTO_SWITCH: bool = true;
|
||||
const DEFAULT_STREAM_HEALTH: StreamHealth = StreamHealth::Inactive;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Settings {
|
||||
timeout: gst::ClockTime,
|
||||
auto_switch: bool,
|
||||
}
|
||||
|
||||
impl Default for StreamHealth {
|
||||
fn default() -> Self {
|
||||
DEFAULT_STREAM_HEALTH
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for OutputState {
|
||||
fn default() -> Self {
|
||||
OutputState {
|
||||
last_sinkpad_time: gst::CLOCK_TIME_NONE,
|
||||
last_output_time: gst::CLOCK_TIME_NONE,
|
||||
primary: PadOutputState::default(),
|
||||
fallback: PadOutputState::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -90,11 +116,12 @@ impl Default for Settings {
|
|||
fn default() -> Self {
|
||||
Settings {
|
||||
timeout: DEFAULT_TIMEOUT.into(),
|
||||
auto_switch: DEFAULT_AUTO_SWITCH,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static PROPERTIES: [subclass::Property; 2] = [
|
||||
static PROPERTIES: [subclass::Property; 5] = [
|
||||
subclass::Property("timeout", |name| {
|
||||
glib::ParamSpec::uint64(
|
||||
name,
|
||||
|
@ -110,32 +137,171 @@ static PROPERTIES: [subclass::Property; 2] = [
|
|||
glib::ParamSpec::object(
|
||||
name,
|
||||
"Active Pad",
|
||||
"Currently active pad",
|
||||
"Currently active pad. Writes are ignored if auto-switch=true",
|
||||
gst::Pad::static_type(),
|
||||
glib::ParamFlags::READWRITE,
|
||||
)
|
||||
}),
|
||||
subclass::Property("auto-switch", |name| {
|
||||
glib::ParamSpec::boolean(
|
||||
name,
|
||||
"Automatically switch pads",
|
||||
"Automatically switch pads (If true, prefer primary sink, otherwise manual selection via the active-pad property)",
|
||||
DEFAULT_AUTO_SWITCH,
|
||||
glib::ParamFlags::READWRITE,
|
||||
)
|
||||
}),
|
||||
subclass::Property("primary-health", |name| {
|
||||
glib::ParamSpec::enum_(
|
||||
name,
|
||||
"Primary stream state",
|
||||
"Reports the health of the primary stream on the sink pad",
|
||||
StreamHealth::static_type(),
|
||||
DEFAULT_STREAM_HEALTH as i32,
|
||||
glib::ParamFlags::READABLE,
|
||||
)
|
||||
}),
|
||||
subclass::Property("fallback-health", |name| {
|
||||
glib::ParamSpec::enum_(
|
||||
name,
|
||||
"Fallback stream state",
|
||||
"Reports the health of the fallback stream on the fallback_sink pad",
|
||||
StreamHealth::static_type(),
|
||||
DEFAULT_STREAM_HEALTH as i32,
|
||||
glib::ParamFlags::READABLE,
|
||||
)
|
||||
}),
|
||||
];
|
||||
|
||||
impl FallbackSwitch {
|
||||
fn drain_pad_to_time(
|
||||
&self,
|
||||
agg: &super::FallbackSwitch,
|
||||
state: &mut OutputState,
|
||||
pad: &gst_base::AggregatorPad,
|
||||
target_running_time: gst::ClockTime,
|
||||
) -> Result<(), gst::FlowError> {
|
||||
let segment = pad.get_segment();
|
||||
|
||||
/* No segment yet - no data */
|
||||
if segment.get_format() == gst::Format::Undefined {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let segment = segment.downcast::<gst::ClockTime>().map_err(|_| {
|
||||
gst_error!(CAT, obj: agg, "Only TIME segments supported");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
let mut running_time = gst::ClockTime::none();
|
||||
|
||||
while let Some(buffer) = pad.peek_buffer() {
|
||||
let pts = buffer.get_dts_or_pts();
|
||||
let new_running_time = segment.to_running_time(pts);
|
||||
if pts.is_none() || running_time <= target_running_time {
|
||||
gst_debug!(CAT, obj: agg, "Dropping trailing buffer {:?}", buffer);
|
||||
pad.drop_buffer();
|
||||
running_time = new_running_time;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if running_time != gst::ClockTime::none() {
|
||||
if pad == &self.primary_sinkpad {
|
||||
state.primary.last_sinkpad_time = running_time;
|
||||
} else {
|
||||
state.fallback.last_sinkpad_time = running_time;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_health(
|
||||
&self,
|
||||
state: &OutputState,
|
||||
settings: &Settings,
|
||||
pad: &gst_base::AggregatorPad,
|
||||
cur_running_time: gst::ClockTime,
|
||||
) -> StreamHealth {
|
||||
let last_sinkpad_time = if pad == &self.primary_sinkpad {
|
||||
state.primary.last_sinkpad_time
|
||||
} else {
|
||||
state.fallback.last_sinkpad_time
|
||||
};
|
||||
|
||||
if last_sinkpad_time == gst::ClockTime::none() {
|
||||
StreamHealth::Inactive
|
||||
} else if cur_running_time != gst::ClockTime::none()
|
||||
&& cur_running_time < last_sinkpad_time + settings.timeout
|
||||
{
|
||||
StreamHealth::Present
|
||||
} else {
|
||||
StreamHealth::Inactive
|
||||
}
|
||||
}
|
||||
|
||||
fn check_health_changes(
|
||||
&self,
|
||||
state: &mut OutputState,
|
||||
settings: &Settings,
|
||||
preferred_pad: &gst_base::AggregatorPad,
|
||||
backup_pad: &Option<&gst_base::AggregatorPad>,
|
||||
cur_running_time: gst::ClockTime,
|
||||
) -> (bool, bool) {
|
||||
let preferred_is_primary = preferred_pad == &self.primary_sinkpad;
|
||||
|
||||
let preferred_health = self.get_health(state, settings, preferred_pad, cur_running_time);
|
||||
let backup_health = if let Some(pad) = backup_pad {
|
||||
self.get_health(state, settings, pad, cur_running_time)
|
||||
} else {
|
||||
StreamHealth::Inactive
|
||||
};
|
||||
|
||||
if preferred_is_primary {
|
||||
let primary_changed = preferred_health != state.primary.stream_health;
|
||||
let fallback_changed = backup_health != state.fallback.stream_health;
|
||||
|
||||
state.primary.stream_health = preferred_health;
|
||||
state.fallback.stream_health = backup_health;
|
||||
|
||||
(primary_changed, fallback_changed)
|
||||
} else {
|
||||
let primary_changed = backup_health != state.primary.stream_health;
|
||||
let fallback_changed = preferred_health != state.fallback.stream_health;
|
||||
|
||||
state.primary.stream_health = backup_health;
|
||||
state.fallback.stream_health = preferred_health;
|
||||
|
||||
(primary_changed, fallback_changed)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_main_buffer(
|
||||
&self,
|
||||
agg: &super::FallbackSwitch,
|
||||
state: &mut OutputState,
|
||||
settings: &Settings,
|
||||
mut buffer: gst::Buffer,
|
||||
fallback_sinkpad: Option<&gst_base::AggregatorPad>,
|
||||
preferred_pad: &gst_base::AggregatorPad,
|
||||
backup_pad: &Option<&gst_base::AggregatorPad>,
|
||||
cur_running_time: gst::ClockTime,
|
||||
) -> Result<Option<(gst::Buffer, gst::Caps, bool)>, gst::FlowError> {
|
||||
// If we got a buffer on the sinkpad just handle it
|
||||
gst_debug!(CAT, obj: agg, "Got buffer on sinkpad {:?}", buffer);
|
||||
gst_debug!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Got buffer on pad {} - {:?}",
|
||||
preferred_pad.get_name(),
|
||||
buffer
|
||||
);
|
||||
|
||||
if buffer.get_pts().is_none() {
|
||||
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
|
||||
let segment = self
|
||||
.sinkpad
|
||||
let segment = preferred_pad
|
||||
.get_segment()
|
||||
.downcast::<gst::ClockTime>()
|
||||
.map_err(|_| {
|
||||
|
@ -144,6 +310,7 @@ impl FallbackSwitch {
|
|||
})?;
|
||||
|
||||
let running_time = segment.to_running_time(buffer.get_dts_or_pts());
|
||||
|
||||
{
|
||||
// FIXME: This will not work correctly for negative DTS
|
||||
let buffer = buffer.make_mut();
|
||||
|
@ -151,19 +318,26 @@ impl FallbackSwitch {
|
|||
buffer.set_dts(segment.to_running_time(buffer.get_dts()));
|
||||
}
|
||||
|
||||
if preferred_pad == &self.primary_sinkpad {
|
||||
state.primary.last_sinkpad_time = running_time;
|
||||
} else {
|
||||
state.fallback.last_sinkpad_time = running_time;
|
||||
}
|
||||
|
||||
let is_late = {
|
||||
let clock = agg.get_clock();
|
||||
let base_time = agg.get_base_time();
|
||||
|
||||
if let Some(clock) = clock {
|
||||
let now = clock.get_time();
|
||||
if cur_running_time != gst::ClockTime::none() {
|
||||
let latency = agg.get_latency();
|
||||
|
||||
if latency.is_some() {
|
||||
let deadline = base_time + running_time + latency + 40 * gst::MSECOND;
|
||||
let deadline = running_time + latency + 40 * gst::MSECOND;
|
||||
|
||||
if now > deadline {
|
||||
gst_debug!(CAT, obj: agg, "Buffer is too late: {} > {}", now, deadline);
|
||||
if cur_running_time > deadline {
|
||||
gst_debug!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Buffer is too late: {} > {}",
|
||||
cur_running_time,
|
||||
deadline
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
|
@ -176,15 +350,17 @@ impl FallbackSwitch {
|
|||
}
|
||||
};
|
||||
|
||||
if state.last_sinkpad_time.is_some()
|
||||
if state.last_output_time.is_some()
|
||||
&& is_late
|
||||
&& state.last_sinkpad_time + settings.timeout <= running_time
|
||||
&& state.last_output_time + settings.timeout <= running_time
|
||||
{
|
||||
/* This buffer arrived too late - we either already switched
|
||||
* to the other pad or there's no point outputting this anyway */
|
||||
gst_debug!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Buffer is too late and timeout reached: {} + {} <= {}",
|
||||
state.last_sinkpad_time,
|
||||
state.last_output_time,
|
||||
settings.timeout,
|
||||
running_time,
|
||||
);
|
||||
|
@ -193,15 +369,18 @@ impl FallbackSwitch {
|
|||
}
|
||||
|
||||
let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
|
||||
let pad_change = active_sinkpad.as_ref() != Some(self.sinkpad.upcast_ref::<gst::Pad>());
|
||||
let pad_change = settings.auto_switch
|
||||
&& active_sinkpad.as_ref() != Some(preferred_pad.upcast_ref::<gst::Pad>());
|
||||
|
||||
if pad_change {
|
||||
if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) {
|
||||
gst_info!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Can't change back to sinkpad, waiting for keyframe"
|
||||
"Can't change back to sinkpad {}, waiting for keyframe",
|
||||
preferred_pad.get_name()
|
||||
);
|
||||
self.sinkpad.push_event(
|
||||
preferred_pad.push_event(
|
||||
gst_video::UpstreamForceKeyUnitEvent::builder()
|
||||
.all_headers(true)
|
||||
.build(),
|
||||
|
@ -210,65 +389,41 @@ impl FallbackSwitch {
|
|||
}
|
||||
|
||||
gst_info!(CAT, obj: agg, "Active pad changed to sinkpad");
|
||||
*active_sinkpad = Some(self.sinkpad.clone().upcast());
|
||||
*active_sinkpad = Some(preferred_pad.clone().upcast());
|
||||
}
|
||||
drop(active_sinkpad);
|
||||
|
||||
if !is_late || state.last_sinkpad_time.is_none() {
|
||||
state.last_sinkpad_time = running_time;
|
||||
if !is_late || state.last_output_time.is_none() {
|
||||
state.last_output_time = running_time;
|
||||
}
|
||||
|
||||
let active_caps = if preferred_pad == &self.primary_sinkpad {
|
||||
let pad_state = self.primary_state.read().unwrap();
|
||||
pad_state.caps.as_ref().unwrap().clone()
|
||||
} else {
|
||||
let pad_state = self.fallback_state.read().unwrap();
|
||||
pad_state.caps.as_ref().unwrap().clone()
|
||||
};
|
||||
|
||||
// Drop all older buffers from the fallback sinkpad
|
||||
if let Some(fallback_sinkpad) = fallback_sinkpad {
|
||||
let segment = fallback_sinkpad.get_segment();
|
||||
|
||||
// Might have no segment at all yet
|
||||
if segment.get_format() != gst::Format::Undefined {
|
||||
let fallback_segment = fallback_sinkpad
|
||||
.get_segment()
|
||||
.downcast::<gst::ClockTime>()
|
||||
.map_err(|_| {
|
||||
gst_error!(CAT, obj: agg, "Only TIME segments supported");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
while let Some(fallback_buffer) = fallback_sinkpad.peek_buffer() {
|
||||
let fallback_pts = fallback_buffer.get_dts_or_pts();
|
||||
if fallback_pts.is_none()
|
||||
|| fallback_segment.to_running_time(fallback_pts) <= state.last_sinkpad_time
|
||||
{
|
||||
gst_debug!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Dropping fallback buffer {:?}",
|
||||
fallback_buffer
|
||||
);
|
||||
fallback_sinkpad.drop_buffer();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(backup_pad) = backup_pad {
|
||||
self.drain_pad_to_time(&agg, state, &backup_pad, state.last_output_time)?;
|
||||
}
|
||||
|
||||
let pad_states = self.pad_states.read().unwrap();
|
||||
let active_caps = pad_states.sinkpad.caps.as_ref().unwrap().clone();
|
||||
drop(pad_states);
|
||||
|
||||
Ok(Some((buffer, active_caps, pad_change)))
|
||||
}
|
||||
|
||||
fn get_fallback_buffer(
|
||||
fn get_backup_buffer(
|
||||
&self,
|
||||
agg: &super::FallbackSwitch,
|
||||
state: &mut OutputState,
|
||||
settings: &Settings,
|
||||
fallback_sinkpad: &gst_base::AggregatorPad,
|
||||
backup_pad: &gst_base::AggregatorPad,
|
||||
) -> Result<(gst::Buffer, gst::Caps, bool), gst::FlowError> {
|
||||
// If we have a fallback sinkpad and timeout, try to get a fallback buffer from here
|
||||
// and drop all too old buffers in the process
|
||||
loop {
|
||||
let mut buffer = fallback_sinkpad
|
||||
let mut buffer = backup_pad
|
||||
.pop_buffer()
|
||||
.ok_or(gst_base::AGGREGATOR_FLOW_NEED_DATA)?;
|
||||
|
||||
|
@ -279,35 +434,40 @@ impl FallbackSwitch {
|
|||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
|
||||
let fallback_segment = fallback_sinkpad
|
||||
let backup_segment = backup_pad
|
||||
.get_segment()
|
||||
.downcast::<gst::ClockTime>()
|
||||
.map_err(|_| {
|
||||
gst_error!(CAT, obj: agg, "Only TIME segments supported");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
let running_time = fallback_segment.to_running_time(buffer.get_dts_or_pts());
|
||||
let running_time = backup_segment.to_running_time(buffer.get_dts_or_pts());
|
||||
|
||||
{
|
||||
// FIXME: This will not work correctly for negative DTS
|
||||
let buffer = buffer.make_mut();
|
||||
buffer.set_pts(fallback_segment.to_running_time(buffer.get_pts()));
|
||||
buffer.set_dts(fallback_segment.to_running_time(buffer.get_dts()));
|
||||
buffer.set_pts(backup_segment.to_running_time(buffer.get_pts()));
|
||||
buffer.set_dts(backup_segment.to_running_time(buffer.get_dts()));
|
||||
}
|
||||
|
||||
// If we never had a real buffer, initialize with the running time of the fallback
|
||||
// sinkpad so that we still output fallback buffers after the timeout
|
||||
if state.last_sinkpad_time.is_none() {
|
||||
state.last_sinkpad_time = running_time;
|
||||
if state.last_output_time.is_none() {
|
||||
state.last_output_time = running_time;
|
||||
}
|
||||
if backup_pad == &self.primary_sinkpad {
|
||||
state.primary.last_sinkpad_time = running_time;
|
||||
} else {
|
||||
state.fallback.last_sinkpad_time = running_time;
|
||||
}
|
||||
|
||||
// Get the next one if this one is before the timeout
|
||||
if state.last_sinkpad_time + settings.timeout > running_time {
|
||||
if state.last_output_time + settings.timeout > running_time {
|
||||
gst_debug!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Timeout not reached yet: {} + {} > {}",
|
||||
state.last_sinkpad_time,
|
||||
state.last_output_time,
|
||||
settings.timeout,
|
||||
running_time
|
||||
);
|
||||
|
@ -319,22 +479,23 @@ impl FallbackSwitch {
|
|||
CAT,
|
||||
obj: agg,
|
||||
"Timeout reached: {} + {} <= {}",
|
||||
state.last_sinkpad_time,
|
||||
state.last_output_time,
|
||||
settings.timeout,
|
||||
running_time
|
||||
);
|
||||
|
||||
let mut active_sinkpad = self.active_sinkpad.lock().unwrap();
|
||||
let pad_change =
|
||||
active_sinkpad.as_ref() != Some(fallback_sinkpad.upcast_ref::<gst::Pad>());
|
||||
let pad_change = settings.auto_switch
|
||||
&& active_sinkpad.as_ref() != Some(backup_pad.upcast_ref::<gst::Pad>());
|
||||
if pad_change {
|
||||
if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) {
|
||||
gst_info!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Can't change to fallback sinkpad yet, waiting for keyframe"
|
||||
"Can't change to sinkpad {} yet, waiting for keyframe",
|
||||
backup_pad.get_name()
|
||||
);
|
||||
fallback_sinkpad.push_event(
|
||||
backup_pad.push_event(
|
||||
gst_video::UpstreamForceKeyUnitEvent::builder()
|
||||
.all_headers(true)
|
||||
.build(),
|
||||
|
@ -343,57 +504,165 @@ impl FallbackSwitch {
|
|||
}
|
||||
|
||||
gst_info!(CAT, obj: agg, "Active pad changed to fallback sinkpad");
|
||||
*active_sinkpad = Some(fallback_sinkpad.clone().upcast());
|
||||
*active_sinkpad = Some(backup_pad.clone().upcast());
|
||||
}
|
||||
drop(active_sinkpad);
|
||||
|
||||
let pad_states = self.pad_states.read().unwrap();
|
||||
let active_caps = match pad_states.fallback_sinkpad {
|
||||
None => {
|
||||
// This can happen if the pad is removed in the meantime,
|
||||
// not a problem really
|
||||
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
|
||||
}
|
||||
Some(ref fallback_sinkpad) => fallback_sinkpad.caps.as_ref().unwrap().clone(),
|
||||
let active_caps = if backup_pad == &self.primary_sinkpad {
|
||||
let pad_state = self.primary_state.read().unwrap();
|
||||
pad_state.caps.as_ref().unwrap().clone()
|
||||
} else {
|
||||
let pad_state = self.fallback_state.read().unwrap();
|
||||
pad_state.caps.as_ref().unwrap().clone()
|
||||
};
|
||||
drop(pad_states);
|
||||
|
||||
break Ok((buffer, active_caps, pad_change));
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn get_next_buffer(
|
||||
&self,
|
||||
agg: &super::FallbackSwitch,
|
||||
timeout: bool,
|
||||
) -> Result<(gst::Buffer, gst::Caps, bool), gst::FlowError> {
|
||||
) -> (
|
||||
Result<(gst::Buffer, gst::Caps, bool), gst::FlowError>,
|
||||
(bool, bool),
|
||||
) {
|
||||
let settings = self.settings.lock().unwrap().clone();
|
||||
let mut state = self.output_state.lock().unwrap();
|
||||
let fallback_sinkpad = self.fallback_sinkpad.read().unwrap();
|
||||
|
||||
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
|
||||
|
||||
if let Some(buffer) = self.sinkpad.pop_buffer() {
|
||||
if let Some(res) = self.handle_main_buffer(
|
||||
if self.primary_sinkpad.is_eos() {
|
||||
gst_log!(CAT, obj: agg, "Sinkpad is EOS");
|
||||
return (Err(gst::FlowError::Eos), (false, false));
|
||||
}
|
||||
|
||||
/* Choose which pad we check first */
|
||||
let active_sinkpad = self.active_sinkpad.lock().unwrap();
|
||||
let prefer_primary = settings.auto_switch
|
||||
|| active_sinkpad.is_none()
|
||||
|| active_sinkpad.as_ref() == Some(self.primary_sinkpad.upcast_ref::<gst::Pad>());
|
||||
drop(active_sinkpad);
|
||||
|
||||
let fallback_sinkpad = self.fallback_sinkpad.read().unwrap();
|
||||
|
||||
let (preferred_pad, backup_pad) = if prefer_primary {
|
||||
(&self.primary_sinkpad, fallback_sinkpad.as_ref())
|
||||
} else {
|
||||
(
|
||||
fallback_sinkpad.as_ref().unwrap(),
|
||||
Some(&self.primary_sinkpad),
|
||||
)
|
||||
};
|
||||
|
||||
let clock = agg.get_clock();
|
||||
let base_time = agg.get_base_time();
|
||||
|
||||
let cur_running_time = if let Some(clock) = clock {
|
||||
clock.get_time() - base_time
|
||||
} else {
|
||||
gst::ClockTime::none()
|
||||
};
|
||||
|
||||
/* See if there's a buffer on the preferred pad and output that */
|
||||
if let Some(buffer) = preferred_pad.pop_buffer() {
|
||||
match self.handle_main_buffer(
|
||||
agg,
|
||||
&mut *state,
|
||||
&settings,
|
||||
buffer,
|
||||
fallback_sinkpad.as_ref(),
|
||||
)? {
|
||||
return Ok(res);
|
||||
preferred_pad,
|
||||
&backup_pad,
|
||||
cur_running_time,
|
||||
) {
|
||||
Ok(Some(res)) => {
|
||||
return (
|
||||
Ok(res),
|
||||
self.check_health_changes(
|
||||
&mut *state,
|
||||
&settings,
|
||||
preferred_pad,
|
||||
&backup_pad,
|
||||
cur_running_time,
|
||||
),
|
||||
)
|
||||
}
|
||||
Err(e) => {
|
||||
return (
|
||||
Err(e),
|
||||
self.check_health_changes(
|
||||
&mut *state,
|
||||
&settings,
|
||||
preferred_pad,
|
||||
&backup_pad,
|
||||
cur_running_time,
|
||||
),
|
||||
)
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
} else if self.sinkpad.is_eos() {
|
||||
gst_log!(CAT, obj: agg, "Sinkpad is EOS");
|
||||
return Err(gst::FlowError::Eos);
|
||||
}
|
||||
|
||||
if let (false, Some(_)) = (timeout, &*fallback_sinkpad) {
|
||||
gst_debug!(CAT, obj: agg, "Have fallback sinkpad but no timeout yet");
|
||||
/* If we can't auto-switch, then can't fetch anything from the backup pad */
|
||||
if !settings.auto_switch {
|
||||
/* Use a dummy drain_pad_to_time() call to update the last_sinkpad_time */
|
||||
if let Some(backup_pad) = &backup_pad {
|
||||
if let Err(e) = self.drain_pad_to_time(
|
||||
&agg,
|
||||
&mut *state,
|
||||
&backup_pad,
|
||||
gst::ClockTime::from_seconds(0),
|
||||
) {
|
||||
return (
|
||||
Err(e),
|
||||
self.check_health_changes(
|
||||
&mut *state,
|
||||
&settings,
|
||||
preferred_pad,
|
||||
&Some(backup_pad),
|
||||
cur_running_time,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA)
|
||||
} else if let (true, Some(fallback_sinkpad)) = (timeout, &*fallback_sinkpad) {
|
||||
self.get_fallback_buffer(agg, &mut *state, &settings, fallback_sinkpad)
|
||||
return (
|
||||
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA),
|
||||
self.check_health_changes(
|
||||
&mut *state,
|
||||
&settings,
|
||||
preferred_pad,
|
||||
&backup_pad,
|
||||
cur_running_time,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
if let (false, Some(backup_pad)) = (timeout, &backup_pad) {
|
||||
gst_debug!(CAT, obj: agg, "Have fallback sinkpad but no timeout yet");
|
||||
(
|
||||
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA),
|
||||
self.check_health_changes(
|
||||
&mut *state,
|
||||
&settings,
|
||||
preferred_pad,
|
||||
&Some(backup_pad),
|
||||
cur_running_time,
|
||||
),
|
||||
)
|
||||
} else if let (true, Some(backup_pad)) = (timeout, &backup_pad) {
|
||||
(
|
||||
self.get_backup_buffer(agg, &mut *state, &settings, backup_pad),
|
||||
self.check_health_changes(
|
||||
&mut *state,
|
||||
&settings,
|
||||
preferred_pad,
|
||||
&Some(backup_pad),
|
||||
cur_running_time,
|
||||
),
|
||||
)
|
||||
} else {
|
||||
// Otherwise there's not much we can do at this point
|
||||
gst_debug!(
|
||||
|
@ -401,7 +670,16 @@ impl FallbackSwitch {
|
|||
obj: agg,
|
||||
"Got no buffer on sinkpad and have no fallback sinkpad"
|
||||
);
|
||||
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA)
|
||||
(
|
||||
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA),
|
||||
self.check_health_changes(
|
||||
&mut *state,
|
||||
&settings,
|
||||
preferred_pad,
|
||||
&backup_pad,
|
||||
cur_running_time,
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -421,11 +699,12 @@ impl ObjectSubclass for FallbackSwitch {
|
|||
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("sink")).build();
|
||||
|
||||
Self {
|
||||
sinkpad,
|
||||
primary_sinkpad: sinkpad,
|
||||
primary_state: RwLock::new(PadInputState::default()),
|
||||
fallback_sinkpad: RwLock::new(None),
|
||||
fallback_state: RwLock::new(PadInputState::default()),
|
||||
active_sinkpad: Mutex::new(None),
|
||||
output_state: Mutex::new(OutputState::default()),
|
||||
pad_states: RwLock::new(PadStates::default()),
|
||||
settings: Mutex::new(Settings::default()),
|
||||
}
|
||||
}
|
||||
|
@ -477,7 +756,7 @@ impl ObjectImpl for FallbackSwitch {
|
|||
fn constructed(&self, obj: &Self::Type) {
|
||||
self.parent_constructed(obj);
|
||||
|
||||
obj.add_pad(&self.sinkpad).unwrap();
|
||||
obj.add_pad(&self.primary_sinkpad).unwrap();
|
||||
}
|
||||
|
||||
fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) {
|
||||
|
@ -497,6 +776,29 @@ impl ObjectImpl for FallbackSwitch {
|
|||
settings.timeout = timeout;
|
||||
drop(settings);
|
||||
}
|
||||
subclass::Property("active-pad", ..) => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
if settings.auto_switch {
|
||||
gst_warning!(
|
||||
CAT,
|
||||
obj: obj,
|
||||
"active-pad property setting ignored, because auto-switch=true"
|
||||
);
|
||||
} else {
|
||||
let active_pad = value.get::<gst::Pad>().expect("type checked upstream");
|
||||
/* Trigger a pad switch if needed */
|
||||
let mut cur_active_pad = self.active_sinkpad.lock().unwrap();
|
||||
if *cur_active_pad != active_pad {
|
||||
*cur_active_pad = active_pad;
|
||||
}
|
||||
drop(cur_active_pad);
|
||||
}
|
||||
drop(settings);
|
||||
}
|
||||
subclass::Property("auto-switch", ..) => {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
settings.auto_switch = value.get_some().expect("type checked upstream");
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
@ -513,6 +815,18 @@ impl ObjectImpl for FallbackSwitch {
|
|||
let active_pad = self.active_sinkpad.lock().unwrap().clone();
|
||||
active_pad.to_value()
|
||||
}
|
||||
subclass::Property("auto-switch", ..) => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
Ok(settings.auto_switch.to_value())
|
||||
}
|
||||
subclass::Property("primary-health", ..) => {
|
||||
let state = self.output_state.lock().unwrap();
|
||||
Ok(state.primary.stream_health.to_value())
|
||||
}
|
||||
subclass::Property("fallback-health", ..) => {
|
||||
let state = self.output_state.lock().unwrap();
|
||||
Ok(state.fallback.stream_health.to_value())
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
@ -545,9 +859,14 @@ impl ElementImpl for FallbackSwitch {
|
|||
Some("fallback_sink"),
|
||||
)
|
||||
.build();
|
||||
|
||||
*fallback_sinkpad = Some(sinkpad.clone());
|
||||
drop(fallback_sinkpad);
|
||||
|
||||
let mut state = self.output_state.lock().unwrap();
|
||||
state.fallback = PadOutputState::default();
|
||||
drop(state);
|
||||
|
||||
element.add_pad(&sinkpad).unwrap();
|
||||
|
||||
Some(sinkpad.upcast())
|
||||
|
@ -555,12 +874,9 @@ impl ElementImpl for FallbackSwitch {
|
|||
|
||||
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
|
||||
let mut fallback_sinkpad = self.fallback_sinkpad.write().unwrap();
|
||||
let mut pad_states = self.pad_states.write().unwrap();
|
||||
|
||||
if fallback_sinkpad.as_ref().map(|p| p.upcast_ref()) == Some(pad) {
|
||||
*fallback_sinkpad = None;
|
||||
pad_states.fallback_sinkpad = None;
|
||||
drop(pad_states);
|
||||
drop(fallback_sinkpad);
|
||||
element.remove_pad(pad).unwrap();
|
||||
gst_debug!(CAT, obj: element, "Removed fallback sinkpad {:?}", pad);
|
||||
|
@ -571,7 +887,9 @@ impl ElementImpl for FallbackSwitch {
|
|||
impl AggregatorImpl for FallbackSwitch {
|
||||
fn start(&self, _agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
|
||||
*self.output_state.lock().unwrap() = OutputState::default();
|
||||
*self.pad_states.write().unwrap() = PadStates::default();
|
||||
|
||||
*self.primary_state.write().unwrap() = PadInputState::default();
|
||||
*self.fallback_state.write().unwrap() = PadInputState::default();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -625,19 +943,17 @@ impl AggregatorImpl for FallbackSwitch {
|
|||
video_info = None;
|
||||
}
|
||||
|
||||
let new_pad_state = PadState {
|
||||
let new_pad_state = PadInputState {
|
||||
caps: Some(caps),
|
||||
audio_info,
|
||||
video_info,
|
||||
};
|
||||
|
||||
let mut pad_states = self.pad_states.write().unwrap();
|
||||
if agg_pad == &self.sinkpad {
|
||||
pad_states.sinkpad = new_pad_state;
|
||||
if agg_pad == &self.primary_sinkpad {
|
||||
*self.primary_state.write().unwrap() = new_pad_state;
|
||||
} else if Some(agg_pad) == self.fallback_sinkpad.read().unwrap().as_ref() {
|
||||
pad_states.fallback_sinkpad = Some(new_pad_state);
|
||||
*self.fallback_state.write().unwrap() = new_pad_state;
|
||||
}
|
||||
drop(pad_states);
|
||||
|
||||
self.parent_sink_event(agg, agg_pad, event)
|
||||
}
|
||||
|
@ -646,22 +962,46 @@ impl AggregatorImpl for FallbackSwitch {
|
|||
}
|
||||
|
||||
fn get_next_time(&self, agg: &Self::Type) -> gst::ClockTime {
|
||||
// If we have a buffer on the sinkpad then the timeout is always going to be immediately,
|
||||
// i.e. 0. We want to output that buffer immediately, no matter what.
|
||||
//
|
||||
// Otherwise if we have a fallback sinkpad and it has a buffer, then the timeout is going
|
||||
// to be its running time. We will then either output the buffer or drop it, depending on
|
||||
// its distance from the last sinkpad time
|
||||
if self.sinkpad.peek_buffer().is_some() {
|
||||
gst_debug!(CAT, obj: agg, "Have buffer on sinkpad, immediate timeout");
|
||||
/* At each iteration, we have a preferred pad and a backup pad. If autoswitch is true,
|
||||
* the sinkpad is always preferred, otherwise it's the active sinkpad as set by the app.
|
||||
* The backup pad is the other one (may be None if there's no fallback pad yet).
|
||||
*
|
||||
* If we have a buffer on the preferred pad then the timeout is always going to be immediately,
|
||||
* i.e. 0. We want to output that buffer immediately, no matter what.
|
||||
*
|
||||
* Otherwise if we have a backup sinkpad and it has a buffer, then the timeout is going
|
||||
* to be that buffer's running time. We will then either output the buffer or drop it, depending on
|
||||
* its distance from the last output time
|
||||
*/
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let active_sinkpad = self.active_sinkpad.lock().unwrap();
|
||||
let fallback_sinkpad = self.fallback_sinkpad.read().unwrap();
|
||||
|
||||
let prefer_primary = settings.auto_switch
|
||||
|| active_sinkpad.is_none()
|
||||
|| active_sinkpad.as_ref() == Some(self.primary_sinkpad.upcast_ref::<gst::Pad>());
|
||||
|
||||
let (preferred_pad, backup_pad) = if prefer_primary {
|
||||
(&self.primary_sinkpad, fallback_sinkpad.as_ref())
|
||||
} else {
|
||||
(
|
||||
fallback_sinkpad.as_ref().unwrap(),
|
||||
Some(&self.primary_sinkpad),
|
||||
)
|
||||
};
|
||||
|
||||
if preferred_pad.peek_buffer().is_some() {
|
||||
gst_debug!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Have buffer on sinkpad {}, immediate timeout",
|
||||
preferred_pad.get_name()
|
||||
);
|
||||
0.into()
|
||||
} else if self.sinkpad.is_eos() {
|
||||
} else if self.primary_sinkpad.is_eos() {
|
||||
gst_debug!(CAT, obj: agg, "Sinkpad is EOS, immediate timeout");
|
||||
0.into()
|
||||
} else if let Some((buffer, fallback_sinkpad)) = self
|
||||
.fallback_sinkpad
|
||||
.read()
|
||||
.unwrap()
|
||||
} else if let Some((buffer, backup_sinkpad)) = backup_pad
|
||||
.as_ref()
|
||||
.and_then(|p| p.peek_buffer().map(|buffer| (buffer, p)))
|
||||
{
|
||||
|
@ -671,7 +1011,7 @@ impl AggregatorImpl for FallbackSwitch {
|
|||
return 0.into();
|
||||
}
|
||||
|
||||
let segment = match fallback_sinkpad.get_segment().downcast::<gst::ClockTime>() {
|
||||
let segment = match backup_sinkpad.get_segment().downcast::<gst::ClockTime>() {
|
||||
Ok(segment) => segment,
|
||||
Err(_) => {
|
||||
gst_error!(CAT, obj: agg, "Only TIME segments supported");
|
||||
|
@ -684,12 +1024,13 @@ impl AggregatorImpl for FallbackSwitch {
|
|||
gst_debug!(
|
||||
CAT,
|
||||
obj: agg,
|
||||
"Have buffer on fallback sinkpad, timeout at {}",
|
||||
"Have buffer on {} pad, timeout at {}",
|
||||
backup_sinkpad.get_name(),
|
||||
running_time
|
||||
);
|
||||
running_time
|
||||
} else {
|
||||
gst_debug!(CAT, obj: agg, "Have no buffer at all yet");
|
||||
gst_debug!(CAT, obj: agg, "No buffer available on either input");
|
||||
gst::CLOCK_TIME_NONE
|
||||
}
|
||||
}
|
||||
|
@ -716,15 +1057,13 @@ impl AggregatorImpl for FallbackSwitch {
|
|||
return Some(buffer);
|
||||
}
|
||||
|
||||
let pad_states = self.pad_states.read().unwrap();
|
||||
let pad_state = if agg_pad == &self.sinkpad {
|
||||
&pad_states.sinkpad
|
||||
let primary_state = self.primary_state.read().unwrap();
|
||||
let fallback_state = self.fallback_state.read().unwrap();
|
||||
|
||||
let pad_state = if agg_pad == &self.primary_sinkpad {
|
||||
&primary_state
|
||||
} else if Some(agg_pad) == self.fallback_sinkpad.read().unwrap().as_ref() {
|
||||
if let Some(ref pad_state) = pad_states.fallback_sinkpad {
|
||||
pad_state
|
||||
} else {
|
||||
return Some(buffer);
|
||||
}
|
||||
&fallback_state
|
||||
} else {
|
||||
unreachable!()
|
||||
};
|
||||
|
@ -798,7 +1137,17 @@ impl AggregatorImpl for FallbackSwitch {
|
|||
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
gst_debug!(CAT, obj: agg, "Aggregate called: timeout {}", timeout);
|
||||
|
||||
let (mut buffer, active_caps, pad_change) = self.get_next_buffer(agg, timeout)?;
|
||||
let (res, (primary_health_change, fallback_health_change)) =
|
||||
self.get_next_buffer(agg, timeout);
|
||||
|
||||
if primary_health_change {
|
||||
agg.notify("primary-health");
|
||||
}
|
||||
if fallback_health_change {
|
||||
agg.notify("fallback-health");
|
||||
}
|
||||
|
||||
let (mut buffer, active_caps, pad_change) = res?;
|
||||
|
||||
let current_src_caps = agg.get_static_pad("src").unwrap().get_current_caps();
|
||||
if Some(&active_caps) != current_src_caps.as_ref() {
|
||||
|
@ -816,7 +1165,6 @@ impl AggregatorImpl for FallbackSwitch {
|
|||
agg.notify("active-pad");
|
||||
buffer.make_mut().set_flags(gst::BufferFlags::DISCONT);
|
||||
}
|
||||
|
||||
gst_debug!(CAT, obj: agg, "Finishing buffer {:?}", buffer);
|
||||
agg.finish_buffer(buffer)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue