diff --git a/Cargo.lock b/Cargo.lock index 325ba115..7a87c5bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -337,7 +337,7 @@ dependencies = [ "fastrand", "hex", "http 0.2.12", - "hyper", + "hyper 0.14.28", "ring", "time", "tokio", @@ -374,7 +374,7 @@ dependencies = [ "bytes", "fastrand", "http 0.2.12", - "http-body", + "http-body 0.4.6", "percent-encoding", "pin-project-lite", "tracing", @@ -450,7 +450,7 @@ dependencies = [ "hex", "hmac 0.12.1", "http 0.2.12", - "http-body", + "http-body 0.4.6", "lru", "once_cell", "percent-encoding", @@ -546,7 +546,7 @@ dependencies = [ "aws-types", "bytes", "http 0.2.12", - "hyper", + "hyper 0.14.28", "once_cell", "regex-lite", "tracing", @@ -628,7 +628,7 @@ dependencies = [ "crc32fast", "hex", "http 0.2.12", - "http-body", + "http-body 0.4.6", "md-5", "pin-project-lite", "sha1", @@ -660,7 +660,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.12", - "http-body", + "http-body 0.4.6", "once_cell", "percent-encoding", "pin-project-lite", @@ -699,10 +699,10 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.25", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", "once_cell", "pin-project-lite", @@ -740,7 +740,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.12", - "http-body", + "http-body 0.4.6", "itoa", "num-integer", "pin-project-lite", @@ -2625,15 +2625,18 @@ dependencies = [ name = "gst-plugin-reqwest" version = "0.12.3" dependencies = [ + "bytes", "futures", "gst-plugin-version-helper", "gstreamer", "gstreamer-base", - "headers", - "hyper", + "headers 0.4.0", + "http-body-util", + "hyper 1.2.0", "mime", "once_cell", - "reqwest", + "pin-project-lite", + "reqwest 0.12.1", "tokio", "url", ] @@ -2901,7 +2904,7 @@ dependencies = [ "parse_link_header", "rand", "regex", - "reqwest", + "reqwest 0.11.26", "serde", "serde_json", "thiserror", @@ -2960,7 +2963,7 @@ dependencies = [ "gstreamer-webrtc", "once_cell", "parse_link_header", - "reqwest", + "reqwest 0.11.26", "tokio", ] @@ -3430,6 +3433,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ee2dd2e4f378392eeff5d51618cd9a63166a2513846bbc55f21cfacd9199d4" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.1.0", + "indexmap 2.2.5", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -3454,13 +3476,28 @@ checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ "base64 0.21.7", "bytes", - "headers-core", + "headers-core 0.2.0", "http 0.2.12", "httpdate", "mime", "sha1", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core 0.3.0", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + [[package]] name = "headers-core" version = "0.2.0" @@ -3470,6 +3507,15 @@ dependencies = [ "http 0.2.12", ] +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.4.1" @@ -3586,6 +3632,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" @@ -3620,20 +3689,41 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.25", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.4.10", "tokio", "tower-service", "tracing", "want", ] +[[package]] +name = "hyper" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.3", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-proxy" version = "0.9.1" @@ -3642,9 +3732,9 @@ checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" dependencies = [ "bytes", "futures", - "headers", + "headers 0.3.9", "http 0.2.12", - "hyper", + "hyper 0.14.28", "tokio", "tower-service", ] @@ -3657,7 +3747,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", + "hyper 0.14.28", "log", "rustls", "rustls-native-certs", @@ -3672,12 +3762,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.28", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.2.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.2.0", + "pin-project-lite", + "socket2 0.5.6", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "hyphenation" version = "0.8.4" @@ -3995,7 +4121,7 @@ dependencies = [ "futures-util", "getopts", "hex", - "hyper", + "hyper 0.14.28", "librespot-audio", "librespot-connect", "librespot-core", @@ -4064,7 +4190,7 @@ dependencies = [ "hmac 0.11.0", "http 0.2.12", "httparse", - "hyper", + "hyper 0.14.28", "hyper-proxy", "librespot-protocol", "log", @@ -4100,7 +4226,7 @@ dependencies = [ "form_urlencoded", "futures-core", "hmac 0.11.0", - "hyper", + "hyper 0.14.28", "libmdns", "librespot-core", "log", @@ -4203,7 +4329,7 @@ dependencies = [ "log", "parking_lot", "prost", - "reqwest", + "reqwest 0.11.26", "scopeguard", "serde", "sha2", @@ -5324,6 +5450,46 @@ name = "reqwest" version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.25", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-tls 0.5.0", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + +[[package]] +name = "reqwest" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e333b1eb9fe677f6893a9efcb0d277a2d3edd83f358a236b657c32301dc6e5f6" dependencies = [ "async-compression", "base64 0.21.7", @@ -5333,11 +5499,13 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http 0.2.12", - "http-body", - "hyper", - "hyper-tls", + "h2 0.4.3", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.2.0", + "hyper-tls 0.6.0", + "hyper-util", "ipnet", "js-sys", "log", @@ -6308,6 +6476,28 @@ dependencies = [ "winnow 0.6.5", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -6625,9 +6815,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "headers", + "headers 0.3.9", "http 0.2.12", - "hyper", + "hyper 0.14.28", "log", "mime", "mime_guess", diff --git a/net/reqwest/Cargo.toml b/net/reqwest/Cargo.toml index a98d4aed..607d5cb2 100644 --- a/net/reqwest/Cargo.toml +++ b/net/reqwest/Cargo.toml @@ -10,9 +10,9 @@ rust-version.workspace = true [dependencies] url = "2.1" -reqwest = { version = "0.11", features = ["cookies", "gzip"] } +reqwest = { version = "0.12", features = ["cookies", "gzip"] } futures = "0.3" -headers = "0.3" +headers = "0.4" mime = "0.3" gst.workspace = true gst-base.workspace = true @@ -20,7 +20,10 @@ tokio = { version = "1.0", default-features = false, features = ["time", "rt-mul once_cell.workspace = true [dev-dependencies] -hyper = { version = "0.14", features = ["server"] } +hyper = { version = "1.0", features = ["server"] } +http-body-util = "0.1.1" +bytes = "1.0" +pin-project-lite = "0.2" gst.workspace = true [lib] diff --git a/net/reqwest/tests/reqwesthttpsrc.rs b/net/reqwest/tests/reqwesthttpsrc.rs index 9287822a..0e5407df 100644 --- a/net/reqwest/tests/reqwesthttpsrc.rs +++ b/net/reqwest/tests/reqwesthttpsrc.rs @@ -10,8 +10,9 @@ #![allow(clippy::single_match)] -use gst::glib; -use gst::prelude::*; +use gst::{glib, prelude::*}; +use http_body_util::combinators::BoxBody; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; use std::sync::mpsc; @@ -33,7 +34,7 @@ struct Harness { src: gst::Element, pad: gst::Pad, receiver: Option>, - rt: Option, + rt: tokio::runtime::Runtime, } /// Messages sent from our test harness @@ -46,20 +47,34 @@ enum Message { ServerError(String), } +fn full_body(s: impl Into) -> BoxBody { + use http_body_util::{BodyExt, Full}; + Full::new(s.into()).map_err(|never| match never {}).boxed() +} + +fn empty_body() -> BoxBody { + use http_body_util::{BodyExt, Empty}; + Empty::new().map_err(|never| match never {}).boxed() +} + impl Harness { /// Creates a new HTTP source and test harness around it /// /// `http_func`: Function to generate HTTP responses based on a request /// `setup_func`: Setup function for the HTTP source, should only set properties and similar fn new< - F: FnMut(hyper::Request) -> hyper::Response + Send + 'static, + F: FnMut( + hyper::Request, + ) -> hyper::Response> + + Send + + 'static, G: FnOnce(&gst::Element), >( http_func: F, setup_func: G, ) -> Harness { - use hyper::service::{make_service_fn, service_fn}; - use hyper::Server; + use hyper::server::conn::http1; + use hyper::service::service_fn; use std::sync::{Arc, Mutex}; // Create the HTTP source @@ -112,21 +127,15 @@ impl Harness { .unwrap(); // Create an HTTP sever that listens on localhost on some random, free port - let addr = ([127, 0, 0, 1], 0).into(); + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0)); // Whenever a new client is connecting, a new service function is requested. For each // client we use the same service function, which simply calls the function used by the // test let http_func = Arc::new(Mutex::new(http_func)); - let make_service = make_service_fn(move |_ctx| { + let service = service_fn(move |req: hyper::Request| { let http_func = http_func.clone(); - async move { - let http_func = http_func.clone(); - Ok::<_, hyper::Error>(service_fn(move |req| { - let http_func = http_func.clone(); - async move { Ok::<_, hyper::Error>((*http_func.lock().unwrap())(req)) } - })) - } + async move { Ok::<_, hyper::Error>((*http_func.lock().unwrap())(req)) } }); let (local_addr_sender, local_addr_receiver) = tokio::sync::oneshot::channel(); @@ -135,13 +144,22 @@ impl Harness { rt.spawn(async move { // Bind the server, retrieve the local port that was selected in the end and set this as // the location property on the source - let server = Server::bind(&addr).serve(make_service); - let local_addr = server.local_addr(); + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + let local_addr = listener.local_addr().unwrap(); local_addr_sender.send(local_addr).unwrap(); - if let Err(e) = server.await { - let _ = sender.send(Message::ServerError(format!("{e:?}"))); + loop { + let (stream, _) = listener.accept().await.unwrap(); + let io = tokio_io::TokioIo::new(stream); + let service = service.clone(); + let sender = sender.clone(); + tokio::task::spawn(async move { + let http = http1::Builder::new().serve_connection(io, service); + if let Err(e) = http.await { + let _ = sender.send(Message::ServerError(format!("{e}"))); + } + }); } }); @@ -155,7 +173,7 @@ impl Harness { src, pad, receiver: Some(receiver), - rt: Some(rt), + rt, } } @@ -337,28 +355,25 @@ impl Drop for Harness { self.pad.set_active(false).unwrap(); self.src.set_state(gst::State::Null).unwrap(); - - self.rt.take().unwrap(); } } #[test] fn test_basic_request() { use std::io::{Cursor, Read}; + init(); // Set up a harness that returns "Hello World" for any HTTP request and checks if the // default headers are all sent let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); assert_eq!(headers.get("connection").unwrap(), "keep-alive"); assert_eq!(headers.get("accept-encoding").unwrap(), "identity"); assert_eq!(headers.get("icy-metadata").unwrap(), "1"); - Response::new(Body::from("Hello World")) + hyper::Response::new(full_body("Hello World")) }, |_src| { // No additional setup needed here @@ -399,21 +414,20 @@ fn test_basic_request() { #[test] fn test_basic_request_inverted_defaults() { use std::io::{Cursor, Read}; + init(); // Set up a harness that returns "Hello World" for any HTTP request and override various // default properties to check if the corresponding headers are set correctly let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); assert_eq!(headers.get("connection").unwrap(), "close"); assert_eq!(headers.get("accept-encoding").unwrap(), "gzip"); assert_eq!(headers.get("icy-metadata"), None); assert_eq!(headers.get("user-agent").unwrap(), "test user-agent"); - Response::new(Body::from("Hello World")) + hyper::Response::new(full_body("Hello World")) }, |src| { src.set_property("keep-alive", false); @@ -457,14 +471,13 @@ fn test_basic_request_inverted_defaults() { #[test] fn test_extra_headers() { use std::io::{Cursor, Read}; + init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the // extra-headers property works correctly for setting additional headers let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); assert_eq!(headers.get("foo").unwrap(), "bar"); assert_eq!(headers.get("baz").unwrap(), "1"); @@ -485,7 +498,7 @@ fn test_extra_headers() { vec!["1", "2"] ); - Response::new(Body::from("Hello World")) + hyper::Response::new(full_body("Hello World")) }, |src| { src.set_property( @@ -534,18 +547,17 @@ fn test_extra_headers() { #[test] fn test_cookies_property() { use std::io::{Cursor, Read}; + init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the // cookies property can be used to set cookies correctly let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); assert_eq!(headers.get("cookie").unwrap(), "foo=1; bar=2; baz=3"); - Response::new(Body::from("Hello World")) + hyper::Response::new(full_body("Hello World")) }, |src| { src.set_property( @@ -593,6 +605,7 @@ fn test_cookies_property() { #[test] fn test_iradio_mode() { use std::io::{Cursor, Read}; + init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the @@ -600,18 +613,16 @@ fn test_iradio_mode() { // and put into caps/tags let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); assert_eq!(headers.get("icy-metadata").unwrap(), "1"); - Response::builder() + hyper::Response::builder() .header("icy-metaint", "8192") .header("icy-name", "Name") .header("icy-genre", "Genre") .header("icy-url", "http://www.example.com") .header("Content-Type", "audio/mpeg; rate=44100") - .body(Body::from("Hello World")) + .body(full_body("Hello World")) .unwrap() }, |_src| { @@ -677,17 +688,16 @@ fn test_iradio_mode() { #[test] fn test_audio_l16() { use std::io::{Cursor, Read}; + init(); // Set up a harness that returns "Hello World" for any HTTP request and check if the // audio/L16 content type is parsed correctly and put into the caps let mut h = Harness::new( |_req| { - use hyper::{Body, Response}; - - Response::builder() + hyper::Response::builder() .header("Content-Type", "audio/L16; rate=48000; channels=2") - .body(Body::from("Hello World")) + .body(full_body("Hello World")) .unwrap() }, |_src| { @@ -741,25 +751,23 @@ fn test_audio_l16() { #[test] fn test_authorization() { use std::io::{Cursor, Read}; + init(); // Set up a harness that returns "Hello World" for any HTTP request // but requires authentication first let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - use reqwest::StatusCode; - let headers = req.headers(); if let Some(authorization) = headers.get("authorization") { assert_eq!(authorization, "Basic dXNlcjpwYXNzd29yZA=="); - Response::new(Body::from("Hello World")) + hyper::Response::new(full_body("Hello World")) } else { - Response::builder() - .status(StatusCode::UNAUTHORIZED.as_u16()) + hyper::Response::builder() + .status(reqwest::StatusCode::UNAUTHORIZED.as_u16()) .header("WWW-Authenticate", "Basic realm=\"realm\"") - .body(Body::empty()) + .body(empty_body()) .unwrap() } }, @@ -802,17 +810,14 @@ fn test_authorization() { #[test] fn test_404_error() { - use reqwest::StatusCode; init(); // Harness that always returns 404 and we check if that is mapped to the correct error code let mut h = Harness::new( |_req| { - use hyper::{Body, Response}; - - Response::builder() - .status(StatusCode::NOT_FOUND.as_u16()) - .body(Body::empty()) + hyper::Response::builder() + .status(reqwest::StatusCode::NOT_FOUND.as_u16()) + .body(empty_body()) .unwrap() }, |_src| {}, @@ -830,17 +835,14 @@ fn test_404_error() { #[test] fn test_403_error() { - use reqwest::StatusCode; init(); // Harness that always returns 403 and we check if that is mapped to the correct error code let mut h = Harness::new( |_req| { - use hyper::{Body, Response}; - - Response::builder() - .status(StatusCode::FORBIDDEN.as_u16()) - .body(Body::empty()) + hyper::Response::builder() + .status(reqwest::StatusCode::FORBIDDEN.as_u16()) + .body(empty_body()) .unwrap() }, |_src| {}, @@ -881,13 +883,12 @@ fn test_network_error() { #[test] fn test_seek_after_ready() { use std::io::{Cursor, Read}; + init(); // Harness that checks if seeking in Ready state works correctly let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); if let Some(range) = headers.get("Range") { if range == "bytes=123-" { @@ -896,11 +897,11 @@ fn test_seek_after_ready() { *d = ((i + 123) % 256) as u8; } - Response::builder() + hyper::Response::builder() .header("content-length", 8192 - 123) .header("accept-ranges", "bytes") .header("content-range", "bytes 123-8192/8192") - .body(Body::from(data_seek)) + .body(full_body(data_seek)) .unwrap() } else { panic!("Received an unexpected Range header") @@ -916,10 +917,10 @@ fn test_seek_after_ready() { *d = (i % 256) as u8; } - Response::builder() + hyper::Response::builder() .header("content-length", 8192) .header("accept-ranges", "bytes") - .body(Body::from(data_full)) + .body(full_body(data_full)) .unwrap() } }, @@ -961,14 +962,13 @@ fn test_seek_after_ready() { #[test] fn test_seek_after_buffer_received() { use std::io::{Cursor, Read}; + init(); // Harness that checks if seeking in Playing state after having received a buffer works // correctly let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); if let Some(range) = headers.get("Range") { if range == "bytes=123-" { @@ -977,11 +977,11 @@ fn test_seek_after_buffer_received() { *d = ((i + 123) % 256) as u8; } - Response::builder() + hyper::Response::builder() .header("content-length", 8192 - 123) .header("accept-ranges", "bytes") .header("content-range", "bytes 123-8192/8192") - .body(Body::from(data_seek)) + .body(full_body(data_seek)) .unwrap() } else { panic!("Received an unexpected Range header") @@ -992,10 +992,10 @@ fn test_seek_after_buffer_received() { *d = (i % 256) as u8; } - Response::builder() + hyper::Response::builder() .header("content-length", 8192) .header("accept-ranges", "bytes") - .body(Body::from(data_full)) + .body(full_body(data_full)) .unwrap() } }, @@ -1038,14 +1038,13 @@ fn test_seek_after_buffer_received() { #[test] fn test_seek_with_stop_position() { use std::io::{Cursor, Read}; + init(); // Harness that checks if seeking in Playing state after having received a buffer works // correctly let mut h = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); if let Some(range) = headers.get("Range") { if range == "bytes=123-130" { @@ -1054,11 +1053,11 @@ fn test_seek_with_stop_position() { *d = ((i + 123) % 256) as u8; } - Response::builder() + hyper::Response::builder() .header("content-length", 8) .header("accept-ranges", "bytes") .header("content-range", "bytes 123-130/8192") - .body(Body::from(data_seek)) + .body(full_body(data_seek)) .unwrap() } else { panic!("Received an unexpected Range header") @@ -1069,10 +1068,10 @@ fn test_seek_with_stop_position() { *d = (i % 256) as u8; } - Response::builder() + hyper::Response::builder() .header("content-length", 8192) .header("accept-ranges", "bytes") - .body(Body::from(data_full)) + .body(full_body(data_full)) .unwrap() } }, @@ -1131,11 +1130,9 @@ fn test_cookies() { // client let mut h = Harness::new( |_req| { - use hyper::{Body, Response}; - - Response::builder() + hyper::Response::builder() .header("Set-Cookie", "foo=bar") - .body(Body::from("Hello World")) + .body(full_body("Hello World")) .unwrap() }, |_src| { @@ -1158,8 +1155,6 @@ fn test_cookies() { // client provides the cookie that was set in the previous request let mut h2 = Harness::new( |req| { - use hyper::{Body, Response}; - let headers = req.headers(); let cookies = headers .get("Cookie") @@ -1167,8 +1162,8 @@ fn test_cookies() { .to_str() .unwrap(); assert!(cookies.split(';').any(|c| c == "foo=bar")); - Response::builder() - .body(Body::from("Hello again!")) + hyper::Response::builder() + .body(full_body("Hello again!")) .unwrap() }, |_src| { @@ -1224,63 +1219,76 @@ fn test_proxy_prop_souphttpsrc_compatibility() { fn test_proxy() { init(); - // Simplest possible implementation of naive oneshot proxy server? - // Listen on socket before spawning thread (we won't error out with connection refused). - let incoming = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let proxy_addr = incoming.local_addr().unwrap(); - println!("listening on {proxy_addr}, starting proxy server"); - let proxy_server = std::thread::spawn(move || { - use std::io::*; - println!("awaiting connection to proxy server"); - let (mut conn, _addr) = incoming.accept().unwrap(); - - println!("client connected, reading request line"); - let mut reader = BufReader::new(conn.try_clone().unwrap()); - let mut buf = String::new(); - reader.read_line(&mut buf).unwrap(); - let parts: Vec<&str> = buf.split(' ').collect(); - let url = reqwest::Url::parse(parts[1]).unwrap(); - let host = format!( - "{}:{}", - url.host_str().unwrap(), - url.port_or_known_default().unwrap() - ); - - println!("connecting to target server {host}"); - let mut server_connection = std::net::TcpStream::connect(host).unwrap(); - - println!("connected to target server, sending modified request line"); - server_connection - .write_all(format!("{} {} {}\r\n", parts[0], url.path(), parts[2]).as_bytes()) - .unwrap(); - - println!("sent modified request line, forwarding data in both directions"); - let send_join_handle = { - let mut server_connection = server_connection.try_clone().unwrap(); - std::thread::spawn(move || { - copy(&mut reader, &mut server_connection).unwrap(); - }) - }; - copy(&mut server_connection, &mut conn).unwrap(); - send_join_handle.join().unwrap(); - println!("shutting down proxy server"); - }); - let mut h = Harness::new( |_req| { - use hyper::{Body, Response}; - - Response::builder() - .body(Body::from("Hello Proxy World")) + hyper::Response::builder() + .body(full_body("Hello Proxy World")) .unwrap() }, - |src| { - src.set_property("proxy", proxy_addr.to_string()); - }, + |_src| {}, ); + // Simplest possible implementation of naive oneshot proxy server? + // Listen on socket before spawning thread (we won't error out with connection refused). + let (proxy_handle, proxy_addr) = { + let (proxy_addr_sender, proxy_addr_receiver) = tokio::sync::oneshot::channel(); + + let proxy_handle = h.rt.spawn(async move { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let proxy_addr = listener.local_addr().unwrap(); + println!("listening on {proxy_addr}, starting proxy server"); + + proxy_addr_sender.send(proxy_addr).unwrap(); + + println!("awaiting connection to proxy server"); + let (conn, _addr) = listener.accept().await.unwrap(); + + let (conn_reader, mut conn_writer) = tokio::io::split(conn); + println!("client connected, reading request line"); + let mut reader = tokio::io::BufReader::new(conn_reader); + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + let parts: Vec<&str> = buf.split(' ').collect(); + let url = reqwest::Url::parse(parts[1]).unwrap(); + let host = format!( + "{}:{}", + url.host_str().unwrap(), + url.port_or_known_default().unwrap() + ); + + println!("connecting to target server {host}"); + let mut server_connection = tokio::net::TcpStream::connect(host).await.unwrap(); + + println!("connected to target server, sending modified request line"); + server_connection + .write_all(format!("{} {} {}", parts[0], url.path(), parts[2]).as_bytes()) + .await + .unwrap(); + + let (mut server_reader, mut server_writer) = tokio::io::split(server_connection); + + println!("sent modified request line, forwarding data in both directions"); + let send_join_handle = tokio::task::spawn(async move { + tokio::io::copy(&mut reader, &mut server_writer) + .await + .unwrap(); + }); + tokio::io::copy(&mut server_reader, &mut conn_writer) + .await + .unwrap(); + send_join_handle.await.unwrap(); + println!("shutting down proxy server"); + }); + + ( + proxy_handle, + futures::executor::block_on(proxy_addr_receiver).unwrap(), + ) + }; + // Set the HTTP source to Playing so that everything can start. - h.run(|src| { + h.run(move |src| { + src.set_property("proxy", proxy_addr.to_string()); src.set_state(gst::State::Playing).unwrap(); }); @@ -1292,5 +1300,90 @@ fn test_proxy() { assert_eq!(num_bytes, "Hello Proxy World".len()); // Don't leave threads hanging around. - proxy_server.join().unwrap(); + proxy_handle.abort(); + let _ = futures::executor::block_on(proxy_handle); +} + +/// Adapter from tokio IO traits to hyper IO traits. +mod tokio_io { + use pin_project_lite::pin_project; + use std::pin::Pin; + use std::task::{Context, Poll}; + + pin_project! { + #[derive(Debug)] + pub struct TokioIo { + #[pin] + inner: T, + } + } + + impl TokioIo { + pub fn new(inner: T) -> Self { + Self { inner } + } + } + + impl hyper::rt::Read for TokioIo + where + T: tokio::io::AsyncRead, + { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } + } + + impl hyper::rt::Write for TokioIo + where + T: tokio::io::AsyncWrite, + { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } + } }