utils/fallbacksrc: Add a 1s queue between uridecodebin and clocksync and do buffering after the queue

This adds 1s more buffering (of uncompressed data), but works around
https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
This commit is contained in:
Sebastian Dröge 2020-07-30 14:05:08 +03:00
parent 34fab8786f
commit 30c711886e

View file

@ -99,6 +99,9 @@ struct Stream {
// clocksync for source source pad // clocksync for source source pad
clocksync: gst::Element, clocksync: gst::Element,
clocksync_queue: gst::Element,
clocksync_queue_srcpad: gst::Pad,
// fallbackswitch // fallbackswitch
switch: gst::Element, switch: gst::Element,
@ -919,8 +922,18 @@ impl FallbackSrc {
}) })
.expect("No clocksync or identity found"); .expect("No clocksync or identity found");
// Workaround for issues caused by https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/800
let clocksync_queue = gst::ElementFactory::make("queue", None).expect("No queue found");
clocksync_queue
.set_properties(&[
("max-size-buffers", &0u32),
("max-size-bytes", &0u32),
("max-size-time", &gst::SECOND),
])
.unwrap();
element element
.add_many(&[&fallback_input, &switch, &clocksync]) .add_many(&[&fallback_input, &switch, &clocksync_queue, &clocksync])
.unwrap(); .unwrap();
let element_weak = element.downgrade(); let element_weak = element.downgrade();
@ -940,8 +953,9 @@ impl FallbackSrc {
gst::Element::link_pads(&fallback_input, Some("src"), &switch, Some("fallback_sink")) gst::Element::link_pads(&fallback_input, Some("src"), &switch, Some("fallback_sink"))
.unwrap(); .unwrap();
gst::Element::link_pads(&clocksync_queue, Some("src"), &clocksync, Some("sink")).unwrap();
gst::Element::link_pads(&clocksync, Some("src"), &switch, Some("sink")).unwrap(); gst::Element::link_pads(&clocksync, Some("src"), &switch, Some("sink")).unwrap();
// clocksync sink pad is not connected to anything yet at this point! // clocksync_queue sink pad is not connected to anything yet at this point!
let srcpad = switch.get_static_pad("src").unwrap(); let srcpad = switch.get_static_pad("src").unwrap();
let templ = element let templ = element
@ -970,6 +984,8 @@ impl FallbackSrc {
source_srcpad: None, source_srcpad: None,
source_srcpad_block: None, source_srcpad_block: None,
clocksync, clocksync,
clocksync_queue_srcpad: clocksync_queue.get_static_pad("src").unwrap(),
clocksync_queue,
switch, switch,
srcpad: ghostpad.upcast(), srcpad: ghostpad.upcast(),
}) })
@ -1084,6 +1100,7 @@ impl FallbackSrc {
.filter_map(|v| v.as_ref()) .filter_map(|v| v.as_ref())
{ {
element.remove(&stream.switch).unwrap(); element.remove(&stream.switch).unwrap();
element.remove(&stream.clocksync_queue).unwrap();
element.remove(&stream.clocksync).unwrap(); element.remove(&stream.clocksync).unwrap();
element.remove(&stream.fallback_input).unwrap(); element.remove(&stream.fallback_input).unwrap();
let _ = stream.srcpad.set_target(None::<&gst::Pad>); let _ = stream.srcpad.set_target(None::<&gst::Pad>);
@ -1256,7 +1273,7 @@ impl FallbackSrc {
Some(ref mut stream) => stream, Some(ref mut stream) => stream,
}; };
let sinkpad = stream.clocksync.get_static_pad("sink").unwrap(); let sinkpad = stream.clocksync_queue.get_static_pad("sink").unwrap();
pad.link(&sinkpad).map_err(|err| { pad.link(&sinkpad).map_err(|err| {
gst_error!( gst_error!(
CAT, CAT,
@ -1308,7 +1325,8 @@ impl FallbackSrc {
} }
stream.source_srcpad = Some(pad.clone()); stream.source_srcpad = Some(pad.clone());
stream.source_srcpad_block = Some(self.add_pad_probe(element, pad)); stream.source_srcpad_block =
Some(self.add_pad_probe(element, &stream.clocksync_queue_srcpad));
drop(state_guard); drop(state_guard);
element.notify("status"); element.notify("status");
@ -1395,13 +1413,13 @@ impl FallbackSrc {
let stream = if let Some(stream) = state let stream = if let Some(stream) = state
.audio_stream .audio_stream
.as_mut() .as_mut()
.filter(|s| s.source_srcpad.as_ref() == Some(pad)) .filter(|s| &s.clocksync_queue_srcpad == pad)
{ {
stream stream
} else if let Some(stream) = state } else if let Some(stream) = state
.video_stream .video_stream
.as_mut() .as_mut()
.filter(|s| s.source_srcpad.as_ref() == Some(pad)) .filter(|s| &s.clocksync_queue_srcpad == pad)
{ {
stream stream
} else { } else {
@ -1709,17 +1727,15 @@ impl FallbackSrc {
state.last_buffering_update = Some(Instant::now()); state.last_buffering_update = Some(Instant::now());
// Block source pads if needed to pause // Block source pads if needed to pause
if let Some(ref mut stream) = state.audio_stream { if let Some(ref mut stream) = state.audio_stream {
if stream.source_srcpad_block.is_none() { if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() {
if let Some(ref pad) = stream.source_srcpad { stream.source_srcpad_block =
stream.source_srcpad_block = Some(self.add_pad_probe(element, pad)); Some(self.add_pad_probe(element, &stream.clocksync_queue_srcpad));
}
} }
} }
if let Some(ref mut stream) = state.video_stream { if let Some(ref mut stream) = state.video_stream {
if stream.source_srcpad_block.is_none() { if stream.source_srcpad_block.is_none() && stream.source_srcpad.is_some() {
if let Some(ref pad) = stream.source_srcpad { stream.source_srcpad_block =
stream.source_srcpad_block = Some(self.add_pad_probe(element, pad)); Some(self.add_pad_probe(element, &stream.clocksync_queue_srcpad));
}
} }
} }
@ -1783,10 +1799,9 @@ impl FallbackSrc {
.iter_mut() .iter_mut()
.filter_map(|v| v.as_mut()) .filter_map(|v| v.as_mut())
{ {
if let Some(ref srcpad) = stream.source_srcpad { if stream.source_srcpad.is_some() && stream.source_srcpad_block.is_none() {
if stream.source_srcpad_block.is_none() { stream.source_srcpad_block =
stream.source_srcpad_block = Some(self.add_pad_probe(element, srcpad)); Some(self.add_pad_probe(element, &stream.clocksync_queue_srcpad));
}
} }
} }