diff --git a/gst-plugin-threadshare/tests/proxy.rs b/gst-plugin-threadshare/tests/proxy.rs new file mode 100644 index 00000000..8d2e5399 --- /dev/null +++ b/gst-plugin-threadshare/tests/proxy.rs @@ -0,0 +1,144 @@ +// Copyright (C) 2018 Sebastian Dröge +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +extern crate glib; +use glib::prelude::*; + +extern crate gstreamer as gst; +use gst::prelude::*; + +use std::sync::{Arc, Mutex}; + +fn init() { + use std::sync::{Once, ONCE_INIT}; + static INIT: Once = ONCE_INIT; + + INIT.call_once(|| { + gst::init().unwrap(); + + #[cfg(debug_assertions)] + { + use std::path::Path; + + let mut path = Path::new("target/debug"); + if !path.exists() { + path = Path::new("../target/debug"); + } + + gst::Registry::get().scan_path(path); + } + #[cfg(not(debug_assertions))] + { + use std::path::Path; + + let mut path = Path::new("target/release"); + if !path.exists() { + path = Path::new("../target/release"); + } + + gst::Registry::get().scan_path(path); + } + }); +} + +fn test_push(n_threads: i32) { + init(); + + let pipeline = gst::Pipeline::new(None); + let fakesrc = gst::ElementFactory::make("fakesrc", None).unwrap(); + let proxysink = gst::ElementFactory::make("ts-proxysink", None).unwrap(); + let proxysrc = gst::ElementFactory::make("ts-proxysrc", None).unwrap(); + let appsink = gst::ElementFactory::make("appsink", None).unwrap(); + + pipeline + .add_many(&[&fakesrc, &proxysink, &proxysrc, &appsink]) + .unwrap(); + fakesrc.link(&proxysink).unwrap(); + proxysrc.link(&appsink).unwrap(); + + fakesrc.set_property("num-buffers", &3i32).unwrap(); + proxysink + .set_property("proxy-context", &format!("test-{}", n_threads)) + .unwrap(); + proxysrc + .set_property("proxy-context", &format!("test-{}", n_threads)) + .unwrap(); + proxysrc + .set_property("context-threads", &n_threads) + .unwrap(); + + appsink.set_property("emit-signals", &true).unwrap(); + + let samples = Arc::new(Mutex::new(Vec::new())); + + let samples_clone = samples.clone(); + appsink + .connect("new-sample", true, move |args| { + let appsink = args[0].get::().unwrap(); + + let sample = appsink + .emit("pull-sample", &[]) + .unwrap() + .unwrap() + .get::() + .unwrap(); + + samples_clone.lock().unwrap().push(sample); + + Some(gst::FlowReturn::Ok.to_value()) + }) + .unwrap(); + + pipeline + .set_state(gst::State::Playing) + .into_result() + .unwrap(); + + let mut eos = false; + let bus = pipeline.get_bus().unwrap(); + while let Some(msg) = bus.timed_pop(5 * gst::SECOND) { + use gst::MessageView; + match msg.view() { + MessageView::Eos(..) => { + eos = true; + break; + } + MessageView::Error(..) => unreachable!(), + _ => (), + } + } + + assert!(eos); + let samples = samples.lock().unwrap(); + assert_eq!(samples.len(), 3); + + for sample in samples.iter() { + assert!(sample.get_buffer().is_some()); + } + + pipeline.set_state(gst::State::Null).into_result().unwrap(); +} + +#[test] +fn test_push_single_threaded() { + test_push(-1); +} + +#[test] +fn test_push_multi_threaded() { + test_push(2); +}