// Copyright (C) 2019 Sebastian Dröge // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. // // SPDX-License-Identifier: MIT OR Apache-2.0 #![allow(clippy::single_match)] use gst::{glib, prelude::*}; use http_body_util::combinators::BoxBody; use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; use std::sync::mpsc; fn init() { use std::sync::Once; static INIT: Once = Once::new(); INIT.call_once(|| { // clear this environment because it affects the default settings std::env::remove_var("http_proxy"); gst::init().unwrap(); gstreqwest::plugin_register_static().expect("reqwesthttpsrc tests"); }); } /// Our custom test harness around the HTTP source #[derive(Debug)] struct Harness { src: gst::Element, pad: gst::Pad, receiver: Option>, rt: tokio::runtime::Runtime, } /// Messages sent from our test harness #[allow(clippy::enum_variant_names)] #[derive(Debug, Clone)] enum Message { Buffer(gst::Buffer), Event(gst::Event), Message(gst::Message), ServerError(String), } fn full_body(s: impl Into) -> BoxBody { use http_body_util::{BodyExt, Full}; Full::new(s.into()).map_err(|never| match never {}).boxed() } fn empty_body() -> BoxBody { use http_body_util::{BodyExt, Empty}; Empty::new().map_err(|never| match never {}).boxed() } impl Harness { /// Creates a new HTTP source and test harness around it /// /// `http_func`: Function to generate HTTP responses based on a request /// `setup_func`: Setup function for the HTTP source, should only set properties and similar fn new< F: FnMut( hyper::Request, ) -> hyper::Response> + Send + 'static, G: FnOnce(&gst::Element), >( http_func: F, setup_func: G, ) -> Harness { use hyper::server::conn::http1; use hyper::service::service_fn; use std::sync::{Arc, Mutex}; // Create the HTTP source let src = gst::ElementFactory::make("reqwesthttpsrc").build().unwrap(); // Sender/receiver for the messages we generate from various places for the tests // // Sending to this sender will block until the corresponding item was received from the // receiver, which allows us to handle everything as if it is running in a single thread let (sender, receiver) = mpsc::sync_channel(0); // Sink pad that receives everything the source is generating let pad = gst::Pad::builder(gst::PadDirection::Sink) .name("sink") .chain_function({ let sender_clone = sender.clone(); move |_pad, _parent, buffer| { let _ = sender_clone.send(Message::Buffer(buffer)); Ok(gst::FlowSuccess::Ok) } }) .event_function({ let sender_clone = sender.clone(); move |_pad, _parent, event| { let _ = sender_clone.send(Message::Event(event)); true } }) .build(); let srcpad = src.static_pad("src").unwrap(); srcpad.link(&pad).unwrap(); let bus = gst::Bus::new(); bus.set_flushing(false); src.set_bus(Some(&bus)); let sender_clone = sender.clone(); bus.set_sync_handler(move |_bus, msg| { let _ = sender_clone.send(Message::Message(msg.clone())); gst::BusSyncReply::Drop }); // Activate the pad so that it can be used now pad.set_active(true).unwrap(); // Create the tokio runtime used for the HTTP server in this test let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); // Create an HTTP sever that listens on localhost on some random, free port let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0)); // Whenever a new client is connecting, a new service function is requested. For each // client we use the same service function, which simply calls the function used by the // test let http_func = Arc::new(Mutex::new(http_func)); let service = service_fn(move |req: hyper::Request| { let http_func = http_func.clone(); async move { Ok::<_, hyper::Error>((*http_func.lock().unwrap())(req)) } }); let (local_addr_sender, local_addr_receiver) = tokio::sync::oneshot::channel(); // Spawn the server in the background so that it can handle requests rt.spawn(async move { // Bind the server, retrieve the local port that was selected in the end and set this as // the location property on the source let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); let local_addr = listener.local_addr().unwrap(); local_addr_sender.send(local_addr).unwrap(); loop { let (stream, _) = listener.accept().await.unwrap(); let io = tokio_io::TokioIo::new(stream); let service = service.clone(); let sender = sender.clone(); tokio::task::spawn(async move { let http = http1::Builder::new().serve_connection(io, service); if let Err(e) = http.await { let _ = sender.send(Message::ServerError(format!("{e}"))); } }); } }); let local_addr = futures::executor::block_on(local_addr_receiver).unwrap(); src.set_property("location", format!("http://{local_addr}/")); // Let the test setup anything needed on the HTTP source now setup_func(&src); Harness { src, pad, receiver: Some(receiver), rt, } } fn wait_for_error(&mut self) -> glib::Error { loop { match self.receiver.as_mut().unwrap().recv().unwrap() { Message::ServerError(err) => { panic!("Got server error: {err}"); } Message::Event(ev) => { use gst::EventView; match ev.view() { EventView::Eos(_) => { panic!("Got EOS but expected error"); } _ => (), } } Message::Message(msg) => { use gst::MessageView; match msg.view() { MessageView::Error(err) => { return err.error(); } _ => (), } } Message::Buffer(_buffer) => { panic!("Got buffer but expected error"); } } } } fn wait_for_state_change(&mut self) -> gst::State { loop { match self.receiver.as_mut().unwrap().recv().unwrap() { Message::ServerError(err) => { panic!("Got server error: {err}"); } Message::Event(ev) => { use gst::EventView; match ev.view() { EventView::Eos(_) => { panic!("Got EOS but expected state change"); } _ => (), } } Message::Message(msg) => { use gst::MessageView; match msg.view() { MessageView::StateChanged(state) => { return state.current(); } MessageView::Error(err) => { panic!( "Got error: {} ({})", err.error(), err.debug() .unwrap_or_else(|| glib::GString::from("UNKNOWN")) ); } _ => (), } } Message::Buffer(_buffer) => { panic!("Got buffer but expected state change"); } } } } fn wait_for_segment( &mut self, allow_buffer: bool, ) -> gst::FormattedSegment { loop { match self.receiver.as_mut().unwrap().recv().unwrap() { Message::ServerError(err) => { panic!("Got server error: {err}"); } Message::Event(ev) => { use gst::EventView; match ev.view() { EventView::Segment(seg) => { return seg .segment() .clone() .downcast::() .unwrap(); } _ => (), } } Message::Message(msg) => { use gst::MessageView; match msg.view() { MessageView::Error(err) => { panic!( "Got error: {} ({})", err.error(), err.debug() .unwrap_or_else(|| glib::GString::from("UNKNOWN")) ); } _ => (), } } Message::Buffer(_buffer) => { if !allow_buffer { panic!("Got buffer but expected segment"); } } } } } /// Wait until a buffer is available or EOS was reached /// /// This function will panic on errors. fn wait_buffer_or_eos(&mut self) -> Option { loop { match self.receiver.as_mut().unwrap().recv().unwrap() { Message::ServerError(err) => { panic!("Got server error: {err}"); } Message::Event(ev) => { use gst::EventView; match ev.view() { EventView::Eos(_) => return None, _ => (), } } Message::Message(msg) => { use gst::MessageView; match msg.view() { MessageView::Error(err) => { panic!( "Got error: {} ({})", err.error(), err.debug() .unwrap_or_else(|| glib::GString::from("UNKNOWN")) ); } _ => (), } } Message::Buffer(buffer) => return Some(buffer), } } } /// Run some code asynchronously on another thread with the HTTP source fn run(&self, func: F) { self.src.call_async(move |src| func(src)); } } impl Drop for Harness { fn drop(&mut self) { // Shut down everything that was set up for this test harness // and wait until the tokio runtime exited let bus = self.src.bus().unwrap(); bus.set_flushing(true); // Drop the receiver first before setting the state so that // any threads that might still be blocked on the sender // are immediately unblocked self.receiver.take().unwrap(); self.pad.set_active(false).unwrap(); self.src.set_state(gst::State::Null).unwrap(); } } #[test] fn test_basic_request() { use std::io::{Cursor, Read}; init(); // Set up a harness that returns "Hello World" for any HTTP request and checks if the // default headers are all sent let mut h = Harness::new( |req| { let headers = req.headers(); assert_eq!(headers.get("connection").unwrap(), "keep-alive"); assert_eq!(headers.get("accept-encoding").unwrap(), "identity"); assert_eq!(headers.get("icy-metadata").unwrap(), "1"); hyper::Response::new(full_body("Hello World")) }, |_src| { // No additional setup needed here }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); // And now check if the data we receive is exactly what we expect it to be let expected_output = "Hello World"; let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { // On the first buffer also check if the duration reported by the HTTP source is what we // would expect it to be if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), Some(gst::format::Bytes::from_usize(expected_output.len())) ); } // Map the buffer readable and check if it contains exactly the data we would expect at // this point after reading everything else we read in previous runs let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } // Check if everything was read assert_eq!(cursor.position(), 11); } #[test] fn test_basic_request_inverted_defaults() { use std::io::{Cursor, Read}; init(); // Set up a harness that returns "Hello World" for any HTTP request and override various // default properties to check if the corresponding headers are set correctly let mut h = Harness::new( |req| { let headers = req.headers(); assert_eq!(headers.get("connection").unwrap(), "close"); assert_eq!(headers.get("accept-encoding").unwrap(), "gzip"); assert_eq!(headers.get("icy-metadata"), None); assert_eq!(headers.get("user-agent").unwrap(), "test user-agent"); hyper::Response::new(full_body("Hello World")) }, |src| { src.set_property("keep-alive", false); src.set_property("compress", true); src.set_property("iradio-mode", false); src.set_property("user-agent", "test user-agent"); }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); // And now check if the data we receive is exactly what we expect it to be let expected_output = "Hello World"; let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { // On the first buffer also check if the duration reported by the HTTP source is what we // would expect it to be if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), Some(gst::format::Bytes::from_usize(expected_output.len())) ); } // Map the buffer readable and check if it contains exactly the data we would expect at // this point after reading everything else we read in previous runs let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } // Check if everything was read assert_eq!(cursor.position(), 11); } #[test] fn test_extra_headers() { use std::io::{Cursor, Read}; init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the // extra-headers property works correctly for setting additional headers let mut h = Harness::new( |req| { let headers = req.headers(); assert_eq!(headers.get("foo").unwrap(), "bar"); assert_eq!(headers.get("baz").unwrap(), "1"); assert_eq!( headers .get_all("list") .iter() .map(|v| v.to_str().unwrap()) .collect::>(), vec!["1", "2"] ); assert_eq!( headers .get_all("array") .iter() .map(|v| v.to_str().unwrap()) .collect::>(), vec!["1", "2"] ); hyper::Response::new(full_body("Hello World")) }, |src| { src.set_property( "extra-headers", &gst::Structure::builder("headers") .field("foo", "bar") .field("baz", 1i32) .field("list", gst::List::new([1i32, 2i32])) .field("array", gst::Array::new([1i32, 2i32])) .build(), ); }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); // And now check if the data we receive is exactly what we expect it to be let expected_output = "Hello World"; let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { // On the first buffer also check if the duration reported by the HTTP source is what we // would expect it to be if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), Some(gst::format::Bytes::from_usize(expected_output.len())) ); } // Map the buffer readable and check if it contains exactly the data we would expect at // this point after reading everything else we read in previous runs let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } // Check if everything was read assert_eq!(cursor.position(), 11); } #[test] fn test_cookies_property() { use std::io::{Cursor, Read}; init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the // cookies property can be used to set cookies correctly let mut h = Harness::new( |req| { let headers = req.headers(); assert_eq!(headers.get("cookie").unwrap(), "foo=1; bar=2; baz=3"); hyper::Response::new(full_body("Hello World")) }, |src| { src.set_property( "cookies", vec![ String::from("foo=1"), String::from("bar=2"), String::from("baz=3"), ], ); }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); // And now check if the data we receive is exactly what we expect it to be let expected_output = "Hello World"; let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { // On the first buffer also check if the duration reported by the HTTP source is what we // would expect it to be if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), Some(gst::format::Bytes::from_usize(expected_output.len())) ); } // Map the buffer readable and check if it contains exactly the data we would expect at // this point after reading everything else we read in previous runs let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } // Check if everything was read assert_eq!(cursor.position(), 11); } #[test] fn test_iradio_mode() { use std::io::{Cursor, Read}; init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the // iradio-mode property works correctly, and especially the icy- headers are parsed correctly // and put into caps/tags let mut h = Harness::new( |req| { let headers = req.headers(); assert_eq!(headers.get("icy-metadata").unwrap(), "1"); hyper::Response::builder() .header("icy-metaint", "8192") .header("icy-name", "Name") .header("icy-genre", "Genre") .header("icy-url", "http://www.example.com") .header("Content-Type", "audio/mpeg; rate=44100") .body(full_body("Hello World")) .unwrap() }, |_src| { // No additional setup needed here }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); // And now check if the data we receive is exactly what we expect it to be let expected_output = "Hello World"; let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { // On the first buffer also check if the duration reported by the HTTP source is what we // would expect it to be if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), Some(gst::format::Bytes::from_usize(expected_output.len())) ); } // Map the buffer readable and check if it contains exactly the data we would expect at // this point after reading everything else we read in previous runs let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } // Check if everything was read assert_eq!(cursor.position(), 11); let srcpad = h.src.static_pad("src").unwrap(); let caps = srcpad.current_caps().unwrap(); assert_eq!( caps, gst::Caps::builder("application/x-icy") .field("metadata-interval", 8192i32) .field("content-type", "audio/mpeg; rate=44100") .build() ); { if let Some(tag_event) = srcpad.sticky_event::(0) { let tags = tag_event.tag(); assert_eq!(tags.get::().unwrap().get(), "Name"); assert_eq!(tags.get::().unwrap().get(), "Genre"); assert_eq!( tags.get::().unwrap().get(), "http://www.example.com", ); } else { unreachable!(); } } } #[test] fn test_audio_l16() { use std::io::{Cursor, Read}; init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the // audio/L16 content type is parsed correctly and put into the caps let mut h = Harness::new( |_req| { hyper::Response::builder() .header("Content-Type", "audio/L16; rate=48000; channels=2") .body(full_body("Hello World")) .unwrap() }, |_src| { // No additional setup needed here }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); // And now check if the data we receive is exactly what we expect it to be let expected_output = "Hello World"; let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { // On the first buffer also check if the duration reported by the HTTP source is what we // would expect it to be if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), Some(gst::format::Bytes::from_usize(expected_output.len())) ); } // Map the buffer readable and check if it contains exactly the data we would expect at // this point after reading everything else we read in previous runs let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } // Check if everything was read assert_eq!(cursor.position(), 11); let srcpad = h.src.static_pad("src").unwrap(); let caps = srcpad.current_caps().unwrap(); assert_eq!( caps, gst::Caps::builder("audio/x-unaligned-raw") .field("format", "S16BE") .field("layout", "interleaved") .field("channels", 2i32) .field("rate", 48_000i32) .build() ); } #[test] fn test_authorization() { use std::io::{Cursor, Read}; init(); // Set up a harness that returns "Hello World" for any HTTP request // but requires authentication first let mut h = Harness::new( |req| { let headers = req.headers(); if let Some(authorization) = headers.get("authorization") { assert_eq!(authorization, "Basic dXNlcjpwYXNzd29yZA=="); hyper::Response::new(full_body("Hello World")) } else { hyper::Response::builder() .status(reqwest::StatusCode::UNAUTHORIZED.as_u16()) .header("WWW-Authenticate", "Basic realm=\"realm\"") .body(empty_body()) .unwrap() } }, |src| { src.set_property("user-id", "user"); src.set_property("user-pw", "password"); }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); // And now check if the data we receive is exactly what we expect it to be let expected_output = "Hello World"; let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { // On the first buffer also check if the duration reported by the HTTP source is what we // would expect it to be if cursor.position() == 0 { assert_eq!( h.src.query_duration::(), Some(gst::format::Bytes::from_usize(expected_output.len())) ); } // Map the buffer readable and check if it contains exactly the data we would expect at // this point after reading everything else we read in previous runs let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } // Check if everything was read assert_eq!(cursor.position(), 11); } #[test] fn test_404_error() { init(); // Harness that always returns 404 and we check if that is mapped to the correct error code let mut h = Harness::new( |_req| { hyper::Response::builder() .status(reqwest::StatusCode::NOT_FOUND.as_u16()) .body(empty_body()) .unwrap() }, |_src| {}, ); h.run(|src| { let _ = src.set_state(gst::State::Playing); }); let err_code = h.wait_for_error(); if let Some(err) = err_code.kind::() { assert_eq!(err, gst::ResourceError::NotFound); } } #[test] fn test_403_error() { init(); // Harness that always returns 403 and we check if that is mapped to the correct error code let mut h = Harness::new( |_req| { hyper::Response::builder() .status(reqwest::StatusCode::FORBIDDEN.as_u16()) .body(empty_body()) .unwrap() }, |_src| {}, ); h.run(|src| { let _ = src.set_state(gst::State::Playing); }); let err_code = h.wait_for_error(); if let Some(err) = err_code.kind::() { assert_eq!(err, gst::ResourceError::NotAuthorized); } } #[test] fn test_network_error() { init(); // Harness that always fails with a network error let mut h = Harness::new( |_req| unreachable!(), |src| { src.set_property("location", "http://0.0.0.0:0"); }, ); h.run(|src| { let _ = src.set_state(gst::State::Playing); }); let err_code = h.wait_for_error(); if let Some(err) = err_code.kind::() { assert_eq!(err, gst::ResourceError::OpenRead); } } #[test] fn test_seek_after_ready() { use std::io::{Cursor, Read}; init(); // Harness that checks if seeking in Ready state works correctly let mut h = Harness::new( |req| { let headers = req.headers(); if let Some(range) = headers.get("Range") { if range == "bytes=123-" { let mut data_seek = vec![0; 8192 - 123]; for (i, d) in data_seek.iter_mut().enumerate() { *d = ((i + 123) % 256) as u8; } hyper::Response::builder() .header("content-length", 8192 - 123) .header("accept-ranges", "bytes") .header("content-range", "bytes 123-8192/8192") .body(full_body(data_seek)) .unwrap() } else { panic!("Received an unexpected Range header") } } else { // `panic!("Received no Range header")` should be called here but due to a bug // in `basesrc` we cant do that here. If we do a seek in READY state, basesrc // will do a `start()` call without seek. Once we get seek forwarded, the call // with seek is made. This issue has to be solved. // issue link: https://gitlab.freedesktop.org/gstreamer/gstreamer/issues/413 let mut data_full = vec![0; 8192]; for (i, d) in data_full.iter_mut().enumerate() { *d = (i % 256) as u8; } hyper::Response::builder() .header("content-length", 8192) .header("accept-ranges", "bytes") .body(full_body(data_full)) .unwrap() } }, |_src| {}, ); h.run(|src| { src.set_state(gst::State::Ready).unwrap(); }); let current_state = h.wait_for_state_change(); assert_eq!(current_state, gst::State::Ready); h.run(|src| { src.seek_simple(gst::SeekFlags::FLUSH, 123.bytes()).unwrap(); src.set_state(gst::State::Playing).unwrap(); }); let segment = h.wait_for_segment(false); assert_eq!(segment.start(), Some(123.bytes())); let mut expected_output = vec![0; 8192 - 123]; for (i, d) in expected_output.iter_mut().enumerate() { *d = ((123 + i) % 256) as u8; } let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { assert_eq!(buffer.offset(), 123 + cursor.position()); let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } } #[test] fn test_seek_after_buffer_received() { use std::io::{Cursor, Read}; init(); // Harness that checks if seeking in Playing state after having received a buffer works // correctly let mut h = Harness::new( |req| { let headers = req.headers(); if let Some(range) = headers.get("Range") { if range == "bytes=123-" { let mut data_seek = vec![0; 8192 - 123]; for (i, d) in data_seek.iter_mut().enumerate() { *d = ((i + 123) % 256) as u8; } hyper::Response::builder() .header("content-length", 8192 - 123) .header("accept-ranges", "bytes") .header("content-range", "bytes 123-8192/8192") .body(full_body(data_seek)) .unwrap() } else { panic!("Received an unexpected Range header") } } else { let mut data_full = vec![0; 8192]; for (i, d) in data_full.iter_mut().enumerate() { *d = (i % 256) as u8; } hyper::Response::builder() .header("content-length", 8192) .header("accept-ranges", "bytes") .body(full_body(data_full)) .unwrap() } }, |_src| {}, ); h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); //wait for a buffer let buffer = h.wait_buffer_or_eos().unwrap(); assert_eq!(buffer.offset(), 0); //seek to a position after a buffer is Received h.run(|src| { src.seek_simple(gst::SeekFlags::FLUSH, 123.bytes()).unwrap(); }); let segment = h.wait_for_segment(true); assert_eq!(segment.start(), Some(123.bytes())); let mut expected_output = vec![0; 8192 - 123]; for (i, d) in expected_output.iter_mut().enumerate() { *d = ((123 + i) % 256) as u8; } let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { assert_eq!(buffer.offset(), 123 + cursor.position()); let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } } #[test] fn test_seek_with_stop_position() { use std::io::{Cursor, Read}; init(); // Harness that checks if seeking in Playing state after having received a buffer works // correctly let mut h = Harness::new( |req| { let headers = req.headers(); if let Some(range) = headers.get("Range") { if range == "bytes=123-130" { let mut data_seek = vec![0; 8]; for (i, d) in data_seek.iter_mut().enumerate() { *d = ((i + 123) % 256) as u8; } hyper::Response::builder() .header("content-length", 8) .header("accept-ranges", "bytes") .header("content-range", "bytes 123-130/8192") .body(full_body(data_seek)) .unwrap() } else { panic!("Received an unexpected Range header") } } else { let mut data_full = vec![0; 8192]; for (i, d) in data_full.iter_mut().enumerate() { *d = (i % 256) as u8; } hyper::Response::builder() .header("content-length", 8192) .header("accept-ranges", "bytes") .body(full_body(data_full)) .unwrap() } }, |_src| {}, ); h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); //wait for a buffer let buffer = h.wait_buffer_or_eos().unwrap(); assert_eq!(buffer.offset(), 0); //seek to a position after a buffer is Received let start = 123.bytes(); let stop = 131.bytes(); h.run(move |src| { src.seek( 1.0, gst::SeekFlags::FLUSH, gst::SeekType::Set, start, gst::SeekType::Set, stop, ) .unwrap(); }); let segment = h.wait_for_segment(true); assert_eq!(segment.start(), Some(start)); assert_eq!(segment.stop(), Some(stop)); let mut expected_output = vec![0; 8]; for (i, d) in expected_output.iter_mut().enumerate() { *d = ((123 + i) % 256) as u8; } let mut cursor = Cursor::new(expected_output); while let Some(buffer) = h.wait_buffer_or_eos() { assert_eq!(buffer.offset(), 123 + cursor.position()); let map = buffer.map_readable().unwrap(); let mut read_buf = vec![0; map.size()]; assert_eq!(cursor.read(&mut read_buf).unwrap(), map.size()); assert_eq!(&*map, &*read_buf); } } #[test] fn test_cookies() { init(); // Set up a harness that returns "Hello World" for any HTTP request and sets a cookie in our // client let mut h = Harness::new( |_req| { hyper::Response::builder() .header("Set-Cookie", "foo=bar") .body(full_body("Hello World")) .unwrap() }, |_src| { // No additional setup needed here }, ); // Set the HTTP source to Playing so that everything can start h.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); let mut num_bytes = 0; while let Some(buffer) = h.wait_buffer_or_eos() { num_bytes += buffer.size(); } assert_eq!(num_bytes, 11); // Set up a second harness that returns "Hello World" for any HTTP request that checks if our // client provides the cookie that was set in the previous request let mut h2 = Harness::new( |req| { let headers = req.headers(); let cookies = headers .get("Cookie") .expect("No cookies set") .to_str() .unwrap(); assert!(cookies.split(';').any(|c| c == "foo=bar")); hyper::Response::builder() .body(full_body("Hello again!")) .unwrap() }, |_src| { // No additional setup needed here }, ); let context = h.src.context("gst.reqwest.client").expect("No context"); h2.src.set_context(&context); // Set the HTTP source to Playing so that everything can start h2.run(|src| { src.set_state(gst::State::Playing).unwrap(); }); let mut num_bytes = 0; while let Some(buffer) = h2.wait_buffer_or_eos() { num_bytes += buffer.size(); } assert_eq!(num_bytes, 12); } #[test] fn test_proxy_prop_souphttpsrc_compatibility() { init(); fn assert_proxy_set(set_to: Option<&str>, expected: Option<&str>) { // The same assertions should hold for "souphttpsrc". let src = gst::ElementFactory::make("reqwesthttpsrc") .property("proxy", set_to) .build() .unwrap(); assert_eq!(src.property::>("proxy").as_deref(), expected); } // Test env var proxy. assert_proxy_set(Some("http://mydomain/"), Some("http://mydomain/")); // It should prepend http if no protocol specified and add /. assert_proxy_set(Some("myotherdomain"), Some("http://myotherdomain/")); // Empty env var should result in "" proxy (meaning None) for compatibility. assert_proxy_set(Some(""), Some("")); // It should allow setting this value for proxy for compatibility. assert_proxy_set(Some("&$"), Some("http://&$/")); // No env var should result in "" proxy (meaning None) for compatibility. assert_proxy_set(None, Some("")); } #[test] fn test_proxy() { init(); let mut h = Harness::new( |_req| { hyper::Response::builder() .body(full_body("Hello Proxy World")) .unwrap() }, |_src| {}, ); // Simplest possible implementation of naive oneshot proxy server? // Listen on socket before spawning thread (we won't error out with connection refused). let (proxy_handle, proxy_addr) = { let (proxy_addr_sender, proxy_addr_receiver) = tokio::sync::oneshot::channel(); let proxy_handle = h.rt.spawn(async move { let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let proxy_addr = listener.local_addr().unwrap(); println!("listening on {proxy_addr}, starting proxy server"); proxy_addr_sender.send(proxy_addr).unwrap(); println!("awaiting connection to proxy server"); let (conn, _addr) = listener.accept().await.unwrap(); let (conn_reader, mut conn_writer) = tokio::io::split(conn); println!("client connected, reading request line"); let mut reader = tokio::io::BufReader::new(conn_reader); let mut buf = String::new(); reader.read_line(&mut buf).await.unwrap(); let parts: Vec<&str> = buf.split(' ').collect(); let url = reqwest::Url::parse(parts[1]).unwrap(); let host = format!( "{}:{}", url.host_str().unwrap(), url.port_or_known_default().unwrap() ); println!("connecting to target server {host}"); let mut server_connection = tokio::net::TcpStream::connect(host).await.unwrap(); println!("connected to target server, sending modified request line"); server_connection .write_all(format!("{} {} {}", parts[0], url.path(), parts[2]).as_bytes()) .await .unwrap(); let (mut server_reader, mut server_writer) = tokio::io::split(server_connection); println!("sent modified request line, forwarding data in both directions"); let send_join_handle = tokio::task::spawn(async move { tokio::io::copy(&mut reader, &mut server_writer) .await .unwrap(); }); tokio::io::copy(&mut server_reader, &mut conn_writer) .await .unwrap(); send_join_handle.await.unwrap(); println!("shutting down proxy server"); }); ( proxy_handle, futures::executor::block_on(proxy_addr_receiver).unwrap(), ) }; // Set the HTTP source to Playing so that everything can start. h.run(move |src| { src.set_property("proxy", proxy_addr.to_string()); src.set_state(gst::State::Playing).unwrap(); }); // Wait for a buffer. let mut num_bytes = 0; while let Some(buffer) = h.wait_buffer_or_eos() { num_bytes += buffer.size(); } assert_eq!(num_bytes, "Hello Proxy World".len()); // Don't leave threads hanging around. proxy_handle.abort(); let _ = futures::executor::block_on(proxy_handle); } /// Adapter from tokio IO traits to hyper IO traits. mod tokio_io { use pin_project_lite::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { #[derive(Debug)] pub struct TokioIo { #[pin] inner: T, } } impl TokioIo { pub fn new(inner: T) -> Self { Self { inner } } } impl hyper::rt::Read for TokioIo where T: tokio::io::AsyncRead, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: hyper::rt::ReadBufCursor<'_>, ) -> Poll> { let n = unsafe { let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { Poll::Ready(Ok(())) => tbuf.filled().len(), other => return other, } }; unsafe { buf.advance(n); } Poll::Ready(Ok(())) } } impl hyper::rt::Write for TokioIo where T: tokio::io::AsyncWrite, { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) } fn is_write_vectored(&self) -> bool { tokio::io::AsyncWrite::is_write_vectored(&self.inner) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[std::io::IoSlice<'_>], ) -> Poll> { tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) } } }