fallbacksrc: send EOS on fallback-only stream

When both audio and video are enabled, but the primary stream
only has either, when that stream ends we want to end the other
fallback-only stream as well.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/515>
This commit is contained in:
Mathieu Duponchelle 2021-05-26 00:16:31 +02:00 committed by GStreamer Marge Bot
parent a5a80281f3
commit 0b08f855c5

View file

@ -1142,9 +1142,9 @@ impl FallbackSrc {
Some(state) => state, Some(state) => state,
}; };
let (type_, stream) = match pad.name() { let (is_video, stream) = match pad.name() {
x if x.starts_with("audio_") => ("audio", &mut state.audio_stream), x if x.starts_with("audio_") => (false, &mut state.audio_stream),
x if x.starts_with("video_") => ("video", &mut state.video_stream), x if x.starts_with("video_") => (true, &mut state.video_stream),
_ => { _ => {
let caps = match pad.current_caps().unwrap_or_else(|| pad.query_caps(None)) { let caps = match pad.current_caps().unwrap_or_else(|| pad.query_caps(None)) {
caps if !caps.is_any() && !caps.is_empty() => caps, caps if !caps.is_any() && !caps.is_empty() => caps,
@ -1154,9 +1154,9 @@ impl FallbackSrc {
let s = caps.structure(0).unwrap(); let s = caps.structure(0).unwrap();
if s.name().starts_with("audio/") { if s.name().starts_with("audio/") {
("audio", &mut state.audio_stream) (false, &mut state.audio_stream)
} else if s.name().starts_with("video/") { } else if s.name().starts_with("video/") {
("video", &mut state.video_stream) (true, &mut state.video_stream)
} else { } else {
// TODO: handle subtitles etc // TODO: handle subtitles etc
return Ok(()); return Ok(());
@ -1164,6 +1164,8 @@ impl FallbackSrc {
} }
}; };
let type_ = if is_video { "video" } else { "audio" };
let stream = match stream { let stream = match stream {
None => { None => {
gst_debug!(CAT, obj: element, "No {} stream enabled", type_); gst_debug!(CAT, obj: element, "No {} stream enabled", type_);
@ -1193,42 +1195,62 @@ impl FallbackSrc {
) )
})?; })?;
if state.settings.restart_on_eos { let element_weak = element.downgrade();
let element_weak = element.downgrade(); pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |pad, info| {
pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |pad, info| { let element = match element_weak.upgrade() {
let element = match element_weak.upgrade() { None => return gst::PadProbeReturn::Ok,
None => return gst::PadProbeReturn::Ok, Some(element) => element,
Some(element) => element, };
};
let src = FallbackSrc::from_instance(&element); let src = FallbackSrc::from_instance(&element);
match info.data { match info.data {
Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => { Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => {
gst_debug!( gst_debug!(
CAT, CAT,
obj: &element, obj: &element,
"Received EOS from source on pad {}, restarting", "Received EOS from source on pad {}",
pad.name() pad.name()
); );
let mut state_guard = src.state.lock().unwrap(); let mut state_guard = src.state.lock().unwrap();
let state = match &mut *state_guard { let state = match &mut *state_guard {
None => { None => {
return gst::PadProbeReturn::Ok; return gst::PadProbeReturn::Ok;
} }
Some(state) => state, Some(state) => state,
}; };
if state.settings.restart_on_eos {
src.handle_source_error(&element, state, RetryReason::Eos); src.handle_source_error(&element, state, RetryReason::Eos);
drop(state_guard); drop(state_guard);
element.notify("statistics"); element.notify("statistics");
gst::PadProbeReturn::Drop gst::PadProbeReturn::Drop
} else {
if let Some(other_stream) = {
if is_video {
state.audio_stream.as_ref()
} else {
state.video_stream.as_ref()
}
} {
if other_stream.source_srcpad.is_none() {
let fallback_input = &other_stream.fallback_input;
let clocksync_queue_sinkpad =
other_stream.clocksync_queue.static_pad("sink").unwrap();
fallback_input.call_async(move |fallback_input| {
fallback_input.send_event(gst::event::Eos::new());
clocksync_queue_sinkpad.send_event(gst::event::Eos::new());
});
}
}
gst::PadProbeReturn::Ok
} }
_ => gst::PadProbeReturn::Ok,
} }
}); _ => gst::PadProbeReturn::Ok,
} }
});
assert!(stream.source_srcpad_block.is_none()); assert!(stream.source_srcpad_block.is_none());
stream.source_srcpad = Some(pad.clone()); stream.source_srcpad = Some(pad.clone());
@ -1854,7 +1876,7 @@ impl FallbackSrc {
state: &mut State, state: &mut State,
reason: RetryReason, reason: RetryReason,
) { ) {
gst_debug!(CAT, obj: element, "Handling source error"); gst_debug!(CAT, obj: element, "Handling source error: {:?}", reason);
state.stats.last_retry_reason = reason; state.stats.last_retry_reason = reason;
if state.source_pending_restart { if state.source_pending_restart {