2019-11-24 20:12:40 +00:00
|
|
|
// Copyright (C) 2019 François Laignel <fengalin@free.fr>
|
|
|
|
//
|
|
|
|
// This library is free software; you can redistribute it and/or
|
|
|
|
// modify it under the terms of the GNU Library General Public
|
|
|
|
// License as published by the Free Software Foundation; either
|
|
|
|
// version 2 of the License, or (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This library is distributed in the hope that it will be useful,
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
|
|
// Library General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Library General Public
|
|
|
|
// License along with this library; if not, write to the
|
|
|
|
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
|
|
|
|
// Boston, MA 02110-1335, USA.
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
use gst::prelude::*;
|
|
|
|
use gst::{gst_debug, gst_error};
|
|
|
|
|
|
|
|
use lazy_static::lazy_static;
|
|
|
|
|
|
|
|
use std::sync::mpsc;
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
lazy_static! {
|
|
|
|
static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
|
|
|
|
"ts-test",
|
|
|
|
gst::DebugColorFlags::empty(),
|
|
|
|
Some("Thread-sharing test"),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2019-11-24 20:12:40 +00:00
|
|
|
fn init() {
|
|
|
|
use std::sync::Once;
|
|
|
|
static INIT: Once = Once::new();
|
|
|
|
|
|
|
|
INIT.call_once(|| {
|
|
|
|
gst::init().unwrap();
|
|
|
|
gstthreadshare::plugin_register_static().expect("gstthreadshare pipeline test");
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2019-12-02 09:30:07 +00:00
|
|
|
fn multiple_contexts_queue() {
|
2019-11-24 20:12:40 +00:00
|
|
|
use std::net;
|
|
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
2019-12-02 09:30:07 +00:00
|
|
|
use std::sync::mpsc;
|
2019-11-24 20:12:40 +00:00
|
|
|
|
|
|
|
init();
|
|
|
|
|
|
|
|
const CONTEXT_NB: u32 = 2;
|
|
|
|
const SRC_NB: u16 = 4;
|
|
|
|
const CONTEXT_WAIT: u32 = 1;
|
|
|
|
const BUFFER_NB: u32 = 3;
|
2019-12-02 09:30:07 +00:00
|
|
|
const FIRST_PORT: u16 = 40000;
|
2019-11-24 20:12:40 +00:00
|
|
|
|
|
|
|
let l = glib::MainLoop::new(None, false);
|
|
|
|
let pipeline = gst::Pipeline::new(None);
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
let (sender, receiver) = mpsc::channel();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
|
|
|
for i in 0..SRC_NB {
|
|
|
|
let src =
|
|
|
|
gst::ElementFactory::make("ts-udpsrc", Some(format!("src-{}", i).as_str())).unwrap();
|
|
|
|
src.set_property("context", &format!("context-{}", (i as u32) % CONTEXT_NB))
|
|
|
|
.unwrap();
|
|
|
|
src.set_property("context-wait", &CONTEXT_WAIT).unwrap();
|
2020-04-27 09:22:26 +00:00
|
|
|
src.set_property("port", &((FIRST_PORT + i) as i32))
|
2019-12-02 09:30:07 +00:00
|
|
|
.unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
|
|
|
let queue =
|
|
|
|
gst::ElementFactory::make("ts-queue", Some(format!("queue-{}", i).as_str())).unwrap();
|
|
|
|
queue
|
|
|
|
.set_property("context", &format!("context-{}", (i as u32) % CONTEXT_NB))
|
|
|
|
.unwrap();
|
|
|
|
queue.set_property("context-wait", &CONTEXT_WAIT).unwrap();
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
let sink =
|
|
|
|
gst::ElementFactory::make("appsink", Some(format!("sink-{}", i).as_str())).unwrap();
|
|
|
|
sink.set_property("sync", &false).unwrap();
|
|
|
|
sink.set_property("async", &false).unwrap();
|
|
|
|
sink.set_property("emit-signals", &true).unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
pipeline.add_many(&[&src, &queue, &sink]).unwrap();
|
|
|
|
gst::Element::link_many(&[&src, &queue, &sink]).unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap();
|
|
|
|
let sender_clone = sender.clone();
|
|
|
|
appsink.connect_new_sample(move |appsink| {
|
|
|
|
let _sample = appsink
|
|
|
|
.emit("pull-sample", &[])
|
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.get::<gst::Sample>()
|
|
|
|
.unwrap()
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
sender_clone.send(()).unwrap();
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
});
|
2019-11-24 20:12:40 +00:00
|
|
|
}
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
let pipeline_clone = pipeline.clone();
|
|
|
|
let l_clone = l.clone();
|
|
|
|
let mut test_scenario = Some(move || {
|
|
|
|
let buffer = [0; 160];
|
|
|
|
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
|
|
|
|
|
|
let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
|
|
|
|
let destinations = (FIRST_PORT..(FIRST_PORT + SRC_NB))
|
|
|
|
.map(|port| SocketAddr::new(ipaddr, port))
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
for _ in 0..BUFFER_NB {
|
|
|
|
for dest in &destinations {
|
|
|
|
gst_debug!(CAT, "multiple_contexts_queue: sending buffer to {:?}", dest);
|
|
|
|
socket.send_to(&buffer, dest).unwrap();
|
|
|
|
std::thread::sleep(std::time::Duration::from_millis(CONTEXT_WAIT as u64));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
gst_debug!(
|
|
|
|
CAT,
|
|
|
|
"multiple_contexts_queue: waiting for all buffers notifications"
|
|
|
|
);
|
|
|
|
for _ in 0..(BUFFER_NB * (SRC_NB as u32)) {
|
|
|
|
receiver.recv().unwrap();
|
|
|
|
}
|
|
|
|
|
|
|
|
pipeline_clone.set_state(gst::State::Null).unwrap();
|
|
|
|
l_clone.quit();
|
|
|
|
});
|
|
|
|
|
2019-11-24 20:12:40 +00:00
|
|
|
let bus = pipeline.get_bus().unwrap();
|
|
|
|
let l_clone = l.clone();
|
|
|
|
bus.add_watch(move |_, msg| {
|
|
|
|
use gst::MessageView;
|
|
|
|
|
|
|
|
match msg.view() {
|
2019-12-02 09:30:07 +00:00
|
|
|
MessageView::StateChanged(state_changed) => {
|
|
|
|
if let Some(source) = state_changed.get_src() {
|
2019-12-22 09:18:30 +00:00
|
|
|
if source.get_type() == gst::Pipeline::static_type()
|
|
|
|
&& state_changed.get_old() == gst::State::Paused
|
|
|
|
&& state_changed.get_current() == gst::State::Playing
|
|
|
|
{
|
|
|
|
if let Some(test_scenario) = test_scenario.take() {
|
|
|
|
std::thread::spawn(test_scenario);
|
2019-12-02 09:30:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-11-24 20:12:40 +00:00
|
|
|
MessageView::Error(err) => {
|
2019-12-02 09:30:07 +00:00
|
|
|
gst_error!(
|
|
|
|
CAT,
|
|
|
|
"multiple_contexts_queue: Error from {:?}: {} ({:?})",
|
2019-11-24 20:12:40 +00:00
|
|
|
err.get_src().map(|s| s.get_path_string()),
|
|
|
|
err.get_error(),
|
|
|
|
err.get_debug()
|
|
|
|
);
|
|
|
|
l_clone.quit();
|
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
};
|
|
|
|
|
|
|
|
glib::Continue(true)
|
2019-11-30 18:51:31 +00:00
|
|
|
})
|
|
|
|
.unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
pipeline.set_state(gst::State::Playing).unwrap();
|
|
|
|
|
|
|
|
gst_debug!(CAT, "Starting main loop for multiple_contexts_queue...");
|
|
|
|
l.run();
|
|
|
|
gst_debug!(CAT, "Stopping main loop for multiple_contexts_queue...");
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn multiple_contexts_proxy() {
|
|
|
|
use std::net;
|
|
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
|
|
|
|
|
|
init();
|
|
|
|
|
|
|
|
const CONTEXT_NB: u32 = 2;
|
|
|
|
const SRC_NB: u16 = 4;
|
|
|
|
const CONTEXT_WAIT: u32 = 1;
|
|
|
|
const BUFFER_NB: u32 = 3;
|
|
|
|
// Don't overlap with `multiple_contexts_queue`
|
|
|
|
const OFFSET: u16 = 10;
|
|
|
|
const FIRST_PORT: u16 = 40000 + OFFSET;
|
|
|
|
|
|
|
|
let l = glib::MainLoop::new(None, false);
|
|
|
|
let pipeline = gst::Pipeline::new(None);
|
|
|
|
|
|
|
|
let (sender, receiver) = mpsc::channel();
|
|
|
|
|
|
|
|
for i in 0..SRC_NB {
|
|
|
|
let pipeline_index = i + OFFSET;
|
|
|
|
|
|
|
|
let src = gst::ElementFactory::make(
|
|
|
|
"ts-udpsrc",
|
|
|
|
Some(format!("src-{}", pipeline_index).as_str()),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
src.set_property("context", &format!("context-{}", (i as u32) % CONTEXT_NB))
|
|
|
|
.unwrap();
|
|
|
|
src.set_property("context-wait", &CONTEXT_WAIT).unwrap();
|
2020-04-27 09:22:26 +00:00
|
|
|
src.set_property("port", &((FIRST_PORT + i) as i32))
|
2019-12-02 09:30:07 +00:00
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let proxysink = gst::ElementFactory::make(
|
|
|
|
"ts-proxysink",
|
|
|
|
Some(format!("proxysink-{}", pipeline_index).as_str()),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
proxysink
|
|
|
|
.set_property("proxy-context", &format!("proxy-{}", pipeline_index))
|
|
|
|
.unwrap();
|
|
|
|
let proxysrc = gst::ElementFactory::make(
|
|
|
|
"ts-proxysrc",
|
|
|
|
Some(format!("proxysrc-{}", pipeline_index).as_str()),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
proxysrc
|
|
|
|
.set_property(
|
|
|
|
"context",
|
|
|
|
&format!("context-{}", (pipeline_index as u32) % CONTEXT_NB),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
proxysrc
|
|
|
|
.set_property("proxy-context", &format!("proxy-{}", pipeline_index))
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let sink =
|
|
|
|
gst::ElementFactory::make("appsink", Some(format!("sink-{}", pipeline_index).as_str()))
|
|
|
|
.unwrap();
|
|
|
|
sink.set_property("sync", &false).unwrap();
|
|
|
|
sink.set_property("async", &false).unwrap();
|
|
|
|
sink.set_property("emit-signals", &true).unwrap();
|
|
|
|
|
|
|
|
pipeline
|
|
|
|
.add_many(&[&src, &proxysink, &proxysrc, &sink])
|
|
|
|
.unwrap();
|
|
|
|
src.link(&proxysink).unwrap();
|
|
|
|
proxysrc.link(&sink).unwrap();
|
|
|
|
|
|
|
|
let appsink = sink.dynamic_cast::<gst_app::AppSink>().unwrap();
|
|
|
|
let sender_clone = sender.clone();
|
|
|
|
appsink.connect_new_sample(move |appsink| {
|
|
|
|
let _sample = appsink
|
|
|
|
.emit("pull-sample", &[])
|
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.get::<gst::Sample>()
|
|
|
|
.unwrap()
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
sender_clone.send(()).unwrap();
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2019-11-24 20:12:40 +00:00
|
|
|
let pipeline_clone = pipeline.clone();
|
|
|
|
let l_clone = l.clone();
|
2019-12-02 09:30:07 +00:00
|
|
|
let mut test_scenario = Some(move || {
|
2019-11-24 20:12:40 +00:00
|
|
|
let buffer = [0; 160];
|
|
|
|
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
|
|
|
|
|
|
let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
|
2019-12-02 09:30:07 +00:00
|
|
|
let destinations = (FIRST_PORT..(FIRST_PORT + SRC_NB))
|
2019-11-24 20:12:40 +00:00
|
|
|
.map(|port| SocketAddr::new(ipaddr, port))
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
for _ in 0..BUFFER_NB {
|
|
|
|
for dest in &destinations {
|
2019-12-02 09:30:07 +00:00
|
|
|
gst_debug!(CAT, "multiple_contexts_proxy: sending buffer to {:?}", dest);
|
2019-11-24 20:12:40 +00:00
|
|
|
socket.send_to(&buffer, dest).unwrap();
|
2019-12-02 09:30:07 +00:00
|
|
|
std::thread::sleep(std::time::Duration::from_millis(CONTEXT_WAIT as u64));
|
2019-11-24 20:12:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
gst_debug!(
|
|
|
|
CAT,
|
|
|
|
"multiple_contexts_proxy: waiting for all buffers notifications"
|
|
|
|
);
|
|
|
|
for _ in 0..(BUFFER_NB * (SRC_NB as u32)) {
|
|
|
|
receiver.recv().unwrap();
|
|
|
|
}
|
2019-11-24 20:12:40 +00:00
|
|
|
|
|
|
|
pipeline_clone.set_state(gst::State::Null).unwrap();
|
|
|
|
l_clone.quit();
|
|
|
|
});
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
let bus = pipeline.get_bus().unwrap();
|
|
|
|
let l_clone = l.clone();
|
|
|
|
bus.add_watch(move |_, msg| {
|
|
|
|
use gst::MessageView;
|
|
|
|
|
|
|
|
match msg.view() {
|
|
|
|
MessageView::StateChanged(state_changed) => {
|
|
|
|
if let Some(source) = state_changed.get_src() {
|
2019-12-22 09:18:30 +00:00
|
|
|
if source.get_type() == gst::Pipeline::static_type()
|
|
|
|
&& state_changed.get_old() == gst::State::Paused
|
|
|
|
&& state_changed.get_current() == gst::State::Playing
|
|
|
|
{
|
|
|
|
if let Some(test_scenario) = test_scenario.take() {
|
|
|
|
std::thread::spawn(test_scenario);
|
2019-12-02 09:30:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
MessageView::Error(err) => {
|
|
|
|
gst_error!(
|
|
|
|
CAT,
|
|
|
|
"multiple_contexts_proxy: Error from {:?}: {} ({:?})",
|
|
|
|
err.get_src().map(|s| s.get_path_string()),
|
|
|
|
err.get_error(),
|
|
|
|
err.get_debug()
|
|
|
|
);
|
|
|
|
l_clone.quit();
|
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
};
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
glib::Continue(true)
|
2019-11-30 18:51:31 +00:00
|
|
|
})
|
|
|
|
.unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
pipeline.set_state(gst::State::Playing).unwrap();
|
|
|
|
|
|
|
|
gst_debug!(CAT, "Starting main loop for multiple_contexts_proxy...");
|
2019-11-24 20:12:40 +00:00
|
|
|
l.run();
|
2019-12-02 09:30:07 +00:00
|
|
|
gst_debug!(CAT, "Stopping main loop for multiple_contexts_proxy...");
|
2019-11-24 20:12:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2019-12-02 09:30:07 +00:00
|
|
|
fn eos() {
|
|
|
|
const CONTEXT: &str = "test_eos";
|
2019-11-24 20:12:40 +00:00
|
|
|
|
|
|
|
init();
|
|
|
|
|
|
|
|
let l = glib::MainLoop::new(None, false);
|
|
|
|
let pipeline = gst::Pipeline::new(None);
|
|
|
|
|
|
|
|
let caps = gst::Caps::new_simple("foo/bar", &[]);
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
let src = gst::ElementFactory::make("ts-appsrc", Some("src-eos")).unwrap();
|
|
|
|
src.set_property("caps", &caps).unwrap();
|
|
|
|
src.set_property("do-timestamp", &true).unwrap();
|
|
|
|
src.set_property("context", &CONTEXT).unwrap();
|
|
|
|
|
|
|
|
let queue = gst::ElementFactory::make("ts-queue", Some("queue-eos")).unwrap();
|
|
|
|
queue.set_property("context", &CONTEXT).unwrap();
|
|
|
|
|
|
|
|
let appsink = gst::ElementFactory::make("appsink", Some("sink-eos")).unwrap();
|
|
|
|
|
|
|
|
pipeline.add_many(&[&src, &queue, &appsink]).unwrap();
|
|
|
|
gst::Element::link_many(&[&src, &queue, &appsink]).unwrap();
|
|
|
|
|
|
|
|
appsink.set_property("sync", &false).unwrap();
|
|
|
|
appsink.set_property("async", &false).unwrap();
|
|
|
|
|
|
|
|
appsink.set_property("emit-signals", &true).unwrap();
|
|
|
|
let (sample_notifier, sample_notif_rcv) = mpsc::channel();
|
|
|
|
let (eos_notifier, eos_notif_rcv) = mpsc::channel();
|
|
|
|
let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap();
|
|
|
|
appsink.connect_new_sample(move |appsink| {
|
|
|
|
gst_debug!(CAT, obj: appsink, "eos: pulling sample");
|
|
|
|
let _ = appsink
|
|
|
|
.emit("pull-sample", &[])
|
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.get::<gst::Sample>()
|
|
|
|
.unwrap()
|
|
|
|
.unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
sample_notifier.send(()).unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
});
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
appsink.connect_eos(move |_appsink| eos_notifier.send(()).unwrap());
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
fn push_buffer(src: &gst::Element) -> bool {
|
|
|
|
gst_debug!(CAT, obj: src, "eos: pushing buffer");
|
|
|
|
src.emit("push-buffer", &[&gst::Buffer::from_slice(vec![0; 1024])])
|
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.get_some::<bool>()
|
|
|
|
.unwrap()
|
|
|
|
}
|
|
|
|
|
|
|
|
let pipeline_clone = pipeline.clone();
|
2019-11-24 20:12:40 +00:00
|
|
|
let l_clone = l.clone();
|
2019-12-02 09:30:07 +00:00
|
|
|
let mut scenario = Some(move || {
|
|
|
|
// Initialize the dataflow
|
|
|
|
assert!(push_buffer(&src));
|
|
|
|
|
|
|
|
sample_notif_rcv.recv().unwrap();
|
|
|
|
|
|
|
|
assert!(src
|
|
|
|
.emit("end-of-stream", &[])
|
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.get_some::<bool>()
|
|
|
|
.unwrap());
|
|
|
|
|
|
|
|
eos_notif_rcv.recv().unwrap();
|
|
|
|
|
2020-04-20 19:35:06 +00:00
|
|
|
// FIXME not ideal, but better than previous approach.
|
|
|
|
// I think the "end-of-stream" signal should block
|
|
|
|
// until the **src** element has actually reached EOS
|
|
|
|
loop {
|
|
|
|
std::thread::sleep(std::time::Duration::from_millis(10));
|
|
|
|
if !push_buffer(&src) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2019-12-02 09:30:07 +00:00
|
|
|
|
|
|
|
pipeline_clone.set_state(gst::State::Null).unwrap();
|
|
|
|
l_clone.quit();
|
|
|
|
});
|
|
|
|
|
|
|
|
let l_clone = l.clone();
|
2019-11-30 18:51:31 +00:00
|
|
|
pipeline
|
|
|
|
.get_bus()
|
|
|
|
.unwrap()
|
|
|
|
.add_watch(move |_, msg| {
|
|
|
|
use gst::MessageView;
|
|
|
|
|
|
|
|
match msg.view() {
|
|
|
|
MessageView::StateChanged(state_changed) => {
|
|
|
|
if let Some(source) = state_changed.get_src() {
|
|
|
|
if source.get_type() != gst::Pipeline::static_type() {
|
|
|
|
return glib::Continue(true);
|
|
|
|
}
|
|
|
|
if state_changed.get_old() == gst::State::Paused
|
|
|
|
&& state_changed.get_current() == gst::State::Playing
|
|
|
|
{
|
|
|
|
if let Some(scenario) = scenario.take() {
|
|
|
|
std::thread::spawn(scenario);
|
|
|
|
}
|
2019-12-02 09:30:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-11-30 18:51:31 +00:00
|
|
|
MessageView::Error(err) => {
|
|
|
|
gst_error!(
|
|
|
|
CAT,
|
|
|
|
"eos: Error from {:?}: {} ({:?})",
|
|
|
|
err.get_src().map(|s| s.get_path_string()),
|
|
|
|
err.get_error(),
|
|
|
|
err.get_debug()
|
|
|
|
);
|
|
|
|
l_clone.quit();
|
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
};
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-11-30 18:51:31 +00:00
|
|
|
glib::Continue(true)
|
|
|
|
})
|
|
|
|
.unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
pipeline.set_state(gst::State::Playing).unwrap();
|
|
|
|
|
|
|
|
gst_debug!(CAT, "Starting main loop for eos...");
|
|
|
|
l.run();
|
|
|
|
gst_debug!(CAT, "Stopping main loop for eos...");
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn premature_shutdown() {
|
|
|
|
init();
|
|
|
|
|
|
|
|
const APPSRC_CONTEXT_WAIT: u32 = 0;
|
|
|
|
const QUEUE_CONTEXT_WAIT: u32 = 1;
|
|
|
|
const QUEUE_ITEMS_CAPACITY: u32 = 1;
|
|
|
|
|
|
|
|
let l = glib::MainLoop::new(None, false);
|
|
|
|
let pipeline = gst::Pipeline::new(None);
|
|
|
|
|
|
|
|
let caps = gst::Caps::new_simple("foo/bar", &[]);
|
|
|
|
|
|
|
|
let src = gst::ElementFactory::make("ts-appsrc", Some("src-ps")).unwrap();
|
|
|
|
src.set_property("caps", &caps).unwrap();
|
|
|
|
src.set_property("do-timestamp", &true).unwrap();
|
|
|
|
src.set_property("context", &"appsrc-context").unwrap();
|
|
|
|
src.set_property("context-wait", &APPSRC_CONTEXT_WAIT)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let queue = gst::ElementFactory::make("ts-queue", Some("queue-ps")).unwrap();
|
|
|
|
queue.set_property("context", &"queue-context").unwrap();
|
|
|
|
queue
|
|
|
|
.set_property("context-wait", &QUEUE_CONTEXT_WAIT)
|
|
|
|
.unwrap();
|
|
|
|
queue
|
|
|
|
.set_property("max-size-buffers", &QUEUE_ITEMS_CAPACITY)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let appsink = gst::ElementFactory::make("appsink", Some("sink-ps")).unwrap();
|
|
|
|
|
|
|
|
pipeline.add_many(&[&src, &queue, &appsink]).unwrap();
|
|
|
|
gst::Element::link_many(&[&src, &queue, &appsink]).unwrap();
|
|
|
|
|
|
|
|
appsink.set_property("emit-signals", &true).unwrap();
|
|
|
|
appsink.set_property("sync", &false).unwrap();
|
|
|
|
appsink.set_property("async", &false).unwrap();
|
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
let (appsink_sender, appsink_receiver) = mpsc::channel();
|
2019-12-02 09:30:07 +00:00
|
|
|
|
|
|
|
let appsink = appsink.dynamic_cast::<gst_app::AppSink>().unwrap();
|
|
|
|
appsink.connect_new_sample(move |appsink| {
|
|
|
|
gst_debug!(CAT, obj: appsink, "premature_shutdown: pulling sample");
|
|
|
|
let _sample = appsink
|
|
|
|
.emit("pull-sample", &[])
|
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.get::<gst::Sample>()
|
|
|
|
.unwrap()
|
|
|
|
.unwrap();
|
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
appsink_sender.send(()).unwrap();
|
2019-12-02 09:30:07 +00:00
|
|
|
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
});
|
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
fn push_buffer(src: &gst::Element, intent: &str) -> bool {
|
|
|
|
gst_debug!(
|
|
|
|
CAT,
|
|
|
|
obj: src,
|
|
|
|
"premature_shutdown: pushing buffer {}",
|
|
|
|
intent
|
|
|
|
);
|
2019-12-02 09:30:07 +00:00
|
|
|
src.emit("push-buffer", &[&gst::Buffer::from_slice(vec![0; 1024])])
|
|
|
|
.unwrap()
|
|
|
|
.unwrap()
|
|
|
|
.get_some::<bool>()
|
|
|
|
.unwrap()
|
|
|
|
}
|
|
|
|
|
2019-11-24 20:12:40 +00:00
|
|
|
let pipeline_clone = pipeline.clone();
|
|
|
|
let l_clone = l.clone();
|
2019-12-02 09:30:07 +00:00
|
|
|
let mut scenario = Some(move || {
|
|
|
|
gst_debug!(CAT, "premature_shutdown: STEP 1: Playing");
|
|
|
|
// Initialize the dataflow
|
2020-03-19 18:34:51 +00:00
|
|
|
assert!(push_buffer(&src, "(initial)"));
|
2019-12-02 09:30:07 +00:00
|
|
|
|
|
|
|
// Wait for the buffer to reach AppSink
|
2020-03-19 18:34:51 +00:00
|
|
|
appsink_receiver.recv().unwrap();
|
|
|
|
assert_eq!(
|
|
|
|
appsink_receiver.try_recv().unwrap_err(),
|
|
|
|
mpsc::TryRecvError::Empty
|
|
|
|
);
|
2019-12-02 09:30:07 +00:00
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
assert!(push_buffer(&src, "before Playing -> Paused"));
|
2019-12-02 09:30:07 +00:00
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
gst_debug!(CAT, "premature_shutdown: STEP 2: Playing -> Paused");
|
2019-12-02 09:30:07 +00:00
|
|
|
pipeline_clone.set_state(gst::State::Paused).unwrap();
|
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
gst_debug!(CAT, "premature_shutdown: STEP 3: Paused -> Playing");
|
2019-12-02 09:30:07 +00:00
|
|
|
pipeline_clone.set_state(gst::State::Playing).unwrap();
|
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
gst_debug!(CAT, "premature_shutdown: Playing again");
|
2019-12-02 09:30:07 +00:00
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
gst_debug!(CAT, "Waiting for buffer sent before Playing -> Paused");
|
|
|
|
appsink_receiver.recv().unwrap();
|
2019-12-02 09:30:07 +00:00
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
assert!(push_buffer(&src, "after Paused -> Playing"));
|
|
|
|
gst_debug!(CAT, "Waiting for buffer sent after Paused -> Playing");
|
|
|
|
appsink_receiver.recv().unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
// Fill up the (dataqueue) and abruptly shutdown
|
2020-03-19 18:34:51 +00:00
|
|
|
assert!(push_buffer(&src, "filling 1"));
|
|
|
|
assert!(push_buffer(&src, "filling 2"));
|
2019-12-02 09:30:07 +00:00
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
gst_debug!(CAT, "premature_shutdown: STEP 4: Playing -> Null");
|
2019-12-02 09:30:07 +00:00
|
|
|
|
|
|
|
pipeline_clone.set_state(gst::State::Null).unwrap();
|
|
|
|
|
2020-03-19 18:34:51 +00:00
|
|
|
assert!(!push_buffer(&src, "after Null"));
|
2019-12-02 09:30:07 +00:00
|
|
|
|
|
|
|
l_clone.quit();
|
|
|
|
});
|
|
|
|
|
|
|
|
let l_clone = l.clone();
|
2019-11-30 18:51:31 +00:00
|
|
|
pipeline
|
|
|
|
.get_bus()
|
|
|
|
.unwrap()
|
|
|
|
.add_watch(move |_, msg| {
|
|
|
|
use gst::MessageView;
|
|
|
|
|
|
|
|
match msg.view() {
|
|
|
|
MessageView::StateChanged(state_changed) => {
|
|
|
|
if let Some(source) = state_changed.get_src() {
|
|
|
|
if source.get_type() != gst::Pipeline::static_type() {
|
|
|
|
return glib::Continue(true);
|
|
|
|
}
|
|
|
|
if state_changed.get_old() == gst::State::Paused
|
|
|
|
&& state_changed.get_current() == gst::State::Playing
|
|
|
|
{
|
|
|
|
if let Some(scenario) = scenario.take() {
|
|
|
|
std::thread::spawn(scenario);
|
|
|
|
}
|
2019-12-02 09:30:07 +00:00
|
|
|
}
|
|
|
|
}
|
2019-11-24 20:12:40 +00:00
|
|
|
}
|
2019-11-30 18:51:31 +00:00
|
|
|
MessageView::Error(err) => {
|
|
|
|
gst_error!(
|
|
|
|
CAT,
|
|
|
|
"premature_shutdown: Error from {:?}: {} ({:?})",
|
|
|
|
err.get_src().map(|s| s.get_path_string()),
|
|
|
|
err.get_error(),
|
|
|
|
err.get_debug()
|
|
|
|
);
|
|
|
|
l_clone.quit();
|
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
};
|
2019-11-24 20:12:40 +00:00
|
|
|
|
2019-11-30 18:51:31 +00:00
|
|
|
glib::Continue(true)
|
|
|
|
})
|
|
|
|
.unwrap();
|
2019-11-24 20:12:40 +00:00
|
|
|
|
|
|
|
pipeline.set_state(gst::State::Playing).unwrap();
|
|
|
|
|
2019-12-02 09:30:07 +00:00
|
|
|
gst_debug!(CAT, "Starting main loop for premature_shutdown...");
|
2019-11-24 20:12:40 +00:00
|
|
|
l.run();
|
2019-12-02 09:30:07 +00:00
|
|
|
gst_debug!(CAT, "Stopped main loop for premature_shutdown...");
|
2019-11-24 20:12:40 +00:00
|
|
|
}
|