reqwest: Update to reqwest 0.10 / tokio 0.2

This commit is contained in:
Sebastian Dröge 2019-12-30 16:00:12 +02:00
parent 9d659fbd00
commit 672cb730a8
4 changed files with 246 additions and 213 deletions

View file

@ -8,19 +8,19 @@ description = "Rust HTTP Plugin"
edition = "2018"
[dependencies]
url = "1.7"
url = "2.1"
glib = { git = "https://github.com/gtk-rs/glib" }
reqwest = "0.9"
futures = "0.1.23"
hyperx = "0.15"
reqwest = { version = "0.10", features = ["cookies", "gzip"] }
futures = "0.3"
headers = "0.3"
mime = "0.3"
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10"] }
gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
tokio = "0.1"
bytes = "0.4"
tokio = { version = "0.2", features = ["time", "rt-threaded"] }
lazy_static = "1.0"
[dev-dependencies]
hyper = "0.12"
hyper = "0.13.0-alpha"
[lib]
name = "gstreqwest"

View file

@ -12,13 +12,7 @@
extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate bytes;
extern crate futures;
extern crate gstreamer_base as gst_base;
extern crate hyperx;
extern crate reqwest;
extern crate tokio;
extern crate url;
#[macro_use]
extern crate lazy_static;

View file

