From 672cb730a8b70c9989d4954c94fbca5d07417d20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 30 Dec 2019 16:00:12 +0200 Subject: [PATCH] reqwest: Update to reqwest 0.10 / tokio 0.2 --- gst-plugin-reqwest/Cargo.toml | 14 +- gst-plugin-reqwest/src/lib.rs | 6 - gst-plugin-reqwest/src/reqwesthttpsrc.rs | 394 +++++++++++---------- gst-plugin-reqwest/tests/reqwesthttpsrc.rs | 45 ++- 4 files changed, 246 insertions(+), 213 deletions(-) diff --git a/gst-plugin-reqwest/Cargo.toml b/gst-plugin-reqwest/Cargo.toml index 0f11f4f6..f766678f 100644 --- a/gst-plugin-reqwest/Cargo.toml +++ b/gst-plugin-reqwest/Cargo.toml @@ -8,19 +8,19 @@ description = "Rust HTTP Plugin" edition = "2018" [dependencies] -url = "1.7" +url = "2.1" glib = { git = "https://github.com/gtk-rs/glib" } -reqwest = "0.9" -futures = "0.1.23" -hyperx = "0.15" +reqwest = { version = "0.10", features = ["cookies", "gzip"] } +futures = "0.3" +headers = "0.3" +mime = "0.3" gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10"] } gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -tokio = "0.1" -bytes = "0.4" +tokio = { version = "0.2", features = ["time", "rt-threaded"] } lazy_static = "1.0" [dev-dependencies] -hyper = "0.12" +hyper = "0.13.0-alpha" [lib] name = "gstreqwest" diff --git a/gst-plugin-reqwest/src/lib.rs b/gst-plugin-reqwest/src/lib.rs index f5bf8945..7960329b 100644 --- a/gst-plugin-reqwest/src/lib.rs +++ b/gst-plugin-reqwest/src/lib.rs @@ -12,13 +12,7 @@ extern crate glib; #[macro_use] extern crate gstreamer as gst; -extern crate bytes; -extern crate futures; extern crate gstreamer_base as gst_base; -extern crate hyperx; -extern crate reqwest; -extern crate tokio; -extern crate url; #[macro_use] extern crate lazy_static; diff --git a/gst-plugin-reqwest/src/reqwesthttpsrc.rs b/gst-plugin-reqwest/src/reqwesthttpsrc.rs index 678deff9..244382ae 100644 --- a/gst-plugin-reqwest/src/reqwesthttpsrc.rs +++ b/gst-plugin-reqwest/src/reqwesthttpsrc.rs @@ -5,17 +5,13 @@ // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. -use bytes::Bytes; -use futures::future::Either; -use futures::sync::oneshot; -use futures::{Future, Stream}; -use reqwest::r#async::{Client, Decoder}; -use reqwest::StatusCode; -use std::mem; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::u64; -use tokio::prelude::*; + +use futures::future; +use futures::prelude::*; +use reqwest::{Client, Response, StatusCode}; use tokio::runtime; use url::Url; @@ -201,7 +197,7 @@ enum State { Stopped, Started { uri: Url, - body: Option, + response: Option, seekable: bool, position: u64, size: Option, @@ -224,8 +220,7 @@ pub struct ReqwestHttpSrc { external_client: Mutex>, settings: Mutex, state: Mutex, - runtime: runtime::Runtime, - canceller: Mutex>>, + canceller: Mutex>, } lazy_static! { @@ -234,6 +229,12 @@ lazy_static! { gst::DebugColorFlags::empty(), Some("Rust HTTP source"), ); + static ref RUNTIME: runtime::Runtime = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .core_threads(1) + .build() + .unwrap(); } impl ReqwestHttpSrc { @@ -352,11 +353,10 @@ impl ReqwestHttpSrc { start: u64, stop: Option, ) -> Result> { - use hyperx::header::{ - qitem, AcceptEncoding, AcceptRanges, ByteRangeSpec, Connection, ContentLength, - ContentRange, ContentRangeSpec, ContentType, Cookie, Encoding, Headers, Range, - RangeUnit, RawLike, UserAgent, + use headers::{ + AcceptRanges, Connection, ContentLength, ContentRange, HeaderMapExt, Range, UserAgent, }; + use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; gst_debug!(CAT, obj: src, "Creating new request for {}", uri); @@ -366,115 +366,133 @@ impl ReqwestHttpSrc { }; let settings = self.settings.lock().unwrap().clone(); - let mut headers = Headers::new(); + let mut headers = HeaderMap::new(); if settings.keep_alive { - headers.set(Connection::keep_alive()); + headers.typed_insert(Connection::keep_alive()); } else { - headers.set(Connection::close()); + headers.typed_insert(Connection::close()); } match (start != 0, stop) { (false, None) => (), (true, None) => { - headers.set(Range::Bytes(vec![ByteRangeSpec::AllFrom(start)])); + headers.typed_insert(Range::bytes(start..).unwrap()); } (_, Some(stop)) => { - headers.set(Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop - 1)])); + headers.typed_insert(Range::bytes(start..stop).unwrap()); } } - headers.set(UserAgent::new(settings.user_agent.to_owned())); + if let Ok(user_agent) = settings.user_agent.parse::() { + headers.typed_insert(user_agent); + } else { + gst_warning!( + CAT, + obj: src, + "Failed to transform user-agent '{}' to header value", + settings.user_agent + ); + } if !settings.compress { // Compression is the default - headers.set(AcceptEncoding(vec![qitem(Encoding::Identity)])); + headers.append("accept-encoding", "identity".parse().unwrap()); }; if let Some(ref extra_headers) = settings.extra_headers { + use std::convert::TryFrom; + for (field, value) in extra_headers.iter() { - if let Ok(Some(values)) = value.get::() { - for value in values.as_slice() { - if let Some(value) = value.transform::() { - let value = value.get::<&str>().unwrap().unwrap_or(""); - gst_debug!( - CAT, - obj: src, - "Appending extra-header: {}: {}", - field, - value - ); - headers.append_raw(String::from(field), value); - } else { + let field = match HeaderName::try_from(field) { + Ok(field) => field, + Err(err) => { + gst_warning!( + CAT, + obj: src, + "Failed to transform extra-header field name '{}' to header name: {}", + field, + err, + ); + + continue; + } + }; + + let mut append_header = |field: &HeaderName, value: &glib::Value| { + let value = match value.transform::() { + Some(value) => value, + None => { gst_warning!( CAT, obj: src, - "Failed to transform extra-header '{}' to string", + "Failed to transform extra-header '{}' value to string", field ); + return; } + }; + + let value = value.get::<&str>().unwrap().unwrap_or(""); + + let value = match HeaderValue::from_str(value) { + Ok(value) => value, + Err(_) => { + gst_warning!( + CAT, + obj: src, + "Failed to transform extra-header '{}' value to header value", + field + ); + return; + } + }; + + headers.append(field.clone(), value); + }; + + if let Ok(Some(values)) = value.get::() { + for value in values.as_slice() { + append_header(&field, value); } } else if let Ok(Some(values)) = value.get::() { for value in values.as_slice() { - if let Some(value) = value.transform::() { - let value = value.get::<&str>().unwrap().unwrap_or(""); - gst_debug!( - CAT, - obj: src, - "Appending extra-header: {}: {}", - field, - value - ); - headers.append_raw(String::from(field), value); - } else { - gst_warning!( - CAT, - obj: src, - "Failed to transform extra-header '{}' to string", - field - ); - } + append_header(&field, value); } - } else if let Some(value) = value.transform::() { - let value = value.get::<&str>().unwrap().unwrap_or(""); - gst_debug!( - CAT, - obj: src, - "Appending extra-header: {}: {}", - field, - value - ); - headers.append_raw(String::from(field), value); } else { - gst_warning!( - CAT, - obj: src, - "Failed to transform extra-header '{}' to string", - field - ); + append_header(&field, value); } } } if !settings.cookies.is_empty() { - let mut cookies = Cookie::new(); + let mut cookies = String::new(); for cookie in settings.cookies { let mut split = cookie.splitn(2, '='); let key = split.next(); let value = split.next(); if let (Some(key), Some(value)) = (key, value) { - cookies.append(String::from(key), String::from(value)); + if !cookies.is_empty() { + cookies.push_str("; "); + } + cookies.push_str(key); + cookies.push('='); + cookies.push_str(value); } } - headers.set(cookies); + if let Ok(cookies) = HeaderValue::from_str(&cookies) { + headers.append("cookie", cookies); + } else { + gst_warning!(CAT, obj: src, "Failed to convert cookies into header value",); + } } if settings.iradio_mode { - headers.append_raw("icy-metadata", "1"); + headers.append("icy-metadata", "1".parse().unwrap()); } // Add all headers for the request here - let req = req.headers(headers.into()); + let req = req.headers(headers); let req = if let Some(ref user_id) = settings.user_id { // HTTP auth available @@ -485,15 +503,17 @@ impl ReqwestHttpSrc { gst_debug!(CAT, obj: src, "Sending new request: {:?}", req); - let uri_clone = uri.clone(); - let res = self.wait(req.send().map_err(move |err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to fetch {}: {:?}", uri_clone, err] - ) - })); + let future = async { + req.send().await.map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to fetch {}: {:?}", uri, err] + ) + }) + }; + let res = self.wait(future); - let mut res = match res { + let res = match res { Ok(res) => res, Err(Some(err)) => { gst_debug!(CAT, obj: src, "Error {:?}", err); @@ -536,21 +556,14 @@ impl ReqwestHttpSrc { } } - let headers = Headers::from(res.headers()); - let size = headers.get().map(|&ContentLength(cl)| cl + start); - - let accept_byte_ranges = if let Some(&AcceptRanges(ref ranges)) = headers.get() { - ranges.iter().any(|u| *u == RangeUnit::Bytes) - } else { - false - }; - + let headers = res.headers(); + let size = headers.typed_get().map(|ContentLength(cl)| cl + start); + let accept_byte_ranges = headers.typed_get() == Some(AcceptRanges::bytes()); let seekable = size.is_some() && accept_byte_ranges; - let position = if let Some(&ContentRange(ContentRangeSpec::Bytes { - range: Some((range_start, _)), - .. - })) = headers.get() + let position = if let Some((range_start, _range_end)) = headers + .typed_get() + .and_then(|h| ContentRange::bytes_range(&h)) { range_start } else { @@ -565,9 +578,8 @@ impl ReqwestHttpSrc { } let mut caps = headers - .get_raw("icy-metaint") - .and_then(|h| h.one()) - .and_then(|s| std::str::from_utf8(s).ok()) + .get("icy-metaint") + .and_then(|s| s.to_str().ok()) .and_then(|s| s.parse::().ok()) .map(|icy_metaint| { gst::Caps::builder("application/x-icy") @@ -575,7 +587,11 @@ impl ReqwestHttpSrc { .build() }); - if let Some(ContentType(ref content_type)) = headers.get() { + if let Some(content_type) = headers + .get("content-type") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + { gst_debug!(CAT, obj: src, "Got content type {}", content_type); if let Some(ref mut caps) = caps { let caps = caps.get_mut().unwrap(); @@ -606,38 +622,24 @@ impl ReqwestHttpSrc { { let tags = tags.get_mut().unwrap(); - if let Some(ref icy_name) = headers - .get_raw("icy-name") - .and_then(|h| h.one()) - .and_then(|s| std::str::from_utf8(s).ok()) - { + if let Some(ref icy_name) = headers.get("icy-name").and_then(|s| s.to_str().ok()) { tags.add::(icy_name, gst::TagMergeMode::Replace); } - if let Some(ref icy_genre) = headers - .get_raw("icy-genre") - .and_then(|h| h.one()) - .and_then(|s| std::str::from_utf8(s).ok()) - { + if let Some(ref icy_genre) = headers.get("icy-genre").and_then(|s| s.to_str().ok()) { tags.add::(icy_genre, gst::TagMergeMode::Replace); } - if let Some(ref icy_url) = headers - .get_raw("icy-url") - .and_then(|h| h.one()) - .and_then(|s| std::str::from_utf8(s).ok()) - { + if let Some(ref icy_url) = headers.get("icy-url").and_then(|s| s.to_str().ok()) { tags.add::(icy_url, gst::TagMergeMode::Replace); } } gst_debug!(CAT, obj: src, "Request successful"); - let body = mem::replace(res.body_mut(), Decoder::empty()); - Ok(State::Started { uri, - body: Some(body), + response: Some(res), seekable, position, size, @@ -648,59 +650,81 @@ impl ReqwestHttpSrc { }) } - fn wait(&self, future: F) -> Result> + fn wait(&self, future: F) -> Result> where - F: Send + Future + 'static, - F::Item: Send, + F: Send + Future>, + T: Send + 'static, { let timeout = self.settings.lock().unwrap().timeout; - let mut canceller = self.canceller.lock().unwrap(); - let (sender, receiver) = oneshot::channel::(); - canceller.replace(sender); + let mut canceller = self.canceller.lock().unwrap(); + let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); + canceller.replace(abort_handle); drop(canceller); - // wrapping timeout around future - let future_timeout = if timeout == 0 { - Either::A(future) - } else { - Either::B( - future - .timeout(Duration::from_secs(timeout.into())) - .map_err(|err| { - if err.is_elapsed() { - gst_error_msg!(gst::ResourceError::Read, ["Request timeout"]) - } else if err.is_inner() { - err.into_inner().unwrap() - } else { - gst_error_msg!(gst::ResourceError::Read, ["Timer error: {}", err]) - } - }), - ) + // Wrap in a timeout + let future = async { + if timeout == 0 { + future.await + } else { + let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await; + + match res { + Ok(res) => res, + Err(_) => Err(gst_error_msg!( + gst::ResourceError::Read, + ["Request timeout"] + )), + } + } }; - let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]); + // And make abortable + let future = async { + match future::Abortable::new(future, abort_registration).await { + Ok(res) => res.map_err(Some), + Err(_) => Err(None), + } + }; - let res = oneshot::spawn(future_timeout, &self.runtime.executor()) - .select(receiver.then(|_| Err(unlock_error.clone()))) - .wait() - .map(|v| v.0) - .map_err(|err| { - if err.0 == unlock_error { - None - } else { - Some(err.0) - } - }); + let res = match block_on(&*RUNTIME, future) { + Ok(res) => res, + Err(_) => Err(Some(gst_error_msg!( + gst::ResourceError::Read, + ["Join error"] + ))), + }; /* Clear out the canceller */ - canceller = self.canceller.lock().unwrap(); - *canceller = None; + let _ = self.canceller.lock().unwrap().take(); res } } +// Until tokio 0.2.0-alpha.6 `Runtime::block_on()` didn't require a mutable reference +// to the runtime. Now it does and we need to work around that. +// See https://github.com/tokio-rs/tokio/issues/2042 +fn block_on( + runtime: &tokio::runtime::Runtime, + future: F, +) -> Result +where + F: Send + Future, + F::Output: Send + 'static, +{ + use futures::task::FutureObj; + use std::mem; + + let future = FutureObj::new(Box::pin(future)); + + // We make sure here to block until the future is completely handled before returning + let future = unsafe { mem::transmute::<_, FutureObj<'static, _>>(future) }; + + let join_handle = runtime.spawn(future); + futures::executor::block_on(join_handle) +} + impl ObjectImpl for ReqwestHttpSrc { glib_object_impl!(); @@ -880,6 +904,14 @@ impl BaseSrcImpl for ReqwestHttpSrc { } } + fn unlock(&self, _src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + let canceller = self.canceller.lock().unwrap(); + if let Some(ref canceller) = *canceller { + canceller.abort(); + } + Ok(()) + } + fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); @@ -983,14 +1015,14 @@ impl BaseSrcImpl for ReqwestHttpSrc { ) -> Result { let mut state = self.state.lock().unwrap(); - let (body, position, caps, tags) = match *state { + let (response, position, caps, tags) = match *state { State::Started { - ref mut body, + ref mut response, ref mut position, ref mut tags, ref mut caps, .. - } => (body, position, caps, tags), + } => (response, position, caps, tags), State::Stopped => { gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]); @@ -1008,15 +1040,11 @@ impl BaseSrcImpl for ReqwestHttpSrc { return Err(gst::FlowError::Error); } - let current_body = match body.take() { - Some(body) => body, + let mut current_response = match response.take() { + Some(response) => response, None => { - gst_error!(CAT, obj: src, "Don't have a response body"); - gst_element_error!( - src, - gst::ResourceError::Read, - ["Don't have a response body"] - ); + gst_error!(CAT, obj: src, "Don't have a response"); + gst_element_error!(src, gst::ResourceError::Read, ["Don't have a response"]); return Err(gst::FlowError::Error); } @@ -1038,12 +1066,15 @@ impl BaseSrcImpl for ReqwestHttpSrc { pad.push_event(gst::Event::new_tag(tags).build()); } - let res = self.wait(current_body.into_future().map_err(move |(err, _body)| { - gst_error_msg!( - gst::ResourceError::Read, - ["Failed to read chunk at offset {}: {:?}", offset, err] - ) - })); + let future = async { + current_response.chunk().await.map_err(move |err| { + gst_error_msg!( + gst::ResourceError::Read, + ["Failed to read chunk at offset {}: {:?}", offset, err] + ) + }) + }; + let res = self.wait(future); let res = match res { Ok(res) => res, @@ -1059,12 +1090,12 @@ impl BaseSrcImpl for ReqwestHttpSrc { }; let mut state = self.state.lock().unwrap(); - let (body, position) = match *state { + let (response, position) = match *state { State::Started { - ref mut body, + ref mut response, ref mut position, .. - } => (body, position), + } => (response, position), State::Stopped => { gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]); @@ -1073,7 +1104,7 @@ impl BaseSrcImpl for ReqwestHttpSrc { }; match res { - (Some(chunk), current_body) => { + Some(chunk) => { /* do something with the chunk and store the body again in the state */ gst_trace!( @@ -1090,7 +1121,7 @@ impl BaseSrcImpl for ReqwestHttpSrc { let mut buffer = gst::Buffer::from_slice(chunk); - *body = Some(current_body); + *response = Some(current_response); { let buffer = buffer.get_mut().unwrap(); @@ -1100,10 +1131,10 @@ impl BaseSrcImpl for ReqwestHttpSrc { Ok(buffer) } - (None, current_body) => { + None => { /* No further data, end of stream */ gst_debug!(CAT, obj: src, "End of stream"); - *body = Some(current_body); + *response = Some(current_response); Err(gst::FlowError::Eos) } } @@ -1146,11 +1177,6 @@ impl ObjectSubclass for ReqwestHttpSrc { external_client: Mutex::new(None), settings: Mutex::new(Default::default()), state: Mutex::new(Default::default()), - runtime: runtime::Builder::new() - .core_threads(1) - .name_prefix("gst-http-tokio") - .build() - .unwrap(), canceller: Mutex::new(None), } } diff --git a/gst-plugin-reqwest/tests/reqwesthttpsrc.rs b/gst-plugin-reqwest/tests/reqwesthttpsrc.rs index 65f097d7..7bd3299f 100644 --- a/gst-plugin-reqwest/tests/reqwesthttpsrc.rs +++ b/gst-plugin-reqwest/tests/reqwesthttpsrc.rs @@ -51,10 +51,9 @@ impl Harness { http_func: F, setup_func: G, ) -> Harness { - use hyper::service::{make_service_fn, service_fn_ok}; + use hyper::service::{make_service_fn, service_fn}; use hyper::Server; use std::sync::{Arc, Mutex}; - use tokio::prelude::*; // Create the HTTP source let src = gst::ElementFactory::make("reqwesthttpsrc", None).unwrap(); @@ -94,8 +93,10 @@ impl Harness { pad.set_active(true).unwrap(); // Create the tokio runtime used for the HTTP server in this test - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new() .core_threads(1) + .enable_all() + .threaded_scheduler() .build() .unwrap(); @@ -108,20 +109,34 @@ impl Harness { let http_func = Arc::new(Mutex::new(http_func)); let make_service = make_service_fn(move |_ctx| { let http_func = http_func.clone(); - service_fn_ok(move |req| (&mut *http_func.lock().unwrap())(req)) + async move { + let http_func = http_func.clone(); + Ok::<_, hyper::Error>(service_fn(move |req| { + let http_func = http_func.clone(); + async move { Ok::<_, hyper::Error>((&mut *http_func.lock().unwrap())(req)) } + })) + } }); - // Bind the server, retrieve the local port that was selected in the end and set this as - // the location property on the source - let server = Server::bind(&addr).serve(make_service); - let local_addr = server.local_addr(); - src.set_property("location", &format!("http://{}/", local_addr)) - .unwrap(); + let (local_addr_sender, local_addr_receiver) = tokio::sync::oneshot::channel(); // Spawn the server in the background so that it can handle requests - rt.spawn(server.map_err(move |e| { - let _ = sender.send(Message::ServerError(format!("{:?}", e))); - })); + rt.spawn(async move { + // Bind the server, retrieve the local port that was selected in the end and set this as + // the location property on the source + let server = Server::bind(&addr).serve(make_service); + let local_addr = server.local_addr(); + + local_addr_sender.send(local_addr).unwrap(); + + if let Err(e) = server.await { + let _ = sender.send(Message::ServerError(format!("{:?}", e))); + } + }); + + let local_addr = futures::executor::block_on(local_addr_receiver).unwrap(); + src.set_property("location", &format!("http://{}/", local_addr)) + .unwrap(); // Let the test setup anything needed on the HTTP source now setup_func(&src); @@ -301,8 +316,6 @@ impl Harness { impl Drop for Harness { fn drop(&mut self) { - use tokio::prelude::*; - // Shut down everything that was set up for this test harness // and wait until the tokio runtime exited let bus = self.src.get_bus().unwrap(); @@ -316,7 +329,7 @@ impl Drop for Harness { self.pad.set_active(false).unwrap(); self.src.set_state(gst::State::Null).unwrap(); - self.rt.take().unwrap().shutdown_now().wait().unwrap(); + self.rt.take().unwrap(); } }