From 6936ce11a791f4afe7873e74a37aaecb4723191f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 1 Jul 2019 01:59:44 +0300 Subject: [PATCH] reqwesthttpsrc: Add a basic unit test and a custom test harness --- gst-plugin-reqwest/Cargo.toml | 9 +- gst-plugin-reqwest/tests/reqwesthttpsrc.rs | 243 +++++++++++++++++++++ 2 files changed, 249 insertions(+), 3 deletions(-) create mode 100644 gst-plugin-reqwest/tests/reqwesthttpsrc.rs diff --git a/gst-plugin-reqwest/Cargo.toml b/gst-plugin-reqwest/Cargo.toml index aaf02e71..098db1ac 100644 --- a/gst-plugin-reqwest/Cargo.toml +++ b/gst-plugin-reqwest/Cargo.toml @@ -13,14 +13,17 @@ glib = { git = "https://github.com/gtk-rs/glib" } reqwest = "0.9" futures = "0.1.23" hyperx = "0.15" -gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] } +gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10", "subclassing"] } gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] } tokio = "0.1" bytes = "0.4" +[dev-dependencies] +hyper = "0.12" + [lib] -name = "gstrshttp" -crate-type = ["cdylib"] +name = "gstreqwest" +crate-type = ["cdylib", "rlib"] path = "src/lib.rs" [build-dependencies] diff --git a/gst-plugin-reqwest/tests/reqwesthttpsrc.rs b/gst-plugin-reqwest/tests/reqwesthttpsrc.rs new file mode 100644 index 00000000..fe664cae --- /dev/null +++ b/gst-plugin-reqwest/tests/reqwesthttpsrc.rs @@ -0,0 +1,243 @@ +// Copyright (C) 2019 Sebastian Dröge +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use gst::prelude::*; +use gstreamer as gst; + +use std::sync::mpsc; + +fn init() { + use std::sync::{Once, ONCE_INIT}; + static INIT: Once = ONCE_INIT; + + INIT.call_once(|| { + gst::init().unwrap(); + gstreqwest::plugin_register_static().expect("reqwesthttpsrc tests"); + }); +} + +/// Our custom test harness around the HTTP source +#[derive(Debug)] +struct Harness { + src: gst::Element, + pad: gst::Pad, + receiver: mpsc::Receiver, + rt: Option, +} + +/// Messages sent from our test harness +#[derive(Debug, Clone)] +enum Message { + Buffer(gst::Buffer), + Event(gst::Event), + Message(gst::Message), + ServerError(String), +} + +impl Harness { + /// Creates a new HTTP source and test harness around it + /// + /// `http_func`: Function to generate HTTP responses based on a request + /// `setup_func`: Setup function for the HTTP source, should only set properties and similar + fn new< + F: FnMut(hyper::Request) -> hyper::Response + Send + 'static, + G: FnOnce(&gst::Element), + >( + http_func: F, + setup_func: G, + ) -> Harness { + use hyper::service::{make_service_fn, service_fn_ok}; + use hyper::Server; + use std::sync::{Arc, Mutex}; + use tokio::prelude::*; + + // Create the HTTP source + let src = gst::ElementFactory::make("reqwesthttpsrc", None).unwrap(); + + // Sender/receiver for the messages we generate from various places for the tests + // + // Sending to this sender will block until the corresponding item was received from the + // receiver, which allows us to handle everything as if it is running in a single thread + let (sender, receiver) = mpsc::sync_channel(0); + + // Sink pad that receives everything the source is generating + let pad = gst::Pad::new(Some("sink"), gst::PadDirection::Sink); + let srcpad = src.get_static_pad("src").unwrap(); + srcpad.link(&pad).unwrap(); + + // Collect all buffers, events and messages sent from the source + let sender_clone = sender.clone(); + pad.set_chain_function(move |_pad, _parent, buffer| { + sender_clone.send(Message::Buffer(buffer)).unwrap(); + Ok(gst::FlowSuccess::Ok) + }); + let sender_clone = sender.clone(); + pad.set_event_function(move |_pad, _parent, event| { + sender_clone.send(Message::Event(event)).unwrap(); + true + }); + let bus = gst::Bus::new(); + src.set_bus(Some(&bus)); + let sender_clone = sender.clone(); + bus.set_sync_handler(move |_bus, msg| { + sender_clone.send(Message::Message(msg.clone())).unwrap(); + gst::BusSyncReply::Drop + }); + + // Activate the pad so that it can be used now + pad.set_active(true).unwrap(); + + // Create the tokio runtime used for the HTTP server in this test + let mut rt = tokio::runtime::Builder::new() + .core_threads(1) + .build() + .unwrap(); + + // Create an HTTP sever that listens on localhost on some random, free port + let addr = ([127, 0, 0, 1], 0).into(); + + // Whenever a new client is connecting, a new service function is requested. For each + // client we use the same service function, which simply calls the function used by the + // test + let http_func = Arc::new(Mutex::new(http_func)); + let make_service = make_service_fn(move |_ctx| { + let http_func = http_func.clone(); + service_fn_ok(move |req| (&mut *http_func.lock().unwrap())(req)) + }); + + // Bind the server, retrieve the local port that was selected in the end and set this as + // the location property on the source + let server = Server::bind(&addr).serve(make_service); + let local_addr = server.local_addr(); + src.set_property("location", &format!("http://{}/", local_addr)) + .unwrap(); + + // Spawn the server in the background so that it can handle requests + rt.spawn(server.map_err(move |e| { + sender + .send(Message::ServerError(format!("{:?}", e))) + .unwrap(); + })); + + // Let the test setup anything needed on the HTTP source now + setup_func(&src); + + Harness { + src, + pad, + receiver, + rt: Some(rt), + } + } + + /// Wait until a buffer is available or EOS was reached + /// + /// This function will panic on errors. + fn wait_buffer_or_eos(&mut self) -> Option { + loop { + match self.receiver.recv().unwrap() { + Message::ServerError(err) => { + panic!("Got server error: {}", err); + } + Message::Event(ev) => { + use gst::EventView; + + match ev.view() { + EventView::Eos(_) => return None, + _ => (), + } + } + Message::Message(msg) => { + use gst::MessageView; + + match msg.view() { + MessageView::Error(err) => { + use std::error::Error; + + panic!( + "Got error: {} ({})", + err.get_error().description(), + err.get_debug().unwrap_or_else(|| String::from("None")) + ); + } + _ => (), + } + } + Message::Buffer(buffer) => return Some(buffer), + } + } + } + + /// Run some code asynchronously on another thread with the HTTP source + fn run(&self, func: F) { + self.src.call_async(move |src| func(src)); + } +} + +impl Drop for Harness { + fn drop(&mut self) { + use tokio::prelude::*; + + // Shut down everything that was set up for this test harness + // and wait until the tokio runtime exited + let bus = self.src.get_bus().unwrap(); + bus.set_flushing(true); + self.pad.set_active(false).unwrap(); + self.src.set_state(gst::State::Null).unwrap(); + + self.rt.take().unwrap().shutdown_now().wait().unwrap(); + } +} + +#[test] +fn test_basic_request() { + use std::io::{Cursor, Read}; + init(); + + // Set up a simple harness that returns "Hello World" for any HTTP request + let mut h = Harness::new( + |_req| { + use hyper::{Body, Response}; + + Response::new(Body::from("Hello World")) + }, + |_src| { + // No additional setup needed here + }, + ); + + // Set the HTTP source to Playing so that everything can start + h.run(|src| { + src.set_state(gst::State::Playing).unwrap(); + }); + + // And now check if the data we receive is exactly what we expect it to be + let expected_output = "Hello World"; + let mut cursor = Cursor::new(expected_output); + + while let Some(buffer) = h.wait_buffer_or_eos() { + // On the first buffer also check if the duration reported by the HTTP source is what we + // would expect it to be + if cursor.position() == 0 { + assert_eq!( + h.src.query_duration::(), + Some(gst::format::Bytes::from(expected_output.len() as u64)) + ); + } + + // Map the buffer readable and check if it contains exactly the data we would expect at + // this point after reading everything else we read in previous runs + let map = buffer.map_readable().unwrap(); + let mut read_buf = vec![0; map.get_size()]; + assert_eq!(cursor.read(&mut read_buf).unwrap(), map.get_size()); + assert_eq!(&*map, &*read_buf); + } + + // Check if everything was read + assert_eq!(cursor.position(), 11); +}