@ -5,17 +5,13 @@
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use bytes::Bytes;
use futures::future::Either;
use futures::sync::oneshot;
use futures::{Future, Stream};
use reqwest::r#async::{Client, Decoder};
use reqwest::StatusCode;
use std::mem;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::u64;
use tokio::prelude::*;
use futures::future;
use futures::prelude::*;
use reqwest::{Client, Response, StatusCode};
use tokio::runtime;
use url::Url;
@ -201,7 +197,7 @@ enum State {
Stopped,
Started {
uri: Url,
body: Option<Decoder>,
response: Option<Response>,
seekable: bool,
position: u64,
size: Option<u64>,
@ -224,8 +220,7 @@ pub struct ReqwestHttpSrc {
external_client: Mutex<Option<ClientContext>>,
settings: Mutex<Settings>,
state: Mutex<State>,
runtime: runtime::Runtime,
canceller: Mutex<Option<oneshot::Sender<Bytes>>>,
canceller: Mutex<Option<future::AbortHandle>>,
}
lazy_static! {
@ -234,6 +229,12 @@ lazy_static! {
gst::DebugColorFlags::empty(),
Some("Rust HTTP source"),
);
static ref RUNTIME: runtime::Runtime = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.core_threads(1)
.build()
.unwrap();
}
impl ReqwestHttpSrc {
@ -352,11 +353,10 @@ impl ReqwestHttpSrc {
start: u64,
stop: Option<u64>,
) -> Result<State, Option<gst::ErrorMessage>> {
use hyperx::header::{
qitem, AcceptEncoding, AcceptRanges, ByteRangeSpec, Connection, ContentLength,
ContentRange, ContentRangeSpec, ContentType, Cookie, Encoding, Headers, Range,
RangeUnit, RawLike, UserAgent,
use headers::{
AcceptRanges, Connection, ContentLength, ContentRange, HeaderMapExt, Range, UserAgent,
};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
gst_debug!(CAT, obj: src, "Creating new request for {}", uri);
@ -366,115 +366,133 @@ impl ReqwestHttpSrc {
};
let settings = self.settings.lock().unwrap().clone();
let mut headers = Headers::new();
let mut headers = HeaderMap::new();
if settings.keep_alive {
headers.set(Connection::keep_alive());
headers.typed_insert(Connection::keep_alive());
} else {
headers.set(Connection::close());
headers.typed_insert(Connection::close());
}
match (start != 0, stop) {
(false, None) => (),
(true, None) => {
headers.set(Range::Bytes(vec![ByteRangeSpec::AllFrom(start)]));
headers.typed_insert(Range::bytes(start..).unwrap());
}
(_, Some(stop)) => {
headers.set(Range::Bytes(vec![ByteRangeSpec::FromTo(start, stop - 1)]));
headers.typed_insert(Range::bytes(start..stop).unwrap());
}
}
headers.set(UserAgent::new(settings.user_agent.to_owned()));
if let Ok(user_agent) = settings.user_agent.parse::<UserAgent>() {
headers.typed_insert(user_agent);
} else {
gst_warning!(
CAT,
obj: src,
"Failed to transform user-agent '{}' to header value",
settings.user_agent
);
}
if !settings.compress {
// Compression is the default
headers.set(AcceptEncoding(vec![qitem(Encoding::Identity)]));
headers.append("accept-encoding", "identity".parse().unwrap());
};
if let Some(ref extra_headers) = settings.extra_headers {
use std::convert::TryFrom;
for (field, value) in extra_headers.iter() {
if let Ok(Some(values)) = value.get::<gst::Array>() {
for value in values.as_slice() {
if let Some(value) = value.transform::<String>() {
let value = value.get::<&str>().unwrap().unwrap_or("");
gst_debug!(
CAT,
obj: src,
"Appending extra-header: {}: {}",
field,
value
);
headers.append_raw(String::from(field), value);
} else {
let field = match HeaderName::try_from(field) {
Ok(field) => field,
Err(err) => {
gst_warning!(
CAT,
obj: src,
"Failed to transform extra-header field name '{}' to header name: {}",
field,
err,
);
continue;
}
};
let mut append_header = |field: &HeaderName, value: &glib::Value| {
let value = match value.transform::<String>() {
Some(value) => value,
None => {
gst_warning!(
CAT,
obj: src,
"Failed to transform extra-header '{}' to string",
"Failed to transform extra-header '{}' value to string",
field
);
return;
}
};
let value = value.get::<&str>().unwrap().unwrap_or("");
let value = match HeaderValue::from_str(value) {
Ok(value) => value,
Err(_) => {
gst_warning!(
CAT,
obj: src,
"Failed to transform extra-header '{}' value to header value",
field
);
return;
}
};
headers.append(field.clone(), value);
};
if let Ok(Some(values)) = value.get::<gst::Array>() {
for value in values.as_slice() {
append_header(&field, value);
}
} else if let Ok(Some(values)) = value.get::<gst::List>() {
for value in values.as_slice() {
if let Some(value) = value.transform::<String>() {
let value = value.get::<&str>().unwrap().unwrap_or("");
gst_debug!(
CAT,
obj: src,
"Appending extra-header: {}: {}",
field,
value
);
headers.append_raw(String::from(field), value);
} else {
gst_warning!(
CAT,
obj: src,
"Failed to transform extra-header '{}' to string",
field
);
}
append_header(&field, value);
}
} else if let Some(value) = value.transform::<String>() {
let value = value.get::<&str>().unwrap().unwrap_or("");
gst_debug!(
CAT,
obj: src,
"Appending extra-header: {}: {}",
field,
value
);
headers.append_raw(String::from(field), value);
} else {
gst_warning!(
CAT,
obj: src,
"Failed to transform extra-header '{}' to string",
field
);
append_header(&field, value);
}
}
}
if !settings.cookies.is_empty() {
let mut cookies = Cookie::new();
let mut cookies = String::new();
for cookie in settings.cookies {
let mut split = cookie.splitn(2, '=');
let key = split.next();
let value = split.next();
if let (Some(key), Some(value)) = (key, value) {
cookies.append(String::from(key), String::from(value));
if !cookies.is_empty() {
cookies.push_str("; ");
}
cookies.push_str(key);
cookies.push('=');
cookies.push_str(value);
}
}
headers.set(cookies);
if let Ok(cookies) = HeaderValue::from_str(&cookies) {
headers.append("cookie", cookies);
} else {
gst_warning!(CAT, obj: src, "Failed to convert cookies into header value",);
}
}
if settings.iradio_mode {
headers.append_raw("icy-metadata", "1");
headers.append("icy-metadata", "1".parse().unwrap());
}
// Add all headers for the request here
let req = req.headers(headers.into());
let req = req.headers(headers);
let req = if let Some(ref user_id) = settings.user_id {
// HTTP auth available
@ -485,15 +503,17 @@ impl ReqwestHttpSrc {
gst_debug!(CAT, obj: src, "Sending new request: {:?}", req);
let uri_clone = uri.clone();
let res = self.wait(req.send().map_err(move |err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to fetch {}: {:?}", uri_clone, err]
)
}));
let future = async {
req.send().await.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to fetch {}: {:?}", uri, err]
)
})
};
let res = self.wait(future);
let mut res = match res {
let res = match res {
Ok(res) => res,
Err(Some(err)) => {
gst_debug!(CAT, obj: src, "Error {:?}", err);
@ -536,21 +556,14 @@ impl ReqwestHttpSrc {
}
}
let headers = Headers::from(res.headers());
let size = headers.get().map(|&ContentLength(cl)| cl + start);
let accept_byte_ranges = if let Some(&AcceptRanges(ref ranges)) = headers.get() {
ranges.iter().any(|u| *u == RangeUnit::Bytes)
} else {
false
};
let headers = res.headers();
let size = headers.typed_get().map(|ContentLength(cl)| cl + start);
let accept_byte_ranges = headers.typed_get() == Some(AcceptRanges::bytes());
let seekable = size.is_some() && accept_byte_ranges;
let position = if let Some(&ContentRange(ContentRangeSpec::Bytes {
range: Some((range_start, _)),
..
})) = headers.get()
let position = if let Some((range_start, _range_end)) = headers
.typed_get()
.and_then(|h| ContentRange::bytes_range(&h))
{
range_start
} else {
@ -565,9 +578,8 @@ impl ReqwestHttpSrc {
}
let mut caps = headers
.get_raw("icy-metaint")
.and_then(|h| h.one())
.and_then(|s| std::str::from_utf8(s).ok())
.get("icy-metaint")
.and_then(|s| s.to_str().ok())
.and_then(|s| s.parse::<i32>().ok())
.map(|icy_metaint| {
gst::Caps::builder("application/x-icy")
@ -575,7 +587,11 @@ impl ReqwestHttpSrc {
.build()
});
if let Some(ContentType(ref content_type)) = headers.get() {
if let Some(content_type) = headers
.get("content-type")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<mime::Mime>().ok())
{
gst_debug!(CAT, obj: src, "Got content type {}", content_type);
if let Some(ref mut caps) = caps {
let caps = caps.get_mut().unwrap();
@ -606,38 +622,24 @@ impl ReqwestHttpSrc {
{
let tags = tags.get_mut().unwrap();
if let Some(ref icy_name) = headers
.get_raw("icy-name")
.and_then(|h| h.one())
.and_then(|s| std::str::from_utf8(s).ok())
{
if let Some(ref icy_name) = headers.get("icy-name").and_then(|s| s.to_str().ok()) {
tags.add::<gst::tags::Organization>(icy_name, gst::TagMergeMode::Replace);
}
if let Some(ref icy_genre) = headers
.get_raw("icy-genre")
.and_then(|h| h.one())
.and_then(|s| std::str::from_utf8(s).ok())
{
if let Some(ref icy_genre) = headers.get("icy-genre").and_then(|s| s.to_str().ok()) {
tags.add::<gst::tags::Genre>(icy_genre, gst::TagMergeMode::Replace);
}
if let Some(ref icy_url) = headers
.get_raw("icy-url")
.and_then(|h| h.one())
.and_then(|s| std::str::from_utf8(s).ok())
{
if let Some(ref icy_url) = headers.get("icy-url").and_then(|s| s.to_str().ok()) {
tags.add::<gst::tags::Location>(icy_url, gst::TagMergeMode::Replace);
}
}
gst_debug!(CAT, obj: src, "Request successful");
let body = mem::replace(res.body_mut(), Decoder::empty());
Ok(State::Started {
uri,
body: Some(body),
response: Some(res),
seekable,
position,
size,
@ -648,59 +650,81 @@ impl ReqwestHttpSrc {
})
}
fn wait<F>(&self, future: F) -> Result<F::Item, Option<gst::ErrorMessage>>
fn wait<F, T>(&self, future: F) -> Result<T, Option<gst::ErrorMessage>>
where
F: Send + Future<Error = gst::ErrorMessage> + 'static,
F::Item: Send,
F: Send + Future<Output = Result<T, gst::ErrorMessage>>,
T: Send + 'static,
{
let timeout = self.settings.lock().unwrap().timeout;
let mut canceller = self.canceller.lock().unwrap();
let (sender, receiver) = oneshot::channel::<Bytes>();
canceller.replace(sender);
let mut canceller = self.canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller.replace(abort_handle);
drop(canceller);
// wrapping timeout around future
let future_timeout = if timeout == 0 {
Either::A(future)
} else {
Either::B(
future
.timeout(Duration::from_secs(timeout.into()))
.map_err(|err| {
if err.is_elapsed() {
gst_error_msg!(gst::ResourceError::Read, ["Request timeout"])
} else if err.is_inner() {
err.into_inner().unwrap()
} else {
gst_error_msg!(gst::ResourceError::Read, ["Timer error: {}", err])
}
}),
)
// Wrap in a timeout
let future = async {
if timeout == 0 {
future.await
} else {
let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
match res {
Ok(res) => res,
Err(_) => Err(gst_error_msg!(
gst::ResourceError::Read,
["Request timeout"]
)),
}
}
};
let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]);
// And make abortable
let future = async {
match future::Abortable::new(future, abort_registration).await {
Ok(res) => res.map_err(Some),
Err(_) => Err(None),
}
};
let res = oneshot::spawn(future_timeout, &self.runtime.executor())
.select(receiver.then(|_| Err(unlock_error.clone())))
.wait()
.map(|v| v.0)
.map_err(|err| {
if err.0 == unlock_error {
None
} else {
Some(err.0)
}
});
let res = match block_on(&*RUNTIME, future) {
Ok(res) => res,
Err(_) => Err(Some(gst_error_msg!(
gst::ResourceError::Read,
["Join error"]
))),
};
/* Clear out the canceller */
canceller = self.canceller.lock().unwrap();
*canceller = None;
let _ = self.canceller.lock().unwrap().take();
res
}
}
// Until tokio 0.2.0-alpha.6 `Runtime::block_on()` didn't require a mutable reference
// to the runtime. Now it does and we need to work around that.
// See https://github.com/tokio-rs/tokio/issues/2042
fn block_on<F>(
runtime: &tokio::runtime::Runtime,
future: F,
) -> Result<F::Output, tokio::task::JoinError>
where
F: Send + Future,
F::Output: Send + 'static,
{
use futures::task::FutureObj;
use std::mem;
let future = FutureObj::new(Box::pin(future));
// We make sure here to block until the future is completely handled before returning
let future = unsafe { mem::transmute::<_, FutureObj<'static, _>>(future) };
let join_handle = runtime.spawn(future);
futures::executor::block_on(join_handle)
}
impl ObjectImpl for ReqwestHttpSrc {
glib_object_impl!();
@ -880,6 +904,14 @@ impl BaseSrcImpl for ReqwestHttpSrc {
}
}
fn unlock(&self, _src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
let canceller = self.canceller.lock().unwrap();
if let Some(ref canceller) = *canceller {
canceller.abort();
}
Ok(())
}
fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
@ -983,14 +1015,14 @@ impl BaseSrcImpl for ReqwestHttpSrc {
) -> Result<gst::Buffer, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let (body, position, caps, tags) = match *state {
let (response, position, caps, tags) = match *state {
State::Started {
ref mut body,
ref mut response,
ref mut position,
ref mut tags,
ref mut caps,
..
} => (body, position, caps, tags),
} => (response, position, caps, tags),
State::Stopped => {
gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
@ -1008,15 +1040,11 @@ impl BaseSrcImpl for ReqwestHttpSrc {
return Err(gst::FlowError::Error);
}
let current_body = match body.take() {
Some(body) => body,
let mut current_response = match response.take() {
Some(response) => response,
None => {
gst_error!(CAT, obj: src, "Don't have a response body");
gst_element_error!(
src,
gst::ResourceError::Read,
["Don't have a response body"]
);
gst_error!(CAT, obj: src, "Don't have a response");
gst_element_error!(src, gst::ResourceError::Read, ["Don't have a response"]);
return Err(gst::FlowError::Error);
}
@ -1038,12 +1066,15 @@ impl BaseSrcImpl for ReqwestHttpSrc {
pad.push_event(gst::Event::new_tag(tags).build());
}
let res = self.wait(current_body.into_future().map_err(move |(err, _body)| {
gst_error_msg!(
gst::ResourceError::Read,
["Failed to read chunk at offset {}: {:?}", offset, err]
)
}));
let future = async {
current_response.chunk().await.map_err(move |err| {
gst_error_msg!(
gst::ResourceError::Read,
["Failed to read chunk at offset {}: {:?}", offset, err]
)
})
};
let res = self.wait(future);
let res = match res {
Ok(res) => res,
@ -1059,12 +1090,12 @@ impl BaseSrcImpl for ReqwestHttpSrc {
};
let mut state = self.state.lock().unwrap();
let (body, position) = match *state {
let (response, position) = match *state {
State::Started {
ref mut body,
ref mut response,
ref mut position,
..
} => (body, position),
} => (response, position),
State::Stopped => {
gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
@ -1073,7 +1104,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
};
match res {
(Some(chunk), current_body) => {
Some(chunk) => {
/* do something with the chunk and store the body again in the state */
gst_trace!(
@ -1090,7 +1121,7 @@ impl BaseSrcImpl for ReqwestHttpSrc {
let mut buffer = gst::Buffer::from_slice(chunk);
*body = Some(current_body);
*response = Some(current_response);
{
let buffer = buffer.get_mut().unwrap();
@ -1100,10 +1131,10 @@ impl BaseSrcImpl for ReqwestHttpSrc {
Ok(buffer)
}
(None, current_body) => {
None => {
/* No further data, end of stream */
gst_debug!(CAT, obj: src, "End of stream");
*body = Some(current_body);
*response = Some(current_response);
Err(gst::FlowError::Eos)
}
}
@ -1146,11 +1177,6 @@ impl ObjectSubclass for ReqwestHttpSrc {
external_client: Mutex::new(None),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
runtime: runtime::Builder::new()
.core_threads(1)
.name_prefix("gst-http-tokio")
.build()
.unwrap(),
canceller: Mutex::new(None),
}
}

View file

@ -51,10 +51,9 @@ impl Harness {
http_func: F,
setup_func: G,
) -> Harness {
use hyper::service::{make_service_fn, service_fn_ok};
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use std::sync::{Arc, Mutex};
use tokio::prelude::*;
// Create the HTTP source
let src = gst::ElementFactory::make("reqwesthttpsrc", None).unwrap();
@ -94,8 +93,10 @@ impl Harness {
pad.set_active(true).unwrap();
// Create the tokio runtime used for the HTTP server in this test
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(1)
.enable_all()
.threaded_scheduler()
.build()
.unwrap();
@ -108,20 +109,34 @@ impl Harness {
let http_func = Arc::new(Mutex::new(http_func));
let make_service = make_service_fn(move |_ctx| {
let http_func = http_func.clone();
service_fn_ok(move |req| (&mut *http_func.lock().unwrap())(req))
async move {
let http_func = http_func.clone();
Ok::<_, hyper::Error>(service_fn(move |req| {
let http_func = http_func.clone();
async move { Ok::<_, hyper::Error>((&mut *http_func.lock().unwrap())(req)) }
}))
}
});
// Bind the server, retrieve the local port that was selected in the end and set this as
// the location property on the source
let server = Server::bind(&addr).serve(make_service);
let local_addr = server.local_addr();
src.set_property("location", &format!("http://{}/", local_addr))
.unwrap();
let (local_addr_sender, local_addr_receiver) = tokio::sync::oneshot::channel();
// Spawn the server in the background so that it can handle requests
rt.spawn(server.map_err(move |e| {
let _ = sender.send(Message::ServerError(format!("{:?}", e)));
}));
rt.spawn(async move {
// Bind the server, retrieve the local port that was selected in the end and set this as
// the location property on the source
let server = Server::bind(&addr).serve(make_service);
let local_addr = server.local_addr();
local_addr_sender.send(local_addr).unwrap();
if let Err(e) = server.await {
let _ = sender.send(Message::ServerError(format!("{:?}", e)));
}
});
let local_addr = futures::executor::block_on(local_addr_receiver).unwrap();
src.set_property("location", &format!("http://{}/", local_addr))
.unwrap();
// Let the test setup anything needed on the HTTP source now
setup_func(&src);
@ -301,8 +316,6 @@ impl Harness {
impl Drop for Harness {
fn drop(&mut self) {
use tokio::prelude::*;
// Shut down everything that was set up for this test harness
// and wait until the tokio runtime exited
let bus = self.src.get_bus().unwrap();
@ -316,7 +329,7 @@ impl Drop for Harness {
self.pad.set_active(false).unwrap();
self.src.set_state(gst::State::Null).unwrap();
self.rt.take().unwrap().shutdown_now().wait().unwrap();
self.rt.take().unwrap();
}
}