gst-plugins-rs/gst-plugin-threadshare/tests/appsrc.rs
François Laignel 116cf9bd3c threadshare/*src: rework pause/flush_start/flush_stop
This commit fixes several issues with the `Ts*Src` elements.

The pause functions used cancel_task which breaks the Task loop at await
points. For some elements, this implies making sure no item is being lost.
Moreover, cancelling the Task also cancels downstream processing, which
makes it difficult to ensure elements can handle all cases.

This commit reimplements Task::pause which allows completing the running
loop iteration before pausing the loop.

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/277#note_439529

In the Paused state, incoming items were rejected by TsAppSrc and DataQueue.

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/277#note_438455

- FlushStart must engage items rejection and cancel the Task.
- FlushStop must purge the internal stream & accept items again.

If the task was cancelled, `push_prelude` could set `need_initial_events`
to `true` when the events weren't actually pushed yet.

TsAppSrc used to renew its internal channel which could cause Buffer loss
when transitionning Playing -> Paused -> Playing.

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/issues/98
2020-03-21 18:46:03 +00:00

303 lines
7.4 KiB
Rust

// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use glib::prelude::*;
use gst;
use gst::prelude::*;
use gst_check;
use gstthreadshare;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstthreadshare::plugin_register_static().expect("gstthreadshare appsrc test");
});
}
#[test]
fn push() {
init();
let mut h = gst_check::Harness::new("ts-appsrc");
let caps = gst::Caps::new_simple("foo/bar", &[]);
{
let appsrc = h.get_element().unwrap();
appsrc.set_property("caps", &caps).unwrap();
appsrc.set_property("do-timestamp", &true).unwrap();
appsrc.set_property("context", &"appsrc-push").unwrap();
}
h.play();
{
let appsrc = h.get_element().unwrap();
for _ in 0..3 {
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
}
assert!(appsrc
.emit("end-of-stream", &[])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
}
for _ in 0..3 {
let _buffer = h.pull().unwrap();
}
let mut n_events = 0;
loop {
use gst::EventView;
let event = h.pull_event().unwrap();
match event.view() {
EventView::StreamStart(..) => {
assert_eq!(n_events, 0);
}
EventView::Caps(ev) => {
assert_eq!(n_events, 1);
let event_caps = ev.get_caps();
assert_eq!(caps.as_ref(), event_caps);
}
EventView::Segment(..) => {
assert_eq!(n_events, 2);
}
EventView::Eos(..) => {
break;
}
_ => (),
}
n_events += 1;
}
assert!(n_events >= 2);
}
#[test]
fn pause() {
init();
let mut h = gst_check::Harness::new("ts-appsrc");
let caps = gst::Caps::new_simple("foo/bar", &[]);
{
let appsrc = h.get_element().unwrap();
appsrc.set_property("caps", &caps).unwrap();
appsrc.set_property("do-timestamp", &true).unwrap();
appsrc.set_property("context", &"appsrc-pause").unwrap();
}
h.play();
let appsrc = h.get_element().unwrap();
// Initial buffer
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
let _ = h.pull().unwrap();
appsrc
.change_state(gst::StateChange::PlayingToPaused)
.unwrap();
// Pre-pause buffer
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::from_slice(vec![5, 6, 7])])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
appsrc
.change_state(gst::StateChange::PlayingToPaused)
.unwrap();
// Buffer is queued during Paused
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::from_slice(vec![8, 9])])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
appsrc
.change_state(gst::StateChange::PausedToPlaying)
.unwrap();
// Pull Pre-pause buffer
let _ = h.pull().unwrap();
// Pull buffer queued while Paused
let _ = h.pull().unwrap();
// Can push again
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
let _ = h.pull().unwrap();
assert!(h.try_pull().is_none());
}
#[test]
fn flush() {
init();
let mut h = gst_check::Harness::new("ts-appsrc");
let caps = gst::Caps::new_simple("foo/bar", &[]);
{
let appsrc = h.get_element().unwrap();
appsrc.set_property("caps", &caps).unwrap();
appsrc.set_property("do-timestamp", &true).unwrap();
appsrc.set_property("context", &"appsrc-flush").unwrap();
}
h.play();
let appsrc = h.get_element().unwrap();
// Initial buffer
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
let _ = h.pull().unwrap();
// FlushStart
assert!(h.push_upstream_event(gst::Event::new_flush_start().build()));
// Can't push buffer while flushing
assert!(!appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
assert!(h.try_pull().is_none());
// FlushStop
assert!(h.push_upstream_event(gst::Event::new_flush_stop(true).build()));
// No buffer available due to flush
assert!(h.try_pull().is_none());
// Can push again
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
let _ = h.pull().unwrap();
assert!(h.try_pull().is_none());
}
#[test]
fn pause_flush() {
init();
let mut h = gst_check::Harness::new("ts-appsrc");
let caps = gst::Caps::new_simple("foo/bar", &[]);
{
let appsrc = h.get_element().unwrap();
appsrc.set_property("caps", &caps).unwrap();
appsrc.set_property("do-timestamp", &true).unwrap();
appsrc
.set_property("context", &"appsrc-pause_flush")
.unwrap();
}
h.play();
let appsrc = h.get_element().unwrap();
// Initial buffer
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
let _ = h.pull().unwrap();
appsrc
.change_state(gst::StateChange::PlayingToPaused)
.unwrap();
// FlushStart
assert!(h.push_upstream_event(gst::Event::new_flush_start().build()));
// Can't push buffer while flushing
assert!(!appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
assert!(h.try_pull().is_none());
// FlushStop
assert!(h.push_upstream_event(gst::Event::new_flush_stop(true).build()));
appsrc
.change_state(gst::StateChange::PausedToPlaying)
.unwrap();
// No buffer available due to flush
assert!(h.try_pull().is_none());
// Can push again
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get_some::<bool>()
.unwrap());
let _ = h.pull().unwrap();
assert!(h.try_pull().is_none());
}