threadshare: Add test for single and multi-threaded appsrc

This commit is contained in:
Sebastian Dröge 2018-04-04 17:42:27 +03:00
parent 82ab78fa3d
commit 28100d3a63

View file

@ -21,7 +21,7 @@ use glib::prelude::*;
extern crate gstreamer as gst;
use gst::prelude::*;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
fn init() {
use std::sync::{Once, ONCE_INIT};
@ -55,8 +55,7 @@ fn init() {
});
}
#[test]
fn test_push() {
fn test_push(n_threads: i32) {
init();
let pipeline = gst::Pipeline::new(None);
@ -67,12 +66,14 @@ fn test_push() {
let caps = gst::Caps::new_simple("foo/bar", &[]);
appsrc.set_property("caps", &caps).unwrap();
appsrc.set_property("context-threads", &n_threads).unwrap();
appsrc.set_property("do-timestamp", &true).unwrap();
appsink.set_property("emit-signals", &true).unwrap();
let (sender, receiver) = mpsc::sync_channel(20);
let samples = Arc::new(Mutex::new(Vec::new()));
let samples_clone = samples.clone();
appsink
.connect("new-sample", true, move |args| {
let appsink = args[0].get::<gst::Element>().unwrap();
@ -83,7 +84,8 @@ fn test_push() {
.unwrap()
.get::<gst::Sample>()
.unwrap();
sender.send(sample).unwrap();
samples_clone.lock().unwrap().push(sample);
Some(gst::FlowReturn::Ok.to_value())
})
@ -94,30 +96,17 @@ fn test_push() {
.into_result()
.unwrap();
assert!(
appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get::<bool>()
.unwrap()
);
assert!(
appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get::<bool>()
.unwrap()
);
assert!(
appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get::<bool>()
.unwrap()
);
for _ in 0..3 {
assert!(
appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()
.unwrap()
.get::<bool>()
.unwrap()
);
}
assert!(
appsrc
.emit("end-of-stream", &[])
@ -127,17 +116,38 @@ fn test_push() {
.unwrap()
);
let sample = receiver.recv().unwrap();
assert!(sample.get_buffer().is_some());
assert_eq!(Some(&caps), sample.get_caps().as_ref());
let mut eos = false;
let bus = pipeline.get_bus().unwrap();
while let Some(msg) = bus.timed_pop(5 * gst::SECOND) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
eos = true;
break;
}
MessageView::Error(..) => unreachable!(),
_ => (),
}
}
let sample = receiver.recv().unwrap();
assert!(sample.get_buffer().is_some());
assert_eq!(Some(&caps), sample.get_caps().as_ref());
assert!(eos);
let samples = samples.lock().unwrap();
assert_eq!(samples.len(), 3);
let sample = receiver.recv().unwrap();
assert!(sample.get_buffer().is_some());
assert_eq!(Some(&caps), sample.get_caps().as_ref());
for sample in samples.iter() {
assert!(sample.get_buffer().is_some());
assert_eq!(Some(&caps), sample.get_caps().as_ref());
}
pipeline.set_state(gst::State::Null).into_result().unwrap();
}
#[test]
fn test_push_single_threaded() {
test_push(-1);
}
#[test]
fn test_push_multi_threaded() {
test_push(2);
}