mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-04 08:18:52 +00:00
bd2ff494c7
fallbackswitch now supports multiple sink pads, and on a timeout of the active pad, it will automatically switch to the next lowest priority pad that has data available. fallbackswitch sink pads follow the `sink_%u` template and have `priority` as a pad property. Co-authored-by: Vivia Nikolaidou <vivia.nikolaidou@ltnglobal.com>
747 lines
23 KiB
Rust
747 lines
23 KiB
Rust
// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
|
|
// Copyright (C) 2021 Jan Schmidt <jan@centricular.com>
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
|
// <https://mozilla.org/MPL/2.0/>.
|
|
//
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
use gst::debug;
|
|
use gst::prelude::*;
|
|
|
|
use once_cell::sync::Lazy;
|
|
|
|
const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(10);
|
|
|
|
static TEST_CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|
gst::DebugCategory::new(
|
|
"fallbackswitch-test",
|
|
gst::DebugColorFlags::empty(),
|
|
Some("fallbackswitch test"),
|
|
)
|
|
});
|
|
|
|
fn init() {
|
|
use std::sync::Once;
|
|
static INIT: Once = Once::new();
|
|
|
|
INIT.call_once(|| {
|
|
gst::init().unwrap();
|
|
gstfallbackswitch::plugin_register_static().expect("gstfallbackswitch test");
|
|
});
|
|
}
|
|
|
|
macro_rules! assert_fallback_buffer {
|
|
($buffer:expr, $ts:expr) => {
|
|
assert_eq!($buffer.pts(), $ts);
|
|
assert_eq!($buffer.size(), 160 * 120 * 4);
|
|
};
|
|
}
|
|
|
|
macro_rules! assert_buffer {
|
|
($buffer:expr, $ts:expr) => {
|
|
assert_eq!($buffer.pts(), $ts);
|
|
assert_eq!($buffer.size(), 320 * 240 * 4);
|
|
};
|
|
}
|
|
|
|
#[test]
|
|
fn test_no_fallback_no_drops() {
|
|
let pipeline = setup_pipeline(None, None, None);
|
|
|
|
push_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
|
|
push_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
set_time(&pipeline, gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::SECOND));
|
|
|
|
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
|
|
|
|
push_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_no_drops_live() {
|
|
test_no_drops(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_no_drops_not_live() {
|
|
test_no_drops(false);
|
|
}
|
|
|
|
fn test_no_drops(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), None, None);
|
|
|
|
push_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
push_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
set_time(&pipeline, gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::SECOND));
|
|
|
|
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
|
|
|
|
// EOS on the fallback should not be required
|
|
push_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_no_drops_but_no_fallback_frames_live() {
|
|
test_no_drops_but_no_fallback_frames(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_no_drops_but_no_fallback_frames_not_live() {
|
|
test_no_drops_but_no_fallback_frames(false);
|
|
}
|
|
|
|
fn test_no_drops_but_no_fallback_frames(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), None, None);
|
|
|
|
push_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
// +10ms needed here because the immediate timeout will be always at running time 0, but
|
|
// aggregator also adds the latency to it so we end up at 10ms instead.
|
|
set_time(&pipeline, LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
|
|
push_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
set_time(&pipeline, gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::SECOND));
|
|
|
|
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
|
|
|
|
// EOS on the fallback should not be required
|
|
push_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_short_drop_live() {
|
|
test_short_drop(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_short_drop_not_live() {
|
|
test_short_drop(false);
|
|
}
|
|
|
|
fn test_short_drop(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), None, None);
|
|
|
|
push_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
|
|
// A timeout at 1s will get rid of the fallback buffer
|
|
// but not output anything
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
// Time out the fallback buffer at +10ms
|
|
set_time(
|
|
&pipeline,
|
|
gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
|
|
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
|
|
|
|
push_eos(&pipeline);
|
|
push_fallback_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_long_drop_and_eos_live() {
|
|
test_long_drop_and_eos(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_long_drop_and_eos_not_live() {
|
|
test_long_drop_and_eos(false);
|
|
}
|
|
|
|
fn test_long_drop_and_eos(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), None, None);
|
|
|
|
// Produce the first frame
|
|
push_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
|
|
// Produce a second frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
|
|
// Produce a third frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
|
|
// Produce a fourth frame but only from the fallback source
|
|
// This should be output now
|
|
push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND));
|
|
|
|
// Produce a fifth frame but only from the fallback source
|
|
// This should be output now
|
|
push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND));
|
|
|
|
// Wait for EOS to arrive at appsink
|
|
push_eos(&pipeline);
|
|
push_fallback_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_long_drop_and_recover_live() {
|
|
test_long_drop_and_recover(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_long_drop_and_recover_not_live() {
|
|
test_long_drop_and_recover(false);
|
|
}
|
|
|
|
fn test_long_drop_and_recover(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), None, None);
|
|
let switch = pipeline.by_name("switch").unwrap();
|
|
let mainsink = switch.static_pad("sink_0").unwrap();
|
|
|
|
// Produce the first frame
|
|
push_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
assert!(mainsink.property::<bool>("is-healthy"));
|
|
|
|
// Produce a second frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
|
|
// Produce a third frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
|
|
// Produce a fourth frame but only from the fallback source
|
|
// This should be output now
|
|
push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND));
|
|
|
|
// Produce a fifth frame but only from the fallback source
|
|
// This should be output now
|
|
push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND));
|
|
|
|
// Produce a sixth frame from the normal source
|
|
push_buffer(&pipeline, 5 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
5 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(5 * gst::ClockTime::SECOND));
|
|
assert!(!mainsink.property::<bool>("is-healthy"));
|
|
drop(mainsink);
|
|
drop(switch);
|
|
|
|
// Produce a seventh frame from the normal source but no fallback.
|
|
// This should still be output immediately
|
|
push_buffer(&pipeline, 6 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
6 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(6 * gst::ClockTime::SECOND));
|
|
|
|
// Produce a eight frame from the normal source
|
|
push_buffer(&pipeline, 7 * gst::ClockTime::SECOND);
|
|
push_fallback_buffer(&pipeline, 7 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
7 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(7 * gst::ClockTime::SECOND));
|
|
|
|
// Wait for EOS to arrive at appsink
|
|
push_eos(&pipeline);
|
|
push_fallback_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_initial_timeout_live() {
|
|
test_initial_timeout(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_initial_timeout_not_live() {
|
|
test_initial_timeout(false);
|
|
}
|
|
|
|
fn test_initial_timeout(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), None, None);
|
|
|
|
// Produce the first frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO);
|
|
|
|
// Produce a second frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
|
|
// Produce a third frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
2 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
|
|
// Produce a fourth frame but only from the fallback source
|
|
// This should be output now
|
|
push_fallback_buffer(&pipeline, 3 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
3 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_fallback_buffer!(buffer, Some(3 * gst::ClockTime::SECOND));
|
|
|
|
// Produce a fifth frame but only from the fallback source
|
|
// This should be output now
|
|
push_fallback_buffer(&pipeline, 4 * gst::ClockTime::SECOND);
|
|
set_time(
|
|
&pipeline,
|
|
4 * gst::ClockTime::SECOND + 10 * gst::ClockTime::MSECOND,
|
|
);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_fallback_buffer!(buffer, Some(4 * gst::ClockTime::SECOND));
|
|
|
|
// Wait for EOS to arrive at appsink
|
|
push_eos(&pipeline);
|
|
push_fallback_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_immediate_fallback_live() {
|
|
test_immediate_fallback(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_immediate_fallback_not_live() {
|
|
test_immediate_fallback(false);
|
|
}
|
|
|
|
fn test_immediate_fallback(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), Some(true), None);
|
|
|
|
// Produce the first frame but only from the fallback source
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO);
|
|
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_fallback_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
|
|
// Wait for EOS to arrive at appsink
|
|
push_eos(&pipeline);
|
|
push_fallback_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
#[test]
|
|
fn test_manual_switch_live() {
|
|
test_manual_switch(true);
|
|
}
|
|
|
|
#[test]
|
|
fn test_manual_switch_not_live() {
|
|
test_manual_switch(false);
|
|
}
|
|
|
|
fn test_manual_switch(live: bool) {
|
|
let pipeline = setup_pipeline(Some(live), None, Some(false));
|
|
let switch = pipeline.by_name("switch").unwrap();
|
|
let mainsink = switch.static_pad("sink_0").unwrap();
|
|
let fallbacksink = switch.static_pad("sink_1").unwrap();
|
|
|
|
switch.set_property("active-pad", &mainsink);
|
|
push_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::ZERO);
|
|
set_time(&pipeline, gst::ClockTime::ZERO + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(gst::ClockTime::ZERO));
|
|
|
|
switch.set_property("active-pad", &fallbacksink);
|
|
push_fallback_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
push_buffer(&pipeline, gst::ClockTime::SECOND);
|
|
set_time(&pipeline, gst::ClockTime::SECOND + LATENCY);
|
|
let mut buffer = pull_buffer(&pipeline);
|
|
// FIXME: Sometimes we get the ZERO buffer from the fallback sink instead
|
|
if buffer.pts() == Some(gst::ClockTime::ZERO) {
|
|
buffer = pull_buffer(&pipeline);
|
|
}
|
|
assert_fallback_buffer!(buffer, Some(gst::ClockTime::SECOND));
|
|
|
|
switch.set_property("active-pad", &mainsink);
|
|
push_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
push_fallback_buffer(&pipeline, 2 * gst::ClockTime::SECOND);
|
|
set_time(&pipeline, 2 * gst::ClockTime::SECOND + LATENCY);
|
|
let buffer = pull_buffer(&pipeline);
|
|
assert_buffer!(buffer, Some(2 * gst::ClockTime::SECOND));
|
|
|
|
drop(mainsink);
|
|
drop(fallbacksink);
|
|
drop(switch);
|
|
// EOS on the fallback should not be required
|
|
push_eos(&pipeline);
|
|
wait_eos(&pipeline);
|
|
|
|
stop_pipeline(pipeline);
|
|
}
|
|
|
|
struct Pipeline {
|
|
pipeline: gst::Pipeline,
|
|
clock_join_handle: Option<std::thread::JoinHandle<()>>,
|
|
}
|
|
|
|
impl std::ops::Deref for Pipeline {
|
|
type Target = gst::Pipeline;
|
|
|
|
fn deref(&self) -> &gst::Pipeline {
|
|
&self.pipeline
|
|
}
|
|
}
|
|
|
|
fn setup_pipeline(
|
|
with_live_fallback: Option<bool>,
|
|
immediate_fallback: Option<bool>,
|
|
auto_switch: Option<bool>,
|
|
) -> Pipeline {
|
|
init();
|
|
|
|
debug!(TEST_CAT, "Setting up pipeline");
|
|
|
|
let clock = gst_check::TestClock::new();
|
|
clock.set_time(gst::ClockTime::ZERO);
|
|
let pipeline = gst::Pipeline::new(None);
|
|
|
|
// Running time 0 in our pipeline is going to be clock time 1s. All
|
|
// clock ids before 1s are used for signalling to our clock advancing
|
|
// thread.
|
|
pipeline.use_clock(Some(&clock));
|
|
pipeline.set_base_time(gst::ClockTime::SECOND);
|
|
pipeline.set_start_time(gst::ClockTime::NONE);
|
|
|
|
let src = gst::ElementFactory::make("appsrc", Some("src"))
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSrc>()
|
|
.unwrap();
|
|
src.set_property("is-live", true);
|
|
src.set_property("format", gst::Format::Time);
|
|
src.set_property("min-latency", LATENCY.nseconds() as i64);
|
|
src.set_property(
|
|
"caps",
|
|
gst::Caps::builder("video/x-raw")
|
|
.field("format", "ARGB")
|
|
.field("width", 320i32)
|
|
.field("height", 240i32)
|
|
.field("framerate", gst::Fraction::new(1, 1))
|
|
.build(),
|
|
);
|
|
|
|
let switch = gst::ElementFactory::make("fallbackswitch", Some("switch")).unwrap();
|
|
switch.set_property("timeout", 3 * gst::ClockTime::SECOND);
|
|
if let Some(imm) = immediate_fallback {
|
|
switch.set_property("immediate-fallback", imm);
|
|
}
|
|
if let Some(auto_switch) = auto_switch {
|
|
switch.set_property("auto-switch", auto_switch);
|
|
}
|
|
|
|
let sink = gst::ElementFactory::make("appsink", Some("sink"))
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSink>()
|
|
.unwrap();
|
|
sink.set_property("sync", false);
|
|
|
|
let queue = gst::ElementFactory::make("queue", None).unwrap();
|
|
|
|
pipeline
|
|
.add_many(&[src.upcast_ref(), &switch, &queue, sink.upcast_ref()])
|
|
.unwrap();
|
|
src.link_pads(Some("src"), &switch, Some("sink_0")).unwrap();
|
|
switch.link_pads(Some("src"), &queue, Some("sink")).unwrap();
|
|
queue.link_pads(Some("src"), &sink, Some("sink")).unwrap();
|
|
|
|
if let Some(live) = with_live_fallback {
|
|
let fallback_src = gst::ElementFactory::make("appsrc", Some("fallback-src"))
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSrc>()
|
|
.unwrap();
|
|
fallback_src.set_property("is-live", live);
|
|
fallback_src.set_property("format", gst::Format::Time);
|
|
fallback_src.set_property("min-latency", LATENCY.nseconds() as i64);
|
|
fallback_src.set_property(
|
|
"caps",
|
|
gst::Caps::builder("video/x-raw")
|
|
.field("format", "ARGB")
|
|
.field("width", 160i32)
|
|
.field("height", 120i32)
|
|
.field("framerate", gst::Fraction::new(1, 1))
|
|
.build(),
|
|
);
|
|
|
|
pipeline.add(&fallback_src).unwrap();
|
|
|
|
fallback_src
|
|
.link_pads(Some("src"), &switch, Some("sink_1"))
|
|
.unwrap();
|
|
}
|
|
|
|
pipeline.set_state(gst::State::Playing).unwrap();
|
|
|
|
let clock_join_handle = std::thread::spawn(move || {
|
|
loop {
|
|
while let Some(clock_id) = clock.peek_next_pending_id().and_then(|clock_id| {
|
|
// Process if the clock ID is in the past or now
|
|
if clock.time().map_or(false, |time| time >= clock_id.time()) {
|
|
Some(clock_id)
|
|
} else {
|
|
None
|
|
}
|
|
}) {
|
|
debug!(
|
|
TEST_CAT,
|
|
"Processing clock ID {} at {:?}",
|
|
clock_id.time(),
|
|
clock.time()
|
|
);
|
|
if let Some(clock_id) = clock.process_next_clock_id() {
|
|
debug!(TEST_CAT, "Processed clock ID {}", clock_id.time());
|
|
if clock_id.time().is_zero() {
|
|
debug!(TEST_CAT, "Stopping clock thread");
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sleep for 5ms as long as we have pending clock IDs that are in the future
|
|
// at the top of the queue. We don't want to do a busy loop here.
|
|
while clock.peek_next_pending_id().iter().any(|clock_id| {
|
|
// Sleep if the clock ID is in the future
|
|
// FIXME probably can expect clock.time()
|
|
clock
|
|
.time()
|
|
.map_or(true, |clock_time| clock_time < clock_id.time())
|
|
}) {
|
|
use std::{thread, time};
|
|
|
|
thread::sleep(time::Duration::from_millis(10));
|
|
}
|
|
|
|
// Otherwise if there are none (or they are ready now) wait until there are
|
|
// clock ids again
|
|
let _ = clock.wait_for_next_pending_id();
|
|
}
|
|
});
|
|
|
|
Pipeline {
|
|
pipeline,
|
|
clock_join_handle: Some(clock_join_handle),
|
|
}
|
|
}
|
|
|
|
fn push_buffer(pipeline: &Pipeline, time: gst::ClockTime) {
|
|
let src = pipeline
|
|
.by_name("src")
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSrc>()
|
|
.unwrap();
|
|
let mut buffer = gst::Buffer::with_size(320 * 240 * 4).unwrap();
|
|
{
|
|
let buffer = buffer.get_mut().unwrap();
|
|
buffer.set_pts(time);
|
|
}
|
|
src.push_buffer(buffer).unwrap();
|
|
}
|
|
|
|
fn push_fallback_buffer(pipeline: &Pipeline, time: gst::ClockTime) {
|
|
let src = pipeline
|
|
.by_name("fallback-src")
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSrc>()
|
|
.unwrap();
|
|
let mut buffer = gst::Buffer::with_size(160 * 120 * 4).unwrap();
|
|
{
|
|
let buffer = buffer.get_mut().unwrap();
|
|
buffer.set_pts(time);
|
|
}
|
|
src.push_buffer(buffer).unwrap();
|
|
}
|
|
|
|
fn push_eos(pipeline: &Pipeline) {
|
|
let src = pipeline
|
|
.by_name("src")
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSrc>()
|
|
.unwrap();
|
|
src.end_of_stream().unwrap();
|
|
}
|
|
|
|
fn push_fallback_eos(pipeline: &Pipeline) {
|
|
let src = pipeline
|
|
.by_name("fallback-src")
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSrc>()
|
|
.unwrap();
|
|
src.end_of_stream().unwrap();
|
|
}
|
|
|
|
fn pull_buffer(pipeline: &Pipeline) -> gst::Buffer {
|
|
let sink = pipeline
|
|
.by_name("sink")
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSink>()
|
|
.unwrap();
|
|
let sample = sink.pull_sample().unwrap();
|
|
sample.buffer_owned().unwrap()
|
|
}
|
|
|
|
fn set_time(pipeline: &Pipeline, time: gst::ClockTime) {
|
|
let clock = pipeline
|
|
.clock()
|
|
.unwrap()
|
|
.downcast::<gst_check::TestClock>()
|
|
.unwrap();
|
|
|
|
debug!(TEST_CAT, "Setting time to {}", time);
|
|
clock.set_time(gst::ClockTime::SECOND + time);
|
|
}
|
|
|
|
fn wait_eos(pipeline: &Pipeline) {
|
|
let sink = pipeline
|
|
.by_name("sink")
|
|
.unwrap()
|
|
.downcast::<gst_app::AppSink>()
|
|
.unwrap();
|
|
// FIXME: Ideally without a sleep
|
|
loop {
|
|
use std::{thread, time};
|
|
|
|
if sink.is_eos() {
|
|
debug!(TEST_CAT, "Waited for EOS");
|
|
break;
|
|
}
|
|
thread::sleep(time::Duration::from_millis(10));
|
|
}
|
|
}
|
|
|
|
fn stop_pipeline(mut pipeline: Pipeline) {
|
|
pipeline.set_state(gst::State::Null).unwrap();
|
|
|
|
let clock = pipeline
|
|
.clock()
|
|
.unwrap()
|
|
.downcast::<gst_check::TestClock>()
|
|
.unwrap();
|
|
|
|
// Signal shutdown to the clock thread
|
|
let clock_id = clock.new_single_shot_id(gst::ClockTime::ZERO);
|
|
let _ = clock_id.wait();
|
|
|
|
let switch = pipeline.by_name("switch").unwrap();
|
|
let switch_weak = switch.downgrade();
|
|
drop(switch);
|
|
let pipeline_weak = pipeline.downgrade();
|
|
|
|
pipeline.clock_join_handle.take().unwrap().join().unwrap();
|
|
drop(pipeline);
|
|
|
|
assert!(switch_weak.upgrade().is_none());
|
|
assert!(pipeline_weak.upgrade().is_none());
|
|
}
|